[Feature][scaleph-plugin-seatunnel-connectors] upgrade seatunnel connectors to 2.3.4 version (#697)

* feature: upgrade seatunnel http connectors to 2.3.4

* feature: upgrade seatunnel iotdb connectors to 2.3.4

* feature: upgrade seatunnel jdbc connectors to 2.3.4

* feature: upgrade seatunnel jdbc connectors to 2.3.4

* feature: upgrade seatunnel jdbc connectors to 2.3.4

* feature: upgrade seatunnel jdbc connectors to 2.3.4

* feature: upgrade seatunnel jdbc connectors to 2.3.4

* feature: upgrade seatunnel kafka connectors to 2.3.4

* feature: upgrade seatunnel pulsar connectors to 2.3.4

* feature: upgrade seatunnel pulsar connectors to 2.3.4

* feature: upgrade seatunnel redis connectors to 2.3.4

* feature: upgrade seatunnel s3redshift connectors to 2.3.4

* feature: upgrade seatunnel starrocks connectors to 2.3.4

* feature: upgrade seatunnel kudu connectors to 2.3.4

* feature: upgrade seatunnel kudu connectors to 2.3.4

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
kalencaya and wangqi authored Mar 8, 2024
1 parent 62c394e commit 6e2fefb
Showing 58 changed files with 1,635 additions and 238 deletions.
8 changes: 7 additions & 1 deletion pom.xml
@@ -156,9 +156,10 @@
<okhttp.version>4.10.0</okhttp.version>
<guava.version>32.1.3-jre</guava.version>
<minio.version>8.3.8</minio.version>
<milky.version>1.0.7</milky.version>
<milky.version>1.0.12</milky.version>
<sakura.version>1.0.2-SNAPSHOT</sakura.version>
<hadoop.version>3.3.4</hadoop.version>
<hive.version>3.1.3</hive.version>
<akka.version>2.6.21</akka.version>
<protobuf.version>3.21.5</protobuf.version>
<netty.version>4.1.82.Final</netty.version>
@@ -554,6 +555,11 @@
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
@@ -133,6 +133,22 @@ public ValidationResult validate(final String subject, final String value) {
.valid(reason == null).build();
}
};

public static final Validator DOUBLE_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value) {
String reason = null;
try {
Double.parseDouble(value);
} catch (NumberFormatException e) {
reason = "not a valid Double";
}

return new ValidationResult.Builder().subject(subject).input(value).explanation(reason)
.valid(reason == null).build();
}
};
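
For context, a minimal sketch of how the new validator behaves; the subject string and inputs are illustrative only:

// Illustrative use of DOUBLE_VALIDATOR with a parsable and a non-parsable input.
ValidationResult ok = Validators.DOUBLE_VALIDATOR.validate("split.even-distribution.factor.lower-bound", "0.05");
ValidationResult bad = Validators.DOUBLE_VALIDATOR.validate("split.even-distribution.factor.lower-bound", "fast");
// ok reports valid; bad reports invalid with the explanation "not a valid Double".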

public static final Validator NUMBER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value) {
@@ -147,6 +163,7 @@ public ValidationResult validate(final String subject, final String value) {
.valid(reason == null).build();
}
};

public static final Validator NON_EMPTY_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value) {
@@ -52,7 +52,6 @@ public IoTDBSinkPlugin() {
props.add(KEY_MEASUREMENT_FIELDS);
props.add(STORAGE_GROUP);
props.add(BATCH_SIZE);
props.add(BATCH_INTERVAL_MS);
props.add(MAX_RETRIES);
props.add(RETRY_BACKOFF_MULTIPLIER_MS);
props.add(MAX_RETRY_BACKOFF_MS);
@@ -66,14 +66,6 @@ public enum IoTDBSinkProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> BATCH_INTERVAL_MS = new PropertyDescriptor.Builder<Integer>()
.name("batch_interval_ms")
.description("For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms, the data will be flushed into the database")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> MAX_RETRIES = new PropertyDescriptor.Builder<Integer>()
.name("max_retries")
.description("The number of retries to flush failed")
@@ -56,6 +56,7 @@ public IoTDBSourcePlugin() {
props.add(NUM_PARTITIONS);
props.add(LOWER_BOUND);
props.add(UPPER_BOUND);
props.add(THRIFT_MAX_FRAME_SIZE);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.RESULT_TABLE_NAME);
supportedProperties = Collections.unmodifiableList(props);
@@ -98,4 +98,12 @@ public enum IoTDBSourceProperties {
.addValidator(Validators.POSITIVE_LONG_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> THRIFT_MAX_FRAME_SIZE = new PropertyDescriptor.Builder()
.name("thrift_max_frame_size")
.description("thrift max frame size")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

}
@@ -65,6 +65,14 @@ public enum JdbcProperties {
.fallbackProperty(JdbcPoolProperties.PASSWORD)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> PROPERTIES = new PropertyDescriptor.Builder()
.name("properties")
.description("Additional connection configuration parameters")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();
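
A sketch of the kind of payload the new "properties" descriptor accepts, assuming the Jackson ObjectMapper used elsewhere in the project; the driver options shown are examples, not defaults:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical "properties" value: extra options forwarded to the JDBC connection.
// readTree throws JsonProcessingException, so call it where that is declared or handled.
JsonNode properties = new ObjectMapper().readTree(
"{\"useSSL\": \"false\", \"rewriteBatchedStatements\": \"true\"}");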

public static final PropertyDescriptor<Integer> CONNECTION_CHECK_TIMEOUT_SEC = new PropertyDescriptor.Builder<Integer>()
.name("connection_check_timeout_sec")
.description("The time in seconds to wait for the database operation used to validate the connection to complete.")
@@ -54,6 +54,7 @@ public JdbcSinkPlugin() {
props.add(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST);
props.add(GENERATE_SINK_SQL);
props.add(PRIMARY_KEYS);
props.add(ENABLE_UPSERT);
props.add(QUERY);
props.add(MAX_RETRIES);
props.add(BATCH_SIZE);
@@ -62,6 +63,10 @@ public JdbcSinkPlugin() {
props.add(MAX_COMMIT_ATTEMPTS);
props.add(TRANSACTION_TIMEOUT_SEC);
props.add(AUTO_COMMIT);
props.add(FIELD_IDE);
props.add(SCHEMA_SAVE_MODE);
props.add(DATA_SAVE_MODE);
props.add(CUSTOM_SQL);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.SOURCE_TABLE_NAME);
supportedProperties = Collections.unmodifiableList(props);
@@ -131,4 +131,39 @@ public enum JdbcSinkProperties {
.defaultValue(true)
.parser(Parsers.BOOLEAN_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<String> FIELD_IDE = new PropertyDescriptor.Builder<String>()
.name("field_ide")
.description("Whether field names should be converted to uppercase or lowercase when synchronized from the source to the sink")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<String> SCHEMA_SAVE_MODE = new PropertyDescriptor.Builder<String>()
.name("schema_save_mode")
.description("Before the synchronization task starts, select how to handle the existing table structure on the target side.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<String> DATA_SAVE_MODE = new PropertyDescriptor.Builder<String>()
.name("data_save_mode")
.description("Before the synchronization task starts, select how to handle data that already exists on the target side.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<String> CUSTOM_SQL = new PropertyDescriptor.Builder<String>()
.name("custom_sql")
.description("When data_save_mode is set to CUSTOM_PROCESSING, the custom_sql parameter must be provided.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<Boolean> ENABLE_UPSERT = new PropertyDescriptor.Builder<Boolean>()
.name("enable_upsert")
.description("Enable upsert by primary_keys exist.")
.type(PropertyType.BOOLEAN)
.parser(Parsers.BOOLEAN_PARSER)
.validateAndBuild();
}
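
Taken together, the new sink options control how the target table and its data are bootstrapped. A hedged sketch of how they might look once resolved into a sink conf node; the enum values follow the SeaTunnel save-mode docs and the SQL is a placeholder:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch only. CREATE_SCHEMA_WHEN_NOT_EXIST and CUSTOM_PROCESSING are documented
// SeaTunnel save-mode values, but verify them against the 2.3.4 connector in use.
ObjectNode sinkConf = new ObjectMapper().createObjectNode();
sinkConf.put("schema_save_mode", "CREATE_SCHEMA_WHEN_NOT_EXIST");
sinkConf.put("data_save_mode", "CUSTOM_PROCESSING");
sinkConf.put("custom_sql", "delete from target_table where dt = '2024-03-08'"); // placeholder SQL
sinkConf.put("enable_upsert", true);
sinkConf.put("field_ide", "LOWERCASE"); // assumed to accept UPPERCASE/LOWERCASE per the description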
@@ -50,10 +50,18 @@ public JdbcSourcePlugin() {
props.add(CONNECTION_CHECK_TIMEOUT_SEC);
props.add(COMPATIBLE_MODE);
props.add(QUERY);
props.add(TABLE_PATH);
props.add(TABLE_LIST);
props.add(WHERE_CONDITION);
props.add(PARTITION_COLUMN);
props.add(PARTITION_UPPER_BOUND);
props.add(PARTITION_LOWER_BOUND);
props.add(PARTITION_NUM);
props.add(SPLIT_SIZE);
props.add(SPLIT_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
props.add(SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
props.add(SPLIT_SAMPLE_SHARDING_THRESHOLD);
props.add(SPLIT_INVERSE_SAMPLING_RATE);
props.add(FETCH_SIZE);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.RESULT_TABLE_NAME);
@@ -19,6 +19,7 @@
package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.jdbc.source;

import cn.sliew.scaleph.plugin.framework.property.*;
import com.fasterxml.jackson.databind.JsonNode;

public enum JdbcSourceProperties {
;
@@ -32,6 +33,30 @@ public enum JdbcSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> TABLE_PATH = new PropertyDescriptor.Builder<String>()
.name("table_path")
.description("The path to the full path of table, you can use this configuration instead of query")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> TABLE_LIST = new PropertyDescriptor.Builder<JsonNode>()
.name("table_list")
.description("The list of tables to read; can be used instead of table_path")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();
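
Since table_list is typed as a JSON node, here is a hedged sketch of the payload shape; the field name is inferred from the table_path option above and should be verified against the SeaTunnel 2.3.4 docs:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical table_list payload: one entry per table, used instead of table_path.
// readTree throws JsonProcessingException, so call it where that is declared or handled.
JsonNode tableList = new ObjectMapper().readTree(
"[{\"table_path\": \"testdb.table_a\"}, {\"table_path\": \"testdb.table_b\"}]");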

public static final PropertyDescriptor<String> WHERE_CONDITION = new PropertyDescriptor.Builder<String>()
.name("where_condition")
.description("Common row filter conditions for all tables/queries")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> PARTITION_COLUMN = new PropertyDescriptor.Builder<String>()
.name("partition_column")
.description("The column name for parallelism's partition, only support numeric type.")
@@ -66,6 +91,46 @@ public enum JdbcSourceProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> SPLIT_SIZE = new PropertyDescriptor.Builder<Integer>()
.name("split.size")
.description("How many rows in one split")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Double> SPLIT_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = new PropertyDescriptor.Builder<Double>()
.name("split.even-distribution.factor.lower-bound")
.description("The lower bound of the chunk key distribution factor")
.type(PropertyType.DOUBLE)
.parser(Parsers.DOUBLE_PARSER)
.addValidator(Validators.DOUBLE_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Double> SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = new PropertyDescriptor.Builder<Double>()
.name("split.even-distribution.factor.upper-bound")
.description("The upper bound of the chunk key distribution factor")
.type(PropertyType.DOUBLE)
.parser(Parsers.DOUBLE_PARSER)
.addValidator(Validators.DOUBLE_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> SPLIT_SAMPLE_SHARDING_THRESHOLD = new PropertyDescriptor.Builder<Integer>()
.name("split.sample-sharding.threshold")
.description("This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> SPLIT_INVERSE_SAMPLING_RATE = new PropertyDescriptor.Builder<Integer>()
.name("split.inverse-sampling.rate")
.description("The inverse of the sampling rate used in the sample sharding strategy.")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();
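
How the split options interact is easiest to see with numbers. A back-of-envelope sketch, assuming the semantics implied by the descriptions above (splits sized by split.size, sampling once the estimated split count passes the threshold) rather than the connector source; the constants mirror commonly cited defaults and should be checked against the 2.3.4 docs:

// Illustrative arithmetic only.
long rowCount = 100_000_000L;      // estimated rows in the source table
int splitSize = 8096;              // split.size: rows per split
int shardingThreshold = 1000;      // split.sample-sharding.threshold
int inverseSamplingRate = 1000;    // split.inverse-sampling.rate

long estimatedSplits = rowCount / splitSize;                  // ~12,351 splits
boolean sampleSharding = estimatedSplits > shardingThreshold; // true: sample-based sharding
long sampledRows = rowCount / inverseSamplingRate;            // ~100,000 rows sampled for bounds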

public static final PropertyDescriptor<Integer> FETCH_SIZE = new PropertyDescriptor.Builder<Integer>()
.name("fetch_size")
.description("The number of records writen per batch")
@@ -61,7 +61,7 @@ public enum KafkaProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> KAFKA_CONF = new PropertyDescriptor.Builder()
public static final PropertyDescriptor<JsonNode> KAFKA_CONFIG = new PropertyDescriptor.Builder()
.name("kafka.config")
.description(
"The way to specify parameters is to add the prefix kafka. to the original parameter name. For example, the way to specify auto.offset.reset is: kafka.auto.offset.reset = latest")
@@ -74,7 +74,7 @@ public ObjectNode createConf() {
KafkaDataSource dataSource = (KafkaDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(BOOTSTRAP_SERVERS.getName(), dataSource.getBootstrapServers());
for (Map.Entry<String, Object> entry : properties.toMap().entrySet()) {
if (entry.getKey().startsWith(KAFKA_CONF.getName())) {
if (entry.getKey().startsWith(KAFKA_CONFIG.getName())) {
conf.putPOJO(entry.getKey(), entry.getValue());
}
}
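
The loop above forwards any kafka.-prefixed key into the generated conf unchanged. A standalone sketch of that pass-through; the "format" key is made up to show a non-matching entry:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Mirrors the prefix check above: kafka.auto.offset.reset survives, "format" does not.
ObjectNode conf = new ObjectMapper().createObjectNode();
Map<String, Object> props = Map.of(
"kafka.auto.offset.reset", "latest", // matches the kafka. prefix, copied through
"format", "json"); // no prefix, skipped by this loop
for (Map.Entry<String, Object> entry : props.entrySet()) {
if (entry.getKey().startsWith("kafka.")) {
conf.putPOJO(entry.getKey(), entry.getValue());
}
}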
@@ -62,7 +62,7 @@ public KafkaSourcePlugin() {
props.add(START_MODE_TIMESTAMP);
props.add(START_MODE_OFFSETS);
props.add(PARTITION_DISCOVERY_INTERVAL_MILLIS);
props.add(KAFKA_CONF);
props.add(KAFKA_CONFIG);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.RESULT_TABLE_NAME);
supportedProperties = Collections.unmodifiableList(props);
@@ -31,31 +31,60 @@ public enum KuduProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> KUDU_TABLE = new PropertyDescriptor.Builder()
.name("kudu_table")
.description("The name of kudu table")
public static final PropertyDescriptor<Boolean> ENABLE_KERBEROS = new PropertyDescriptor.Builder()
.name("enable_kerberos")
.description("Kerberos principal enable.")
.type(PropertyType.BOOLEAN)
.parser(Parsers.BOOLEAN_PARSER)
.addValidator(Validators.BOOLEAN_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos_principal")
.description("Kerberos principal.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> COLUMNS_LIST = new PropertyDescriptor.Builder()
.name("columnsList")
.description("Specifies the column names of the table")
public static final PropertyDescriptor<String> KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
.name("kerberos_keytab")
.description("Kerberos keytab.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> SAVE_MODE = new PropertyDescriptor.Builder()
.name("save_mode")
.description("Storage mode, we need support overwrite and append")
public static final PropertyDescriptor<String> KERBEROS_KRB5CONF = new PropertyDescriptor.Builder()
.name("kerberos_krb5conf")
.description("Kerberos krb5 conf.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> CLIENT_WORKER_COUNT = new PropertyDescriptor.Builder()
.name("client_worker_count")
.description("Kudu worker count. Default value is twice the current number of cpu cores.")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Long> CLIENT_DEFAULT_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("client_default_operation_timeout_ms")
.description("Kudu normal operation timeout.")
.type(PropertyType.LONG)
.parser(Parsers.LONG_PARSER)
.addValidator(Validators.POSITIVE_LONG_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Long> CLIENT_DEFAULT_ADMIN_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("client_default_admin_operation_timeout_ms")
.description("Kudu admin operation timeout.")
.type(PropertyType.LONG)
.parser(Parsers.LONG_PARSER)
.addValidator(Validators.POSITIVE_LONG_VALIDATOR)
.validateAndBuild();

}
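
A hedged sketch of the new Kerberos knobs as they might be filled in; the principal and paths are placeholders, not project defaults:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Placeholder values only; point these at your own realm, keytab and krb5.conf.
ObjectNode kuduConf = new ObjectMapper().createObjectNode();
kuduConf.put("enable_kerberos", true);
kuduConf.put("kerberos_principal", "scaleph@EXAMPLE.COM");
kuduConf.put("kerberos_keytab", "/etc/security/keytabs/scaleph.keytab");
kuduConf.put("kerberos_krb5conf", "/etc/krb5.conf");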
