Skip to content

Commit

Permalink
Merge pull request #325 from Aiven-Open/jjaakola-aiven-update-to-kafk…
Browse files Browse the repository at this point in the history
…a-api-3.0.2

chore: update to Kafka API 3.0.2
  • Loading branch information
keejon authored May 29, 2024
2 parents f725118 + 19153c7 commit b9002cb
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 33 deletions.
6 changes: 1 addition & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ publishing {
}
}

val kafkaVersion = "2.2.0"
val kafkaVersion = "3.0.2"
val slf4jVersion = "2.0.13"

val avroVersion = "1.8.1"
Expand All @@ -126,7 +126,6 @@ val confluentPlatformVersion = "4.1.4" // For compatibility tests use version 4.
val hamcrestVersion = "2.2"
val jacksonVersion = "2.17.0" // This Jackson is used in the tests.
val jupiterVersion = "5.10.2"
val jettyVersion = "12.0.8"
val servletVersion = "4.0.1"
val testcontainersVersion = "1.19.7"
val awaitilityVersion = "4.2.1"
Expand Down Expand Up @@ -197,9 +196,6 @@ dependencies {
integrationTestImplementation("javax.servlet:javax.servlet-api:$servletVersion")
integrationTestImplementation("org.apache.avro:avro:$avroVersion")
integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-http:$jettyVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-server:$jettyVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-util:$jettyVersion")
integrationTestImplementation("org.junit.jupiter:junit-jupiter:$jupiterVersion")
integrationTestImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion")
integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,17 @@ protected KafkaConsumer<String, GenericRecord> createConsumer() {

@AfterEach
final void tearDown() {
connectRunner.stop();
producer.close();
consumer.close();

connectRunner.awaitStop();
if (connectRunner != null) {
connectRunner.stop();
}
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
if (connectRunner != null) {
connectRunner.awaitStop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
Expand Down Expand Up @@ -75,10 +77,14 @@ void start() {
final Plugins plugins = new Plugins(workerProps);
final StandaloneConfig config = new StandaloneConfig(workerProps);


final Worker worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore());
herder = new StandaloneHerder(worker, kafkaClusterId);
connect = new Connect(herder, new RestServer(config));
final ConnectorClientConfigOverridePolicy overridePolicy = new AllConnectorClientConfigOverridePolicy();
final Worker worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore(),
overridePolicy);
herder = new StandaloneHerder(worker, kafkaClusterId, overridePolicy);
final RestServer restServer = new RestServer(config);
restServer.initializeServer();
restServer.initializeResources(herder);
connect = new Connect(herder, restServer);

connect.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class SchemaRegistryContainer extends GenericContainer<SchemaRegist
public static final int SCHEMA_REGISTRY_PORT = 8081;

public SchemaRegistryContainer(final KafkaContainer kafka) {
this("5.0.4", kafka);
this("5.4.3", kafka);
}

public SchemaRegistryContainer(final String confluentPlatformVersion, final KafkaContainer kafka) {
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class JdbcSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);

// Visible for testing
public static final int MAX_QUERY_SLEEP_MS = 100;

private Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
Expand Down Expand Up @@ -304,12 +307,16 @@ public List<SourceRecord> poll() throws InterruptedException {
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, 100);
final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
time.sleep(sleepMs);
continue; // Re-check stop flag before continuing
// Return control to the Connect runtime periodically
// See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
// "If no data is currently available, this method should block but return control to the caller
// regularly (by returning null)"
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import org.apache.kafka.connect.data.Date;
Expand Down Expand Up @@ -53,7 +54,9 @@ public class JdbcSourceTaskConversionTest extends JdbcSourceTaskTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
task.start(singleTableConfig(extendedMapping));
final Map<String, String> taskConfig = singleTableConfig(extendedMapping);
taskConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "1");
task.start(taskConfig);
}

@AfterEach
Expand Down Expand Up @@ -303,7 +306,11 @@ private void typeConversion(final String sqlType, final boolean nullable,
}
db.createTable(SINGLE_TABLE_NAME, "id", sqlColumnSpec);
db.insert(SINGLE_TABLE_NAME, "id", sqlValue);
final List<SourceRecord> records = task.poll();
List<SourceRecord> records = null;
// May need to retry polling
for (int retries = 0; retries < 10 && records == null; retries++) {
records = task.poll();
}
validateRecords(records, convertedSchema, convertedValue);
db.dropTable(SINGLE_TABLE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public void testPollInterval() throws Exception {

// Subsequent polls have to wait for timeout
task.poll();
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceTask.MAX_QUERY_SLEEP_MS);
task.poll();
assertThat(time.milliseconds()).isEqualTo(startTime + 2 * JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
assertThat(time.milliseconds()).isEqualTo(startTime + 2 * JdbcSourceTask.MAX_QUERY_SLEEP_MS);

task.stop();
}
Expand Down Expand Up @@ -111,8 +111,7 @@ public void testSingleUpdateMultiplePoll() throws Exception {

// Subsequent poll should wait for next timeout
task.poll();
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);

assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceTask.MAX_QUERY_SLEEP_MS);
}

