From 3cc492100fdd4f193914e958d1669207e990390a Mon Sep 17 00:00:00 2001
From: liangchg
Date: Thu, 27 Feb 2025 15:33:09 +0800
Subject: [PATCH 1/2] Add Paimon format support

---
 .../services/org.apache.hadoop.fs.FileSystem      |   0
 java/PaimonCDCSink/README.md                      | 165 +++++++++
 java/PaimonCDCSink/pom.xml                        | 340 ++++++++++++++++++
 .../services/msf/PaimonCDCSinkJob.java            | 107 ++++++
 .../flink-application-properties-dev.json         |  39 ++
 .../src/main/resources/hive-site.xml              | 273 ++++++++++++++
 .../src/main/resources/log4j2.properties          |   7 +
 7 files changed, 931 insertions(+)
 create mode 100644 java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
 create mode 100644 java/PaimonCDCSink/README.md
 create mode 100644 java/PaimonCDCSink/pom.xml
 create mode 100644 java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java
 create mode 100644 java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json
 create mode 100644 java/PaimonCDCSink/src/main/resources/hive-site.xml
 create mode 100644 java/PaimonCDCSink/src/main/resources/log4j2.properties

diff --git a/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..e69de29
diff --git a/java/PaimonCDCSink/README.md b/java/PaimonCDCSink/README.md
new file mode 100644
index 0000000..49418b9
--- /dev/null
+++ b/java/PaimonCDCSink/README.md
@@ -0,0 +1,165 @@
## Flink Apache Paimon Sink using DataStream API

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Apache Paimon: 1.0.1
* Flink connectors: Flink CDC (MySQL / PostgreSQL / MongoDB / Kafka)

This example demonstrates how to use the Apache Paimon CDC ingestion components (MySQL / PostgreSQL / MongoDB / Kafka) to sink
data to Amazon S3 in the Apache Paimon table format. The Apache Paimon Hive catalog can work with the Glue Data Catalog.

The project can run both on Amazon Managed Service for Apache Flink and locally for development.

### Prerequisites
* A database source (MySQL, PostgreSQL, or MongoDB) with binlog enabled, or a Kafka / Amazon MSK source streaming data in a CDC format
  supported by Apache Paimon (Canal CDC, Debezium CDC, Maxwell CDC, OGG CDC, JSON, aws-dms-json).
* If you want to use the Apache Paimon Hive catalog with the Glue Data Catalog, install the aws-glue-datacatalog-hive3-client
  jar into your local Maven repository (refer to this [github repo](https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore) for build instructions, or
  take the jar from an EMR cluster and install it locally), then copy your EMR cluster's `hive-site.xml` file into the project and repackage the project.
* An S3 bucket to write the Paimon table to.


#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read from and write to the S3 bucket.


### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
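As a quick illustration of how runtime properties reach Paimon (a sketch only; the topic name and broker address are placeholders), a property group such as:

```json
{
  "PropertyGroupId": "KafkaConf",
  "PropertyMap": {
    "kafka_conf@_topic": "orders_cdc",
    "kafka_conf@_properties.bootstrap.servers": "broker-1:9092"
  }
}
```

is converted, together with `"action": "kafka_sync_database"` in the `ActionConf` group, into arguments equivalent to `kafka_sync_database --kafka_conf topic=orders_cdc --kafka_conf properties.bootstrap.servers=broker-1:9092` before being handed to the Paimon CDC ingestion action.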
This example parses runtime parameters according to the following rules and passes the parsed parameters to the Apache Paimon actions.

- The Paimon CDC ingestion action name is read from the key named `action` in the `ActionConf` parameter group.
- Global or common parameters can also be placed in the `ActionConf` parameter group. The parameter names should follow the documentation of the specific ingestion [action](https://paimon.apache.org/docs/1.0/cdc-ingestion/overview/).
- For parameters such as `table_conf` and `catalog_conf` that are set as `Key=Value` pairs, the name of the parameter group can be customized, for example `TableConf` or `CatalogConf`.
  Parameter names within the group must follow the format `<parameter group name>@_<parameter key>`,
  such as `table_conf@_bucket`, and the parameter value is the corresponding value.


Runtime parameters (sample):

| Group ID | Key | Description |
|---------------|--------------------------------------------|----------------------------------------------------------------------------------------|
| `ActionConf` | `action` | Name of the Apache Paimon CDC ingestion action, e.g. `kafka_sync_database`, `mysql_sync_database`. |
| `ActionConf` | `database` | Target Paimon database name. |
| `ActionConf` | `primary_keys` | (Optional) The primary keys for the Paimon table. |
| `KafkaConf` | `kafka_conf@_properties.bootstrap.servers` | Bootstrap servers of the Kafka cluster. |
| `KafkaConf` | `kafka_conf@_properties.auto.offset.reset` | Initial offset of the Kafka consumer. |
| `KafkaConf` | `kafka_conf@_properties.group.id` | Consumer group ID. |
| `CatalogConf` | `catalog_conf@_metastore.client.class` | Paimon Hive catalog metastore client class name. |
| `CatalogConf` | `...` | ... |
| `TableConf` | `table_conf@_bucket` | Number of buckets for the Paimon table. |
| `TableConf` | `...` | ... |

All parameters are case-sensitive.

### Samples
**Create an MSF application**

First, compile and package the application using Maven, then copy the packaged jar file to your S3 bucket.

```shell
mvn clean package -P KafkaCDC
```

Second, prepare an input JSON file to create the MSF application; you can add any required information (such as VPC, subnets, and security groups) to this file.

**Notice:** Your service execution role must have the appropriate permissions, such as access to the S3 bucket, and Glue access if you want to use the Glue Data Catalog as the Paimon Hive catalog. An illustrative policy is sketched below, followed by the sample `create-application` input file.
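The following is a minimal, illustrative IAM policy for the service execution role. It is a sketch only: the bucket name is a placeholder, the Glue resource is left open, and your environment may need additional permissions (for example VPC, CloudWatch Logs, or MSK access):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PaimonWarehouseS3Access",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::your-paimon-bucket", "arn:aws:s3:::your-paimon-bucket/*"]
    },
    {
      "Sid": "GlueCatalogAccess",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase", "glue:GetDatabases", "glue:CreateDatabase", "glue:UpdateDatabase",
        "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable"
      ],
      "Resource": "*"
    }
  ]
}
```

The sample `create-application` input file (referenced later as `create-kafkacdc-paimon.json`) looks like this: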
+```json +{ + "ApplicationName": "kafka-cdc-paimon", + "ApplicationDescription": "Sink CDC from Kafka as Apache Paimon table", + "RuntimeEnvironment": "FLINK-1_20", + "ServiceExecutionRole": "Your service role arn", + "ApplicationConfiguration": { + "ApplicationCodeConfiguration": { + "CodeContent": { + "S3ContentLocation": { + "BucketARN": "Your bucket arn", + "FileKey": "Your jar file s3 key" + } + }, + "CodeContentType": "ZIPFILE" + }, + "EnvironmentProperties": { + "PropertyGroups": [ + { + "PropertyGroupId": "ActionConf", + "PropertyMap": { + "action": "kafka_sync_database", + "database": "Your Paimon Database", + "warehouse": "Your paimon warehouse path" + } + }, + { + "PropertyGroupId": "KafkaConf", + "PropertyMap": { + "kafka_conf@_properties.bootstrap.servers": "MSK bootstrap servers", + "kafka_conf@_properties.auto.offset.reset": "earliest", + "kafka_conf@_properties.group.id": "group id", + "kafka_conf@_topic": "Your cdc topic", + "kafka_conf@_value.format": "debezium-json" + } + }, + { + "PropertyGroupId": "CatalogConf", + "PropertyMap": { + "catalog_conf@_hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "catalog_conf@_hadoop.fs.s3.buffer.dir": "/var/tmp" + } + }, + { + "PropertyGroupId": "TableConf", + "PropertyMap": { + "table_conf@_bucket": "4", + "table_conf@_metadata.iceberg.storage": "hive-catalog", + "table_conf@_metadata.iceberg.manifest-legacy-version": "true", + "table_conf@_metadata.iceberg.hive-client-class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient", + "table_conf@_fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "table_conf@_fs.s3.buffer.dir": "/var/tmp", + "table_conf@_sink.parallelism": "4" + } + } + ] + } + }, + "FlinkApplicationConfiguration": { + "ParallelismConfiguration": { + "AutoScalingEnabled": true, + "Parallelism": 4, + "ParallelismPerKPU": 1 + } + }, + "CloudWatchLoggingOptions": [ + { + "LogStreamARN": "arn:aws:logs:us-west-2:YourAccountId:log-group:/aws/kinesis-analytics/kafka-cdc-paimon:log-stream:kinesis-analytics-log-stream" + } + ] +} +``` + +Last, create an MSF application using AWS CLI. + +```shell +aws kinesisanalyticsv2 create-application \ +--cli-input-json file://create-kafkacdc-paimon.json +``` + +### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](../running-examples-locally.md) for details. + +### Generating data + +You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator), +also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html), +to generate random data to Kinesis Data Stream and test the application. 
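Note that this example consumes CDC records from a Kafka / Amazon MSK topic rather than from a Kinesis stream, so whichever tool you use must ultimately publish messages in the configured CDC format (for example `debezium-json`) to that topic. For reference, a minimal Debezium-style JSON change record is sketched below; the table, columns, and values are hypothetical:

```json
{
  "before": null,
  "after": {
    "ID": 1001,
    "customer_name": "Jane Doe",
    "order_total": 42.5
  },
  "source": {
    "db": "orders_db",
    "table": "orders"
  },
  "op": "c",
  "ts_ms": 1719830400000
}
```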
\ No newline at end of file diff --git a/java/PaimonCDCSink/pom.xml b/java/PaimonCDCSink/pom.xml new file mode 100644 index 0000000..e4a4fdd --- /dev/null +++ b/java/PaimonCDCSink/pom.xml @@ -0,0 +1,340 @@ + + + 4.0.0 + + com.amazonaws + amazon-msf-examples + 1.0 + + + paimon-cdc-sink + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 5.0.0-1.20 + 1.2.0 + 2.23.1 + 2.16.2 + 1.0.1 + 3.4.0-1.20 + 3.3.0 + 8.4.0 + 3.4.0 + 2.30.16 + + + + + + com.amazonaws + aws-java-sdk-bom + + 1.12.676 + pom + import + + + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + org.apache.flink + flink-s3-fs-hadoop + ${flink.version} + + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + provided + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + provided + + + + org.apache.paimon + paimon-flink-action + ${paimon.version} + + + + org.apache.paimon + paimon-flink-cdc + ${paimon.version} + + + + org.apache.paimon + paimon-flink-1.20 + ${paimon.version} + + + org.apache.paimon + paimon-hive-connector-3.1 + ${paimon.version} + + + + org.apache.thrift + libthrift + 0.21.0 + + + org.apache.thrift + libfb303 + 0.9.3 + + + org.apache.hive + hive-exec + 3.1.3 + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + + org.slf4j + slf4j-reload4j + + + + + + com.amazonaws.glue + aws-glue-datacatalog-hive3-client + 4.2.0 + + + + software.amazon.awssdk + glue + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + aws-core + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + sts + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + utils + ${aws.sdkv2.version} + + + + + + + KafkaCDC + + + org.apache.flink + flink-sql-connector-kafka + ${flink.kafka.sql.version} + + + + kafka + + + + + MySQLCDC + + + org.apache.flink + flink-connector-mysql-cdc + ${flink.cdc.version} + + + com.mysql + mysql-connector-j + ${mysql.driver.version} + + + + mysql + + + + PostgresCDC + + + org.apache.flink + flink-connector-postgres-cdc + ${flink.cdc.version} + + + + postgre + + + + MongoDBCDC + + + org.apache.flink + flink-connector-mongodb-cdc + ${flink.cdc.version} + + + + mongo + + + + + + + ${buildDirectory} + ${cdc.source}-${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/versions/17/**/*.class + META-INF/versions/19/**/*.class + META-INF/versions/15/**/*.class + + + + + + + org.apache.kafka.connect + org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect + + + org.apache.kafka + org.apache.flink.kafka.shaded.org.apache.kafka + + + + + + + com.amazonaws.services.msf.PaimonCDCSinkJob + + + + + + + + + + \ No newline at end of file diff --git 
a/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java
new file mode 100644
index 0000000..c6ff043
--- /dev/null
+++ b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java
@@ -0,0 +1,107 @@
package com.amazonaws.services.msf;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;

import java.io.IOException;
import java.util.*;

public class PaimonCDCSinkJob {

    private static final Logger LOGGER = LogManager.getLogger(PaimonCDCSinkJob.class);
    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
    private static final String SEP_KEY = "@_";
    private static final String ACTION_CONF_GROUP = "ActionConf";
    private static final String ACTION_KEY = "action";
    private static final String PARAM_KEY_PREFIX = "--";

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, Properties> confMap = loadApplicationProperties(env);
        String[] actionArgs = configToActionParameters(confMap);
        if (actionArgs.length < 1) {
            LOGGER.error("No action specified");
            System.exit(1);
        }

        LOGGER.info("actionArgs: {}", Arrays.toString(actionArgs));

        Optional<Action> actionOpt = ActionFactory.createAction(actionArgs);

        if (actionOpt.isPresent()) {
            Action action = actionOpt.get();
            if (action instanceof ActionBase) {
                LOGGER.info("ActionBase: {}", action.getClass().getName());
                ((ActionBase) action).withStreamExecutionEnvironment(env).run();
            } else {
                action.run();
            }
        } else {
            LOGGER.error("No Paimon Flink action service found");
            System.exit(1);
        }
    }

    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (env instanceof LocalStreamEnvironment) {
            LOGGER.debug("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    PaimonCDCSinkJob.class.getClassLoader()
                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
        } else {
            LOGGER.debug("Loading application properties from Amazon Managed Service for Apache Flink");
            return KinesisAnalyticsRuntime.getApplicationProperties();
        }
    }

    // Converts the MSF property groups into the CLI-style argument list expected by the Paimon actions.
    private static String[] configToActionParameters(Map<String, Properties> confMap) {

        Properties actionProp = confMap.get(ACTION_CONF_GROUP);
        if (actionProp == null) {
            LOGGER.error("ActionConf not found in application properties");
            System.exit(1);
        }

        String action = actionProp.getProperty(ACTION_KEY);
        if (action == null || action.isEmpty()) {
            LOGGER.error("Action not found in application properties");
            System.exit(1);
        }

        actionProp.remove(ACTION_KEY);

        List<String> params = new ArrayList<>();
        params.add(action);

        // Every "group@_key" property becomes "--group key=value"; keys without the separator become "--key value".
        for (Map.Entry<String, Properties> confEntry : confMap.entrySet()) {
            confEntry.getValue().forEach(
                    (k, v) -> {
                        String ks = k.toString();
                        int idx = ks.indexOf(SEP_KEY);
                        String paramKey;
                        String paramVal;
                        if (idx != -1) {
                            paramKey = String.format("%s%s", PARAM_KEY_PREFIX,
ks.substring(0, idx)); + paramVal = String.format("%s=%s", ks.substring(idx + SEP_KEY.length()), v); + + } else { + paramKey = String.format("%s%s", PARAM_KEY_PREFIX , ks); + paramVal = v.toString(); + } + params.add(paramKey); + params.add(paramVal); + } + ); + } + + return params.toArray(new String[0]); + } + +} diff --git a/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..cb0fa4d --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,39 @@ +[ + { + "PropertyGroupId": "ActionConf", + "PropertyMap": { + "action": "kafka_sync_database", + "warehouse": "s3://bucket/data/prefix", + "database": "paimon_flink", + "primary_keys": "ID", + "table_prefix": "ods_" + } + }, + { + "PropertyGroupId": "KafkaConf", + "PropertyMap": { + "kafka_conf@_properties.bootstrap.servers": "b-2.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092,b-1.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092", + "kafka_conf@_topic": "kafka_topic", + "kafka_conf@_properties.group.id": 1234546, + "kafka_conf@_properties.auto.offset.reset": "earliest" + } + }, + { + "PropertyGroupId": "CatalogConf", + "PropertyMap": { + "catalog_conf@_metastore": "hive", + "catalog_conf@_hive-conf-dir": "/etc/hive/conf.dist", + "catalog_conf@_lock.enabled": "false", + "catalog_conf@_metastore.client.class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient", + "catalog_conf@_warehouse": "s3://bucket/data/prefix" + } + }, + { + "PropertyGroupId": "TableConf", + "PropertyMap": { + "table_conf@_bucket": "4", + "table_conf@_changelog-producer": "input", + "table_conf@_sink.parallelism": "4" + } + } +] \ No newline at end of file diff --git a/java/PaimonCDCSink/src/main/resources/hive-site.xml b/java/PaimonCDCSink/src/main/resources/hive-site.xml new file mode 100644 index 0000000..95bb612 --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/hive-site.xml @@ -0,0 +1,273 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + hbase.master + + http://wiki.apache.org/hadoop/Hive/HBaseIntegration + + + + hive.zookeeper.quorum + ip-xx-xx-xx-xx.us-west-2.compute.internal:2181 + + + + hive.llap.zk.sm.connectionString + ip-xx-xx-xx-xx.us-west-2.compute.internal:2181 + + + + hbase.zookeeper.quorum + ip-xx-xx-xx-xx.us-west-2.compute.internal + http://wiki.apache.org/hadoop/Hive/HBaseIntegration + + + + hive.execution.engine + tez + + + + fs.defaultFS + hdfs://ip-xx-xx-xx-xx.us-west-2.compute.internal:8020 + + + + + hive.metastore.uris + thrift://ip-xx-xx-xx-xx.us-west-2.compute.internal:9083 + JDBC connect string for a JDBC metastore + + + + javax.jdo.option.ConnectionURL + jdbc:mysql://ip-xx-xx-xx-xx.us-west-2.compute.internal:3306/hive?createDatabaseIfNotExist=true + username to use against metastore database + + + + javax.jdo.option.ConnectionDriverName + org.mariadb.jdbc.Driver + username to use against metastore database + + + + javax.jdo.option.ConnectionUserName + hive + username to use against metastore database + + + + javax.jdo.option.ConnectionPassword + kWs5sQ8HnZaEC2kj + password to use against metastore database + + + + hive.server2.allow.user.substitution + true + + + + hive.server2.enable.doAs + true + + + + hive.server2.thrift.port + 10000 + + + + hive.server2.thrift.http.port + 10001 + + + + + + hive.optimize.ppd.input.formats + com.amazonaws.emr.s3select.hive.S3SelectableTextInputFormat + + + + s3select.filter + 
false + + + + hive.server2.in.place.progress + false + + + + hive.llap.zk.registry.user + hadoop + + + + hive.security.metastore.authorization.manager + org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider + + + + hive.log.explain.output + false + + + + datanucleus.fixedDatastore + true + + + + mapred.reduce.tasks + -1 + + + + mapred.max.split.size + 256000000 + + + + hive.mapjoin.hybridgrace.hashtable + false + + + + hive.merge.nway.joins + false + + + + hive.metastore.connect.retries + 15 + + + + hive.optimize.joinreducededuplication + false + + + + hive.optimize.sort.dynamic.partition.threshold + 1 + + + + hive.server2.materializedviews.registry.impl + DUMMY + + + + hive.tez.auto.reducer.parallelism + true + + + + hive.vectorized.execution.mapjoin.minmax.enabled + true + + + + hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled + true + + + + hive.optimize.dynamic.partition.hashjoin + true + + + + hive.compactor.initiator.on + true + + + + hive.blobstore.use.output-committer + true + + + + hive.llap.daemon.service.hosts + @llap0 + + + + hive.llap.execution.mode + only + + + + hive.optimize.metadataonly + true + + + + hive.tez.bucket.pruning + true + + + + hive.exec.mode.local.auto + true + + + + hive.exec.mode.local.auto.inputbytes.max + 50000000 + + + + hive.query.reexecution.stats.persist.scope + hiveserver + + + + hive.metastore.client.factory.class + com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + + + + hive.auto.convert.join.noconditionaltask.size + 1073741824 + + + + hive.compactor.worker.threads + 1 + + + + + + + diff --git a/java/PaimonCDCSink/src/main/resources/log4j2.properties b/java/PaimonCDCSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3546643 --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n From 5231067e792ef6e5ed071388231b34f156b99a3a Mon Sep 17 00:00:00 2001 From: liangchg Date: Thu, 27 Feb 2025 15:33:09 +0800 Subject: [PATCH 2/2] Add Paimon format support --- .../services/org.apache.hadoop.fs.FileSystem | 0 java/PaimonCDCSink/README.md | 165 +++++++++ java/PaimonCDCSink/pom.xml | 340 ++++++++++++++++++ .../services/msf/PaimonCDCSinkJob.java | 107 ++++++ .../flink-application-properties-dev.json | 39 ++ .../src/main/resources/hive-site.xml | 273 ++++++++++++++ .../src/main/resources/log4j2.properties | 7 + java/pom.xml | 1 + 8 files changed, 932 insertions(+) create mode 100644 java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem create mode 100644 java/PaimonCDCSink/README.md create mode 100644 java/PaimonCDCSink/pom.xml create mode 100644 java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java create mode 100644 java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json create mode 100644 java/PaimonCDCSink/src/main/resources/hive-site.xml create mode 100644 java/PaimonCDCSink/src/main/resources/log4j2.properties diff --git a/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 0000000..e69de29 diff --git 
a/java/PaimonCDCSink/README.md b/java/PaimonCDCSink/README.md new file mode 100644 index 0000000..49418b9 --- /dev/null +++ b/java/PaimonCDCSink/README.md @@ -0,0 +1,165 @@ +## Flink Apache Paimon Sink using DataStream API + +* Flink version: 1.20 +* Flink API: DataStream API +* Language: Java (11) +* Apache Paimon: 1.0.1 +* Flink connectors: Flink CDC-MySQL / PostgreSQL / MongoDB / Kafka + +This example demonstrates how to use Apache Paimon CDC ingestion components(MySQL / PostgreSQL / MongoDB / Kafka) to sink +data to Amazon S3 with Apache Paimon table format. The Apache Paimon Hive Catalog can work with Glue Data Catalog. + +The project can run both on Amazon Managed Service for Apache Flink, and locally for development. + +### Prerequisites +* A database source(MySQL, PostgreSQL, MongoDB) with binlog enabled or Kakfa / Amazon MSK source with Apache Paimon + supported CDC format(Canal CDC, Debezium CDC, Maxwell CDC, OGG CDC, JSON, aws-dms-json ) data streamed in it. +* If you want to use Apache Paimon Hive catalog with Glue Data Catalog, please install aws-glue-datacatalog-hive3-client + jar file into your local maven repo(please refer this [github repo](https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore) to install or + you can find this jar file in EMR Cluster and install it into your local maven repo) and copy your EMR cluster's `hive-site.xml` file into the project and repackage the project. +* An S3 bucket to write the Paimon table. + + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. + +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder. + +This example parses runtime parameters according to the following rules and passes the parsed parameters to Apache Paimon Actions. + +- The Paimon CDC ingestion action name is parsed from the key named action in the 'ActionConf' parameter group. +- Some global or common parameters can be placed in the 'ActionConf' parameter group. The parameter names should refer to the specific ingestion [action name](https://paimon.apache.org/docs/1.0/cdc-ingestion/overview/). +- For parameters like 'table_conf' and 'catalog_conf' that are set in the format of Key=Value, the name of the parameter group can be customized, such as “TableConf” or “CatalogConf”. +For specific parameter names within the parameter group, they should follow the format “parameter group name@_parameter Key”, +such as “table_conf@_bucket”, and the parameter value should be the corresponding Value. + + +Runtime parameters(Sample): + +| Group ID | Key | Description | +|---------------|--------------------------------------------|----------------------------------------------------------------------------------------| +| `ActionConf` | `action` | Name of Apache Paimon CDC ingestion, `kafka_sync_database`, `mysql_sync_database` etc. | +| `ActionConf` | `database` | Target Paimon database name. 
| +| `ActionConf` | `primary_keys` | (Optional) The primary keys for Paimon table | +| `KafkaConf` | `kafka_conf@_properties.bootstrap.servers` | Bootstrap servers of the Kafka Cluster. | +| `KafkaConf` | `kafka_conf@_properties.auto.offset.reset` | Offset of the Kafka Consumer | +| `KafkaConf` | `kafka_conf@_properties.group.id` | Consumer group Id | +| `CatalogConf` | `catalog_conf@_metastore.client.class` | Paimon Hive Catalog metastore client class name | +| `CatalogConf` | `...` | ... | +| `TableConf` | `table_conf@_bucket` | Bucket of Paimon table | +| `TableConf` | `...` | ... | + +All parameters are case-sensitive. + +### Samples +**Create an MSF application** + +First, compile and package the application using Maven, then copy the packaged jar file to your s3. + +```shell +mvn clean package -P KafkaCDC +``` + +Second, prepare an input json file to create a MSF application, you can add required information(like VPC, Subnets,Security.etc.) into this json file. + +**Notice:** Your service execution role should have appropriate permissions, like s3 bucket access and glue access if you want to use Glue Data Catalog as Paimon Hive Catalog. +```json +{ + "ApplicationName": "kafka-cdc-paimon", + "ApplicationDescription": "Sink CDC from Kafka as Apache Paimon table", + "RuntimeEnvironment": "FLINK-1_20", + "ServiceExecutionRole": "Your service role arn", + "ApplicationConfiguration": { + "ApplicationCodeConfiguration": { + "CodeContent": { + "S3ContentLocation": { + "BucketARN": "Your bucket arn", + "FileKey": "Your jar file s3 key" + } + }, + "CodeContentType": "ZIPFILE" + }, + "EnvironmentProperties": { + "PropertyGroups": [ + { + "PropertyGroupId": "ActionConf", + "PropertyMap": { + "action": "kafka_sync_database", + "database": "Your Paimon Database", + "warehouse": "Your paimon warehouse path" + } + }, + { + "PropertyGroupId": "KafkaConf", + "PropertyMap": { + "kafka_conf@_properties.bootstrap.servers": "MSK bootstrap servers", + "kafka_conf@_properties.auto.offset.reset": "earliest", + "kafka_conf@_properties.group.id": "group id", + "kafka_conf@_topic": "Your cdc topic", + "kafka_conf@_value.format": "debezium-json" + } + }, + { + "PropertyGroupId": "CatalogConf", + "PropertyMap": { + "catalog_conf@_hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "catalog_conf@_hadoop.fs.s3.buffer.dir": "/var/tmp" + } + }, + { + "PropertyGroupId": "TableConf", + "PropertyMap": { + "table_conf@_bucket": "4", + "table_conf@_metadata.iceberg.storage": "hive-catalog", + "table_conf@_metadata.iceberg.manifest-legacy-version": "true", + "table_conf@_metadata.iceberg.hive-client-class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient", + "table_conf@_fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "table_conf@_fs.s3.buffer.dir": "/var/tmp", + "table_conf@_sink.parallelism": "4" + } + } + ] + } + }, + "FlinkApplicationConfiguration": { + "ParallelismConfiguration": { + "AutoScalingEnabled": true, + "Parallelism": 4, + "ParallelismPerKPU": 1 + } + }, + "CloudWatchLoggingOptions": [ + { + "LogStreamARN": "arn:aws:logs:us-west-2:YourAccountId:log-group:/aws/kinesis-analytics/kafka-cdc-paimon:log-stream:kinesis-analytics-log-stream" + } + ] +} +``` + +Last, create an MSF application using AWS CLI. + +```shell +aws kinesisanalyticsv2 create-application \ +--cli-input-json file://create-kafkacdc-paimon.json +``` + +### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. 
+ +See [Running examples locally](../running-examples-locally.md) for details. + +### Generating data + +You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator), +also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html), +to generate random data to Kinesis Data Stream and test the application. \ No newline at end of file diff --git a/java/PaimonCDCSink/pom.xml b/java/PaimonCDCSink/pom.xml new file mode 100644 index 0000000..e4a4fdd --- /dev/null +++ b/java/PaimonCDCSink/pom.xml @@ -0,0 +1,340 @@ + + + 4.0.0 + + com.amazonaws + amazon-msf-examples + 1.0 + + + paimon-cdc-sink + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 5.0.0-1.20 + 1.2.0 + 2.23.1 + 2.16.2 + 1.0.1 + 3.4.0-1.20 + 3.3.0 + 8.4.0 + 3.4.0 + 2.30.16 + + + + + + com.amazonaws + aws-java-sdk-bom + + 1.12.676 + pom + import + + + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + org.apache.flink + flink-s3-fs-hadoop + ${flink.version} + + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + provided + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + provided + + + + org.apache.paimon + paimon-flink-action + ${paimon.version} + + + + org.apache.paimon + paimon-flink-cdc + ${paimon.version} + + + + org.apache.paimon + paimon-flink-1.20 + ${paimon.version} + + + org.apache.paimon + paimon-hive-connector-3.1 + ${paimon.version} + + + + org.apache.thrift + libthrift + 0.21.0 + + + org.apache.thrift + libfb303 + 0.9.3 + + + org.apache.hive + hive-exec + 3.1.3 + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + + org.slf4j + slf4j-reload4j + + + + + + com.amazonaws.glue + aws-glue-datacatalog-hive3-client + 4.2.0 + + + + software.amazon.awssdk + glue + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + aws-core + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + sts + ${aws.sdkv2.version} + compile + + + software.amazon.awssdk + utils + ${aws.sdkv2.version} + + + + + + + KafkaCDC + + + org.apache.flink + flink-sql-connector-kafka + ${flink.kafka.sql.version} + + + + kafka + + + + + MySQLCDC + + + org.apache.flink + flink-connector-mysql-cdc + ${flink.cdc.version} + + + com.mysql + mysql-connector-j + ${mysql.driver.version} + + + + mysql + + + + PostgresCDC + + + org.apache.flink + flink-connector-postgres-cdc + ${flink.cdc.version} + + + + postgre + + + + MongoDBCDC + + + org.apache.flink + flink-connector-mongodb-cdc + ${flink.cdc.version} + + + + mongo + + + + + + + ${buildDirectory} + ${cdc.source}-${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + 
META-INF/versions/17/**/*.class + META-INF/versions/19/**/*.class + META-INF/versions/15/**/*.class + + + + + + + org.apache.kafka.connect + org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect + + + org.apache.kafka + org.apache.flink.kafka.shaded.org.apache.kafka + + + + + + + com.amazonaws.services.msf.PaimonCDCSinkJob + + + + + + + + + + \ No newline at end of file diff --git a/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java new file mode 100644 index 0000000..c6ff043 --- /dev/null +++ b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java @@ -0,0 +1,107 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.ActionFactory; + +import java.io.IOException; +import java.util.*; + +public class PaimonCDCSinkJob { + + private static final Logger LOGGER = LogManager.getLogger(PaimonCDCSinkJob.class); + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final String SEP_KEY = "@_"; + private static final String ACTION_CONF_GROUP = "ActionConf"; + private static final String ACTION_KEY = "action"; + private static final String PARAM_KEY_PREFIX = "--"; + + public static void main(String[] args) throws Exception{ + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + Map confMap = loadApplicationProperties(env); + String[] actionArgs = configToActionParameters(confMap); + if (actionArgs.length < 1) { + LOGGER.error("No action specified"); + System.exit(1); + } + + LOGGER.info("actionArgs: {}", Arrays.toString(actionArgs)); + + Optional actionOpt = ActionFactory.createAction(actionArgs); + + if (actionOpt.isPresent()) { + Action action = actionOpt.get(); + if (action instanceof ActionBase) { + LOGGER.info("ActionBase: {}", action.getClass().getName()); + ((ActionBase) action).withStreamExecutionEnvironment(env).run(); + } else { + action.run(); + } + } else { + LOGGER.info("No paimon flink action service found"); + System.exit(1); + } + } + + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (env instanceof LocalStreamEnvironment) { + LOGGER.debug("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + PaimonCDCSinkJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); + } else { + LOGGER.debug("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + private static String[] configToActionParameters(Map confMap) { + + Properties actionProp = confMap.get(ACTION_CONF_GROUP); + if (actionProp == null) { + LOGGER.error("ActionConf not found in application properties"); + System.exit(1); + } + + String action = actionProp.getProperty(ACTION_KEY); + if (action == null || action.isEmpty()) { + LOGGER.error("Action not found in 
application properties"); + } + + actionProp.remove(ACTION_KEY); + + List params = new ArrayList<>(); + params.add(action); + + for (Map.Entry confEntry : confMap.entrySet()) { + confEntry.getValue().forEach( + (k, v) -> { + String ks = k.toString(); + int idx = ks.indexOf(SEP_KEY); + String paramKey; + String paramVal; + if (idx != -1) { + paramKey = String.format("%s%s", PARAM_KEY_PREFIX , ks.substring(0, idx)); + paramVal = String.format("%s=%s", ks.substring(idx + SEP_KEY.length()), v); + + } else { + paramKey = String.format("%s%s", PARAM_KEY_PREFIX , ks); + paramVal = v.toString(); + } + params.add(paramKey); + params.add(paramVal); + } + ); + } + + return params.toArray(new String[0]); + } + +} diff --git a/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..cb0fa4d --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,39 @@ +[ + { + "PropertyGroupId": "ActionConf", + "PropertyMap": { + "action": "kafka_sync_database", + "warehouse": "s3://bucket/data/prefix", + "database": "paimon_flink", + "primary_keys": "ID", + "table_prefix": "ods_" + } + }, + { + "PropertyGroupId": "KafkaConf", + "PropertyMap": { + "kafka_conf@_properties.bootstrap.servers": "b-2.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092,b-1.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092", + "kafka_conf@_topic": "kafka_topic", + "kafka_conf@_properties.group.id": 1234546, + "kafka_conf@_properties.auto.offset.reset": "earliest" + } + }, + { + "PropertyGroupId": "CatalogConf", + "PropertyMap": { + "catalog_conf@_metastore": "hive", + "catalog_conf@_hive-conf-dir": "/etc/hive/conf.dist", + "catalog_conf@_lock.enabled": "false", + "catalog_conf@_metastore.client.class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient", + "catalog_conf@_warehouse": "s3://bucket/data/prefix" + } + }, + { + "PropertyGroupId": "TableConf", + "PropertyMap": { + "table_conf@_bucket": "4", + "table_conf@_changelog-producer": "input", + "table_conf@_sink.parallelism": "4" + } + } +] \ No newline at end of file diff --git a/java/PaimonCDCSink/src/main/resources/hive-site.xml b/java/PaimonCDCSink/src/main/resources/hive-site.xml new file mode 100644 index 0000000..95bb612 --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/hive-site.xml @@ -0,0 +1,273 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + hbase.master + + http://wiki.apache.org/hadoop/Hive/HBaseIntegration + + + + hive.zookeeper.quorum + ip-xx-xx-xx-xx.us-west-2.compute.internal:2181 + + + + hive.llap.zk.sm.connectionString + ip-xx-xx-xx-xx.us-west-2.compute.internal:2181 + + + + hbase.zookeeper.quorum + ip-xx-xx-xx-xx.us-west-2.compute.internal + http://wiki.apache.org/hadoop/Hive/HBaseIntegration + + + + hive.execution.engine + tez + + + + fs.defaultFS + hdfs://ip-xx-xx-xx-xx.us-west-2.compute.internal:8020 + + + + + hive.metastore.uris + thrift://ip-xx-xx-xx-xx.us-west-2.compute.internal:9083 + JDBC connect string for a JDBC metastore + + + + javax.jdo.option.ConnectionURL + jdbc:mysql://ip-xx-xx-xx-xx.us-west-2.compute.internal:3306/hive?createDatabaseIfNotExist=true + username to use against metastore database + + + + javax.jdo.option.ConnectionDriverName + org.mariadb.jdbc.Driver + username to use against metastore database + + + + javax.jdo.option.ConnectionUserName + hive + username to use against metastore database + + + + 
javax.jdo.option.ConnectionPassword + kWs5sQ8HnZaEC2kj + password to use against metastore database + + + + hive.server2.allow.user.substitution + true + + + + hive.server2.enable.doAs + true + + + + hive.server2.thrift.port + 10000 + + + + hive.server2.thrift.http.port + 10001 + + + + + + hive.optimize.ppd.input.formats + com.amazonaws.emr.s3select.hive.S3SelectableTextInputFormat + + + + s3select.filter + false + + + + hive.server2.in.place.progress + false + + + + hive.llap.zk.registry.user + hadoop + + + + hive.security.metastore.authorization.manager + org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider + + + + hive.log.explain.output + false + + + + datanucleus.fixedDatastore + true + + + + mapred.reduce.tasks + -1 + + + + mapred.max.split.size + 256000000 + + + + hive.mapjoin.hybridgrace.hashtable + false + + + + hive.merge.nway.joins + false + + + + hive.metastore.connect.retries + 15 + + + + hive.optimize.joinreducededuplication + false + + + + hive.optimize.sort.dynamic.partition.threshold + 1 + + + + hive.server2.materializedviews.registry.impl + DUMMY + + + + hive.tez.auto.reducer.parallelism + true + + + + hive.vectorized.execution.mapjoin.minmax.enabled + true + + + + hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled + true + + + + hive.optimize.dynamic.partition.hashjoin + true + + + + hive.compactor.initiator.on + true + + + + hive.blobstore.use.output-committer + true + + + + hive.llap.daemon.service.hosts + @llap0 + + + + hive.llap.execution.mode + only + + + + hive.optimize.metadataonly + true + + + + hive.tez.bucket.pruning + true + + + + hive.exec.mode.local.auto + true + + + + hive.exec.mode.local.auto.inputbytes.max + 50000000 + + + + hive.query.reexecution.stats.persist.scope + hiveserver + + + + hive.metastore.client.factory.class + com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + + + + hive.auto.convert.join.noconditionaltask.size + 1073741824 + + + + hive.compactor.worker.threads + 1 + + + + + + + diff --git a/java/PaimonCDCSink/src/main/resources/log4j2.properties b/java/PaimonCDCSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3546643 --- /dev/null +++ b/java/PaimonCDCSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/java/pom.xml b/java/pom.xml index 29e281b..91229c2 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -35,5 +35,6 @@ Serialization/CustomTypeInfo SideOutputs PrometheusSink + PaimonCDCSink \ No newline at end of file