Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update Checkstyle version and fix errors #304

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ tasks.wrapper {
}

checkstyle {
toolVersion = "8.35"
toolVersion = "10.16.0"
configDirectory.set(rootProject.file("checkstyle/"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void start(final Map<String, String> properties) {
log.info("Starting JDBC source task");
try {
config = new JdbcSourceTaskConfig(properties);
config.validate();
} catch (final ConfigException e) {
throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
}
Expand All @@ -101,10 +102,7 @@ public void start(final Map<String, String> properties) {

final List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
final String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
if ((tables.isEmpty() && query.isEmpty()) || (!tables.isEmpty() && !query.isEmpty())) {
throw new ConnectException("Invalid configuration: each JdbcSourceTask must have at "
+ "least one table assigned to it or one query specified");
}

final TableQuerier.QueryMode queryMode = !query.isEmpty()
? TableQuerier.QueryMode.QUERY
: TableQuerier.QueryMode.TABLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package io.aiven.connect.jdbc.source;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

/**
* Configuration options for a single JdbcSourceTask. These are processed after all
Expand All @@ -30,12 +32,25 @@
public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig {

public static final String TABLES_CONFIG = "tables";
private static final String TABLES_DOC = "List of tables for this task to watch for changes.";
private static final String TABLES_DOC = "List of tables encoded as a comma separated string"
+ " for this task to watch for changes.";
public static final String TABLES_DEFAULT = "";


static ConfigDef config = baseConfigDef()
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC);
.define(TABLES_CONFIG, Type.LIST, TABLES_DEFAULT, Importance.HIGH, TABLES_DOC);

public JdbcSourceTaskConfig(final Map<String, String> props) {
super(config, props);
}

public void validate() throws ConfigException {
final List<String> tables = this.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
final String query = this.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
if (tables.isEmpty() && query.isEmpty() || !tables.isEmpty() && !query.isEmpty()) {
throw new org.apache.kafka.connect.errors.ConnectException(
"Invalid configuration: each JdbcSourceTask must have at "
+ "least one table assigned to it or one query specified");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ public TimestampIncrementingOffset extractValues(
extractedTimestamp = extractOffsetTimestamp(schema, record);
assert previousOffset == null
|| previousOffset.getTimestampOffset() == null
|| (previousOffset.getTimestampOffset() != null
|| previousOffset.getTimestampOffset() != null
&& previousOffset.getTimestampOffset().compareTo(
extractedTimestamp) <= 0
);
extractedTimestamp) <= 0;
}
Long extractedId = null;
if (hasIncrementedColumn()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.connect.jdbc.config.JdbcConfig;

import org.junit.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;


public final class JdbcSourceTaskConfigTest {

@Test(expected = ConfigException.class)
public void testValidateEmptyConfig() {
new JdbcSourceTaskConfig(Collections.emptyMap());
}

@Test
public void testValidateTablesAndQueryMandatoryConfigPresent() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
assertThrowsExactly(ConnectException.class, config::validate,
"Invalid configuration: each JdbcSourceTask must"
+ " have at least one table assigned to it or one query specified");
}


@Test
public void testValidateQueryAndTablesGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.TABLES_CONFIG, "test-table-1, test-table-2");
properties.put(JdbcSourceTaskConfig.QUERY_CONFIG, "test-query");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
assertThrowsExactly(ConnectException.class, config::validate,
"Invalid configuration: each JdbcSourceTask must"
+ " have at least one table assigned to it or one query specified");
}

@Test
public void testValidateQueryGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.QUERY_CONFIG, "test-query");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
config.validate();
}

@Test
public void testValidateTablesGiven() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSourceTaskConfig.TABLES_CONFIG, "test-table-1, test-table-2");
properties.put(JdbcConfig.CONNECTION_URL_CONFIG, "connection-url");
properties.put(JdbcSourceConnectorConfig.MODE_CONFIG, "bulk");
properties.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-prefix");
final JdbcSourceTaskConfig config = new JdbcSourceTaskConfig(properties);
config.validate();
assertEquals(
config.getList(JdbcSourceTaskConfig.TABLES_CONFIG),
List.of("test-table-1", "test-table-2")
);
}

}
Loading