chore: update to Kafka API 3.0.2 #325

Merged: 1 commit, May 29, 2024
chore: update to Kafka API 3.0.2
Update to Kafka API 3.0.2 as the baseline.
Includes a correction in JdbcSourceTask to return control to Connect: the
SourceTask#poll implementation is required to return (null) instead of
blocking when no data is available.
jjaakola-aiven committed May 28, 2024
commit 19153c77a850c2cb4b7ed2585c6845fe71ea808e
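
The fix in JdbcSourceTask follows the SourceTask#poll contract quoted in the diff below: when no data is ready, sleep briefly and return null so the Connect runtime regains control. A minimal sketch of that pattern (a simplified illustration only; PollContractSketchTask and fetchAvailableRecords() are hypothetical names, not the connector's actual code):

    import java.util.List;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Minimal sketch of the SourceTask#poll contract; fetchAvailableRecords()
    // is a hypothetical helper, not the actual JdbcSourceTask implementation.
    public abstract class PollContractSketchTask extends SourceTask {
        private static final long MAX_SLEEP_MS = 100;

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            final List<SourceRecord> records = fetchAvailableRecords();
            if (records != null && !records.isEmpty()) {
                return records;
            }
            // No data yet: sleep briefly, then return null so the framework
            // can check the stop flag and shut the task down cleanly.
            Thread.sleep(MAX_SLEEP_MS);
            return null;
        }

        // Hypothetical data access; the real task queries the database here.
        protected abstract List<SourceRecord> fetchAvailableRecords();
    }

Returning null between polls, rather than looping inside poll(), is what lets the runtime stop the task promptly.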
6 changes: 1 addition & 5 deletions build.gradle.kts
@@ -117,7 +117,7 @@ publishing {
 }
 }
 
-val kafkaVersion = "2.2.0"
+val kafkaVersion = "3.0.2"
 val slf4jVersion = "2.0.13"
 
 val avroVersion = "1.8.1"
@@ -126,7 +126,6 @@ val confluentPlatformVersion = "4.1.4" // For compatibility tests use version 4.
 val hamcrestVersion = "2.2"
 val jacksonVersion = "2.17.0" // This Jackson is used in the tests.
 val jupiterVersion = "5.10.2"
-val jettyVersion = "12.0.8"
 val servletVersion = "4.0.1"
 val testcontainersVersion = "1.19.7"
 val awaitilityVersion = "4.2.1"
@@ -197,9 +196,6 @@ dependencies {
integrationTestImplementation("javax.servlet:javax.servlet-api:$servletVersion")
integrationTestImplementation("org.apache.avro:avro:$avroVersion")
integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-http:$jettyVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-server:$jettyVersion")
integrationTestImplementation("org.eclipse.jetty:jetty-util:$jettyVersion")
integrationTestImplementation("org.junit.jupiter:junit-jupiter:$jupiterVersion")
integrationTestImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion")
integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version
@@ -141,10 +141,17 @@ protected KafkaConsumer<String, GenericRecord> createConsumer() {

 @AfterEach
 final void tearDown() {
-    connectRunner.stop();
-    producer.close();
-    consumer.close();
-
-    connectRunner.awaitStop();
+    if (connectRunner != null) {
+        connectRunner.stop();
+    }
+    if (producer != null) {
+        producer.close();
+    }
+    if (consumer != null) {
+        consumer.close();
+    }
+    if (connectRunner != null) {
+        connectRunner.awaitStop();
+    }
 }
 }
@@ -21,6 +21,8 @@
 import java.util.concurrent.ExecutionException;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -75,10 +77,14 @@ void start() {
 final Plugins plugins = new Plugins(workerProps);
 final StandaloneConfig config = new StandaloneConfig(workerProps);
 
-
-final Worker worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore());
-herder = new StandaloneHerder(worker, kafkaClusterId);
-connect = new Connect(herder, new RestServer(config));
+final ConnectorClientConfigOverridePolicy overridePolicy = new AllConnectorClientConfigOverridePolicy();
+final Worker worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore(),
+        overridePolicy);
+herder = new StandaloneHerder(worker, kafkaClusterId, overridePolicy);
+final RestServer restServer = new RestServer(config);
+restServer.initializeServer();
+restServer.initializeResources(herder);
+connect = new Connect(herder, restServer);
 
 connect.start();
 }
@@ -24,7 +24,7 @@ public final class SchemaRegistryContainer extends GenericContainer<SchemaRegist
 public static final int SCHEMA_REGISTRY_PORT = 8081;
 
 public SchemaRegistryContainer(final KafkaContainer kafka) {
-    this("5.0.4", kafka);
+    this("5.4.3", kafka);
 }
 
 public SchemaRegistryContainer(final String confluentPlatformVersion, final KafkaContainer kafka) {
11 changes: 9 additions & 2 deletions src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -56,6 +56,9 @@ public class JdbcSourceTask extends SourceTask {

 private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
 
+// Visible for testing
+public static final int MAX_QUERY_SLEEP_MS = 100;
+
 private Time time;
 private JdbcSourceTaskConfig config;
 private DatabaseDialect dialect;
@@ -304,12 +307,16 @@ public List<SourceRecord> poll() throws InterruptedException {
 final long nextUpdate = querier.getLastUpdate()
     + config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
 final long untilNext = nextUpdate - time.milliseconds();
-final long sleepMs = Math.min(untilNext, 100);
+final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
 if (sleepMs > 0) {
     log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
         sleepMs, querier.toString(), untilNext);
     time.sleep(sleepMs);
-    continue; // Re-check stop flag before continuing
+    // Return control to the Connect runtime periodically
+    // See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
+    // "If no data is currently available, this method should block but return control to the caller
+    // regularly (by returning null)"
+    return null;
 }
 }

@@ -25,6 +25,7 @@
 import java.util.Calendar;
 import java.util.GregorianCalendar;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 
 import org.apache.kafka.connect.data.Date;
@@ -53,7 +54,9 @@ public class JdbcSourceTaskConversionTest extends JdbcSourceTaskTestBase {
 @BeforeEach
 public void setup() throws Exception {
     super.setup();
-    task.start(singleTableConfig(extendedMapping));
+    final Map<String, String> taskConfig = singleTableConfig(extendedMapping);
+    taskConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "1");
+    task.start(taskConfig);
 }
 
 @AfterEach
@@ -303,7 +306,11 @@ private void typeConversion(final String sqlType, final boolean nullable,
 }
 db.createTable(SINGLE_TABLE_NAME, "id", sqlColumnSpec);
 db.insert(SINGLE_TABLE_NAME, "id", sqlValue);
-final List<SourceRecord> records = task.poll();
+List<SourceRecord> records = null;
+// May need to retry polling
+for (int retries = 0; retries < 10 && records == null; retries++) {
+    records = task.poll();
+}
 validateRecords(records, convertedSchema, convertedValue);
 db.dropTable(SINGLE_TABLE_NAME);
 }
@@ -79,9 +79,9 @@ public void testPollInterval() throws Exception {

 // Subsequent polls have to wait for timeout
 task.poll();
-assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
+assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceTask.MAX_QUERY_SLEEP_MS);
 task.poll();
-assertThat(time.milliseconds()).isEqualTo(startTime + 2 * JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
+assertThat(time.milliseconds()).isEqualTo(startTime + 2 * JdbcSourceTask.MAX_QUERY_SLEEP_MS);
 
 task.stop();
 }
@@ -111,8 +111,7 @@ public void testSingleUpdateMultiplePoll() throws Exception {

 // Subsequent poll should wait for next timeout
 task.poll();
-assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
-
+assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceTask.MAX_QUERY_SLEEP_MS);
 }
 
 @Test
@@ -137,13 +136,12 @@ public void testMultipleTables() throws Exception {
 assertThat(records.get(0).sourcePartition()).isEqualTo(SECOND_TABLE_PARTITION);
 
 // Subsequent poll should wait for next timeout
-records = task.poll();
+records = pollRecords(task);
 assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
 validatePollResultTable(records, 1, SINGLE_TABLE_NAME);
-records = task.poll();
+records = pollRecords(task);
 assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
 validatePollResultTable(records, 1, SECOND_TABLE_NAME);
-
 }

@Test
@@ -178,7 +176,8 @@ public void testMultipleTablesMultiplePolls() throws Exception {

 // Subsequent poll should wait for next timeout
 for (int i = 0; i < 2; i++) {
-    final List<SourceRecord> records = task.poll();
+    final List<SourceRecord> records;
+    records = pollRecords(task);
     assertThat(time.milliseconds()).isEqualTo(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT);
     validatePollResultTable(records, 1, SINGLE_TABLE_NAME);
 }
@@ -19,9 +19,11 @@

 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;

@@ -129,4 +131,12 @@ protected void initializeTask() {
 task.initialize(taskContext);
 }
 
+final List<SourceRecord> pollRecords(final JdbcSourceTask task) throws InterruptedException {
+    List<SourceRecord> records = null;
+    while (records == null) {
+        records = task.poll();
+    }
+    return records;
+}
+
 }
@@ -74,20 +74,20 @@ public void testBulkPeriodicLoad() throws Exception {
 assertThat(countIntValues(records, "id")).containsExactly(entry(1, 1));
 assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
 
-records = task.poll();
+records = pollRecords(task);
 assertThat(countIntValues(records, "id")).containsExactly(entry(1, 1));
 assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
 
 db.insert(SINGLE_TABLE_NAME, "id", 2);
-records = task.poll();
+records = pollRecords(task);
 final Map<Integer, Integer> twoRecords = new HashMap<>();
 twoRecords.put(1, 1);
 twoRecords.put(2, 1);
 assertThat(countIntValues(records, "id")).isEqualTo(twoRecords);
 assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
 
 db.delete(SINGLE_TABLE_NAME, new EmbeddedDerby.EqualsCondition(column, 1));
-records = task.poll();
+records = pollRecords(task);
 assertThat(countIntValues(records, "id")).containsExactly(entry(2, 1));
 assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
 }
@@ -667,7 +667,7 @@ public void testCustomQueryBulk() throws Exception {
db.insert(SINGLE_TABLE_NAME, "id", 3, "user_id", 2);
db.insert(SINGLE_TABLE_NAME, "id", 4, "user_id", 2);

records = task.poll();
records = pollRecords(task);
assertThat(records).hasSize(4);
recordUserIdCounts = new HashMap<>();
recordUserIdCounts.put(1, 2);
@@ -775,6 +775,8 @@ private Map<String, String> taskConfig(final String timestampColumn, final Strin
 if (timeZone != null) {
     taskConfig.put(JdbcConfig.DB_TIMEZONE_CONFIG, timeZone);
 }
+
+taskConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "100");
 return taskConfig;
 }

@@ -824,7 +826,11 @@ private <T> void verifyPoll(final int numRecords,
 final boolean multiTimestampOffsets,
 final String topic)
 throws Exception {
-    final List<SourceRecord> records = task.poll();
+    List<SourceRecord> records = null;
+    // May need to retry polling occasionally
+    for (int retries = 0; retries < 10 && records == null; retries++) {
+        records = task.poll();
+    }
 assertThat(records).hasSize(numRecords);
 
 final HashMap<T, Integer> valueCounts = new HashMap<>();
21 changes: 21 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/source/MockTime.java
@@ -18,7 +18,9 @@
 package io.aiven.connect.jdbc.source;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Time;
 
 /**
@@ -60,4 +62,23 @@ public void sleep(final long ms) {
 this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
 }
 
+@Override
+public void waitObject(final Object object, final Supplier<Boolean> condition, final long deadlineMs)
+        throws InterruptedException {
+    synchronized (object) {
+        while (true) {
+            if (condition.get()) {
+                return;
+            }
+
+            final long currentTimeMs = milliseconds();
+            if (currentTimeMs >= deadlineMs) {
+                throw new TimeoutException("Condition not satisfied before deadline");
+            }
+
+            object.wait(deadlineMs - currentTimeMs);
+        }
+    }
+}
+
 }
@@ -64,7 +64,7 @@ public void beforeEach() {
 }
 
 protected void assertExtractedOffset(final long expected, final Schema schema, final Struct record) {
-    TimestampIncrementingCriteria criteria = null;
+    final TimestampIncrementingCriteria criteria;
     if (schema.field(INCREMENTING_COLUMN.name()) != null) {
         if (schema.field(TS1_COLUMN.name()) != null) {
             criteria = criteriaIncTs;