@Test
Expand All @@ -137,13 +136,12 @@ public void testMultipleTables() throws Exception {
assertThat(records.get(0).sourcePartition()).isEqualTo(SECOND_TABLE_PARTITION);

// Subsequent poll should wait for next timeout
records = task.poll();
records = pollRecords(task);
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
validatePollResultTable(records, 1, SINGLE_TABLE_NAME);
records = task.poll();
records = pollRecords(task);
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
validatePollResultTable(records, 1, SECOND_TABLE_NAME);

}

@Test
Expand Down Expand Up @@ -178,7 +176,8 @@ public void testMultipleTablesMultiplePolls() throws Exception {

// Subsequent poll should wait for next timeout
for (int i = 0; i < 2; i++) {
final List<SourceRecord> records = task.poll();
final List<SourceRecord> records;
records = pollRecords(task);
assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
validatePollResultTable(records, 1, SINGLE_TABLE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

Expand Down Expand Up @@ -129,4 +131,12 @@ protected void initializeTask() {
task.initialize(taskContext);
}

final List<SourceRecord> pollRecords(final JdbcSourceTask task) throws InterruptedException {
List<SourceRecord> records = null;
while (records == null) {
records = task.poll();
}
return records;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ public void testBulkPeriodicLoad() throws Exception {
assertThat(countIntValues(records, "id")).containsExactly(entry(1, 1));
assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);

records = task.poll();
records = pollRecords(task);
assertThat(countIntValues(records, "id")).containsExactly(entry(1, 1));
assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);

db.insert(SINGLE_TABLE_NAME, "id", 2);
records = task.poll();
records = pollRecords(task);
final Map<Integer, Integer> twoRecords = new HashMap<>();
twoRecords.put(1, 1);
twoRecords.put(2, 1);
assertThat(countIntValues(records, "id")).isEqualTo(twoRecords);
assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);

db.delete(SINGLE_TABLE_NAME, new EmbeddedDerby.EqualsCondition(column, 1));
records = task.poll();
records = pollRecords(task);
assertThat(countIntValues(records, "id")).containsExactly(entry(2, 1));
assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
}
Expand Down Expand Up @@ -667,7 +667,7 @@ public void testCustomQueryBulk() throws Exception {
db.insert(SINGLE_TABLE_NAME, "id", 3, "user_id", 2);
db.insert(SINGLE_TABLE_NAME, "id", 4, "user_id", 2);

records = task.poll();
records = pollRecords(task);
assertThat(records).hasSize(4);
recordUserIdCounts = new HashMap<>();
recordUserIdCounts.put(1, 2);
Expand Down Expand Up @@ -775,6 +775,8 @@ private Map<String, String> taskConfig(final String timestampColumn, final Strin
if (timeZone != null) {
taskConfig.put(JdbcConfig.DB_TIMEZONE_CONFIG, timeZone);
}

taskConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "100");
return taskConfig;
}

Expand Down Expand Up @@ -824,7 +826,11 @@ private <T> void verifyPoll(final int numRecords,
final boolean multiTimestampOffsets,
final String topic)
throws Exception {
final List<SourceRecord> records = task.poll();
List<SourceRecord> records = null;
// May need to retry polling occasionally
for (int retries = 0; retries < 10 && records == null; retries++) {
records = task.poll();
}
assertThat(records).hasSize(numRecords);

final HashMap<T, Integer> valueCounts = new HashMap<>();
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/source/MockTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package io.aiven.connect.jdbc.source;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;

/**
Expand Down Expand Up @@ -60,4 +62,23 @@ public void sleep(final long ms) {
this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
}

@Override
public void waitObject(final Object object, final Supplier<Boolean> condition, final long deadlineMs)
throws InterruptedException {
synchronized (object) {
while (true) {
if (condition.get()) {
return;
}

final long currentTimeMs = milliseconds();
if (currentTimeMs >= deadlineMs) {
throw new TimeoutException("Condition not satisfied before deadline");
}

object.wait(deadlineMs - currentTimeMs);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void beforeEach() {
}

protected void assertExtractedOffset(final long expected, final Schema schema, final Struct record) {
TimestampIncrementingCriteria criteria = null;
final TimestampIncrementingCriteria criteria;
if (schema.field(INCREMENTING_COLUMN.name()) != null) {
if (schema.field(TS1_COLUMN.name()) != null) {
criteria = criteriaIncTs;
Expand Down

0 comments on commit b9002cb

Please sign in to comment.