From 94878ea5aff957b2bf1edafe8147f3ca1caeff7c Mon Sep 17 00:00:00 2001 From: MabelYC Date: Fri, 3 Jan 2025 12:11:26 -0800 Subject: [PATCH 01/10] Add External Config injection for pipelines --- .../test-properties.json | 2 +- auto-elr/build.gradle | 5 ++-- build.gradle.kts | 18 +++++---------- .../beam/gradle/BeamModulePlugin.groovy | 14 ++++++++++- .../runner-concepts/description.md | 2 +- .../flink/FlinkExecutionEnvironments.java | 11 +++++---- .../runners/flink/FlinkPipelineOptions.java | 13 +++++++++++ .../samza/translation/ConfigBuilder.java | 2 ++ .../expansion/ExternalConfigRegistrar.java | 23 +++++++++++++++++++ settings.gradle.kts | 4 ---- 10 files changed, 69 insertions(+), 25 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index 84c7691077e2..f2180bb2f670 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21"], - "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"], + "FLINK_VERSIONS": ["1.16", "1.17", "1.18"], "SPARK_VERSIONS": ["2", "3"] }, "GoTestProperties": { diff --git a/auto-elr/build.gradle b/auto-elr/build.gradle index 4f086975e646..34cc5394b3b0 100644 --- a/auto-elr/build.gradle +++ b/auto-elr/build.gradle @@ -31,8 +31,8 @@ dependencies { implementation project(":runners:core-java") implementation project(":runners:java-fn-execution") implementation project(":runners:samza") - implementation project(":runners:flink:1.15") implementation project(":runners:flink:1.16") + implementation project(":runners:flink:1.17") implementation project(":runners:flink:1.18") implementation project(":runners:spark:2") implementation project(":runners:spark:3") @@ -60,8 +60,9 @@ tasks.all { task -> task.mustRunAfter(":runners:core-java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:java-fn-execution:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:samza:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") - task.mustRunAfter(":runners:flink:1.15:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:flink:1.16:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") + task.mustRunAfter(":runners:flink:1.17:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") + task.mustRunAfter(":runners:flink:1.18:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:spark:2:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:spark:3:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") task.mustRunAfter(":runners:portability:java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository") diff --git a/build.gradle.kts b/build.gradle.kts index ece1e7d0bed7..27eed49a02f7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -234,16 +234,12 @@ tasks.register("javaPreCommit") { dependsOn(":runners:core-java:build") dependsOn(":runners:direct-java:build") dependsOn(":runners:extensions-java:metrics:build") - dependsOn(":runners:flink:1.12:build") - dependsOn(":runners:flink:1.12:job-server:build") - dependsOn(":runners:flink:1.13:build") - 
dependsOn(":runners:flink:1.13:job-server:build") - dependsOn(":runners:flink:1.14:build") - dependsOn(":runners:flink:1.14:job-server:build") - dependsOn(":runners:flink:1.15:build") - dependsOn(":runners:flink:1.15:job-server:build") dependsOn(":runners:flink:1.16:build") dependsOn(":runners:flink:1.16:job-server:build") + dependsOn(":runners:flink:1.17:build") + dependsOn(":runners:flink:1.17:job-server:build") + dependsOn(":runners:flink:1.18:build") + dependsOn(":runners:flink:1.18:job-server:build") dependsOn(":runners:google-cloud-dataflow-java:build") dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build") dependsOn(":runners:google-cloud-dataflow-java:examples:build") @@ -339,11 +335,9 @@ tasks.register("javaPostCommit") { tasks.register("javaPostCommitSickbay") { dependsOn(":runners:samza:validatesRunnerSickbay") - dependsOn(":runners:flink:1.12:validatesRunnerSickbay") - dependsOn(":runners:flink:1.13:validatesRunnerSickbay") - dependsOn(":runners:flink:1.14:validatesRunnerSickbay") - dependsOn(":runners:flink:1.15:validatesRunnerSickbay") dependsOn(":runners:flink:1.16:validatesRunnerSickbay") + dependsOn(":runners:flink:1.17:validatesRunnerSickbay") + dependsOn(":runners:flink:1.18:validatesRunnerSickbay") dependsOn(":runners:spark:2:job-server:validatesRunnerSickbay") dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay") dependsOn(":runners:direct-java:validatesRunnerSickbay") diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3cc8cf0c4802..3c450e5f17f5 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -46,6 +46,7 @@ import org.gradle.api.tasks.testing.Test import org.gradle.api.tasks.PathSensitive import org.gradle.api.tasks.PathSensitivity import org.gradle.testing.jacoco.tasks.JacocoReport +import groovy.util.logging.Slf4j import java.net.ServerSocket /** @@ -1528,6 +1529,7 @@ class BeamModulePlugin implements Plugin { // // Consider re-enabling if we can get annotations for the generated // code and test libraries we use. 
+ println("current2 it is: " + it) checkerFramework { skipCheckerFramework = true } @@ -1658,12 +1660,22 @@ class BeamModulePlugin implements Plugin { } project.task('sourcesJar', type: Jar) { - from project.sourceSets.main.allSource + if (it.getProject().toString().contains("runners:") || + it.getProject().toString().contains("sdks:java:") || + it.getProject().toString().contains("examples:")) { + duplicatesStrategy = 'exclude' + } + project.sourceSets.main.allSource.each { + from it + } classifier = 'sources' } project.artifacts.archives project.sourcesJar project.task('testSourcesJar', type: Jar) { + if (it.getProject().toString().contains("runners:")) { + duplicatesStrategy = 'exclude' + } from project.sourceSets.test.allSource classifier = 'test-sources' } diff --git a/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md b/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md index 9a0a53f9b49c..7b84c74b41f1 100644 --- a/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md +++ b/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md @@ -111,7 +111,7 @@ Additionally, you can read [here](https://beam.apache.org/documentation/runners/ #### Run example ##### Portable -1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.15`, `Flink 1.16`, `Flink 1.17`, `Flink 1.18`. +1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.16`, `Flink 1.17`, `Flink 1.18`. 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.10_job_server:latest` 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example: diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 25a553af372c..82024aed3430 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.expansion.ExternalConfigRegistrar; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; @@ -78,7 +79,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment( // Although Flink uses Rest, it expects the address not to contain a http scheme String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster()); - Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir, options); ExecutionEnvironment flinkBatchEnv; // depending on the master, create the right environment. 
@@ -164,7 +165,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( // Although Flink uses Rest, it expects the address not to contain a http scheme String masterUrl = stripHttpSchema(options.getFlinkMaster()); - Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir, options); StreamExecutionEnvironment flinkStreamEnv; // depending on the master, create the right environment. @@ -378,9 +379,11 @@ private static int determineParallelism( } private static Configuration getFlinkConfiguration( - @Nullable String flinkConfDir, @Nullable Map flinkConfMap) { + @Nullable String flinkConfDir, FlinkPipelineOptions flinkPipelineOptions) { Configuration dynamicProperties = null; - if (flinkConfMap != null && !flinkConfMap.isEmpty()) { + final Map flinkConfMap = flinkPipelineOptions.getFlinkConfMap(); + flinkConfMap.putAll(ExternalConfigRegistrar.getFactory(flinkPipelineOptions)); + if (!flinkConfMap.isEmpty()) { dynamicProperties = Configuration.fromMap(flinkConfMap); } if (flinkConfDir != null && !flinkConfDir.isEmpty()) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index aa83b60a8ac6..4b839a21f08e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -17,9 +17,13 @@ */ package org.apache.beam.runners.flink; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.FileStagingOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -307,10 +311,19 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); @Description("Map containing Flink configurations") + @Default.InstanceFactory(FlinkConfMapFactory.class) Map getFlinkConfMap(); void setFlinkConfMap(Map flinkConfMap); + /** Returns an empty map, to avoid handling null. 
*/ + class FlinkConfMapFactory implements DefaultValueFactory> { + @Override + public Map create(PipelineOptions options) { + return new HashMap<>(); + } + } + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index f5ed34637a8f..67f8a29556e7 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -40,6 +40,7 @@ import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner; import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals; import org.apache.beam.runners.samza.util.ConfigUtils; +import org.apache.beam.sdk.expansion.ExternalConfigRegistrar; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.samza.config.ApplicationConfig; @@ -174,6 +175,7 @@ private static Map createUserConfig(SamzaPipelineOptions options if (options.getConfigOverride() != null) { config.putAll(options.getConfigOverride()); } + config.putAll(ExternalConfigRegistrar.getFactory(options)); return config; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java new file mode 100644 index 000000000000..043f3bdc096a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java @@ -0,0 +1,23 @@ +package org.apache.beam.sdk.expansion; + +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; + +@Experimental +/** + * Inject external configs to pipelineOptions + */ +public interface ExternalConfigRegistrar { + Map getExternalConfig(PipelineOptions options); + + static Map getFactory(PipelineOptions options) { + final Iterator factories = + ServiceLoader.load(ExternalConfigRegistrar.class).iterator(); + + return Iterators.getOnlyElement(factories).getExternalConfig(options); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 0b82ea997ccc..53d820303070 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -87,10 +87,6 @@ include(":runners:extensions-java:metrics") * verify versions in website/www/site/content/en/documentation/runners/flink.md * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py */ -// Flink 1.15 -include(":runners:flink:1.15") -include(":runners:flink:1.15:job-server") -include(":runners:flink:1.15:job-server-container") // Flink 1.16 include(":runners:flink:1.16") include(":runners:flink:1.16:job-server") From bf891f22438da9e7a5a095afdb340a72c338ff75 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Fri, 3 Jan 2025 12:15:18 -0800 Subject: [PATCH 02/10] Add External Config injection for pipelines --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3c450e5f17f5..1090d19c553a 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -399,7 +399,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.32' + project.version = '2.45.33' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 190a2c82e3ed..7c864d29975c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.32 -sdk_version=2.45.32 +version=2.45.33 +sdk_version=2.45.33 javaVersion=1.8 From 2c6cd1be029faa9d4b47b9c2c184da7b543033f6 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Fri, 3 Jan 2025 13:25:55 -0800 Subject: [PATCH 03/10] Add External Config injection for pipelines --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 6 +----- .../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 2 -- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1090d19c553a..69e1a37bb59c 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -46,7 +46,6 @@ import org.gradle.api.tasks.testing.Test import org.gradle.api.tasks.PathSensitive import org.gradle.api.tasks.PathSensitivity import org.gradle.testing.jacoco.tasks.JacocoReport -import groovy.util.logging.Slf4j import java.net.ServerSocket /** @@ -1529,7 +1528,6 @@ class BeamModulePlugin implements Plugin { // // Consider re-enabling if we can get annotations for the generated // code and test libraries we use. 
- println("current2 it is: " + it) checkerFramework { skipCheckerFramework = true } @@ -1665,9 +1663,7 @@ class BeamModulePlugin implements Plugin { it.getProject().toString().contains("examples:")) { duplicatesStrategy = 'exclude' } - project.sourceSets.main.allSource.each { - from it - } + from project.sourceSets.main.allSource classifier = 'sources' } project.artifacts.archives project.sourcesJar diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 4b839a21f08e..c8c9817f4389 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -17,9 +17,7 @@ */ package org.apache.beam.runners.flink; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; From aacce38e171277adba9318803d1a46a7db8f7f03 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Fri, 3 Jan 2025 14:04:30 -0800 Subject: [PATCH 04/10] remove supported flink versions --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 7c864d29975c..026d6aa3bb24 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.15,1.16,1.17,1.18 +flink_versions=1.16,1.17,1.18 # supported python versions python_versions=3.8,3.9,3.10,3.11 From a42bc9e71c134856c29b35f0c307739f46dcf31c Mon Sep 17 00:00:00 2001 From: MabelYC Date: Sat, 4 Jan 2025 22:15:35 -0800 Subject: [PATCH 05/10] handle null --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 ++ .../apache/beam/sdk/expansion/ExternalConfigRegistrar.java | 6 ++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 69e1a37bb59c..a809419a0b03 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1276,6 +1276,8 @@ class BeamModulePlugin implements Plugin { permitTestUnusedDeclared dep } permitUnusedDeclared "org.checkerframework:checker-qual:$checkerframework_version" + permitUnusedDeclared "org.apache.flink:flink-table-api-java-bridge:1.18.0" + permitUnusedDeclared "org.apache.flink:flink-table-api-java:1.18.0" } if (configuration.enableStrictDependencies) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java index 043f3bdc096a..ef258560c086 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java @@ -1,5 +1,6 @@ package org.apache.beam.sdk.expansion; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.ServiceLoader; @@ -8,9 +9,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; @Experimental -/** - * Inject external configs to pipelineOptions - */ public interface 
ExternalConfigRegistrar { Map getExternalConfig(PipelineOptions options); @@ -18,6 +16,6 @@ static Map getFactory(PipelineOptions options) { final Iterator factories = ServiceLoader.load(ExternalConfigRegistrar.class).iterator(); - return Iterators.getOnlyElement(factories).getExternalConfig(options); + return factories.hasNext() ? Iterators.getOnlyElement(factories).getExternalConfig(options) : new HashMap<>(); } } From 30bc9b048498fb36729c0b808c6d3f0bcc2f42b4 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Sat, 4 Jan 2025 22:29:17 -0800 Subject: [PATCH 06/10] add permit --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index a809419a0b03..84981c646bfe 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1278,6 +1278,7 @@ class BeamModulePlugin implements Plugin { permitUnusedDeclared "org.checkerframework:checker-qual:$checkerframework_version" permitUnusedDeclared "org.apache.flink:flink-table-api-java-bridge:1.18.0" permitUnusedDeclared "org.apache.flink:flink-table-api-java:1.18.0" + permitUnusedDeclared "org.apache.flink:flink-table-common:1.18.0" } if (configuration.enableStrictDependencies) { From 623965e277edbcd889526d869aaae5ee9f04e056 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Mon, 6 Jan 2025 14:26:46 -0800 Subject: [PATCH 07/10] address comments --- .../flink/FlinkExecutionEnvironments.java | 5 ++- .../flink/FlinkExecutionEnvironmentsTest.java | 45 ++++++++++++++++++- .../samza/translation/ConfigBuilder.java | 2 +- .../expansion/ExternalConfigRegistrar.java | 12 ++++- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 82024aed3430..f5d7a1fe7bab 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -378,11 +378,12 @@ private static int determineParallelism( return 1; } - private static Configuration getFlinkConfiguration( + @VisibleForTesting + static Configuration getFlinkConfiguration( @Nullable String flinkConfDir, FlinkPipelineOptions flinkPipelineOptions) { Configuration dynamicProperties = null; final Map flinkConfMap = flinkPipelineOptions.getFlinkConfMap(); - flinkConfMap.putAll(ExternalConfigRegistrar.getFactory(flinkPipelineOptions)); + flinkConfMap.putAll(ExternalConfigRegistrar.getConfig(flinkPipelineOptions)); if (!flinkConfMap.isEmpty()) { dynamicProperties = Configuration.fromMap(flinkConfMap); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index c2d5cc3ebc38..73698a222693 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -17,12 +17,13 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.*; import static 
org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.*; +import com.google.auto.service.AutoService; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -31,6 +32,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.expansion.ExternalConfigRegistrar; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.RemoteEnvironment; @@ -493,6 +498,42 @@ public void shouldCreateRocksDbStateBackend() { assertThat(sev.getStateBackend(), instanceOf(RocksDBStateBackend.class)); } + @Test + public void testGetFlinkConfiguration() { + Configuration configuration = getFlinkConfiguration(null, getDefaultPipelineOptions()); + assertNotNull(configuration); + } + + @Test + public void testGetFlinkConfigurationWithExternalConfigs() { + // mock a ExternConfigRegistrar. + @AutoService(ExternalConfigRegistrar.class) + class Config implements ExternalConfigRegistrar{ + @Override + public Map getExternalConfig(PipelineOptions options) { + // insert flink related configs + return new HashMap() {{ + put("key", "value"); + }}; + } + } + FlinkPipelineOptions options = getDefaultPipelineOptions(); + Configuration configuration = getFlinkConfiguration(null, options); + assertTrue(configuration.containsKey("key")); + assertEquals(configuration.getString("key", ""), "value"); + } + + @Test + public void testGetFlinkConfigurationWithConfigMap() { + FlinkPipelineOptions options = getDefaultPipelineOptions(); + options.setFlinkConfMap(new HashMap() {{ + put("mapKey", "mapValue"); + }}); + Configuration configuration = getFlinkConfiguration(null, options); + assertTrue(configuration.containsKey("mapKey")); + assertEquals(configuration.getString("mapKey", ""), "mapValue"); + } + private void checkHostAndPort(Object env, String expectedHost, int expectedPort) { String host = ((Configuration) Whitebox.getInternalState(env, "configuration")) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 67f8a29556e7..f616af28fa2c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -175,7 +175,7 @@ private static Map createUserConfig(SamzaPipelineOptions options if (options.getConfigOverride() != null) { config.putAll(options.getConfigOverride()); } - config.putAll(ExternalConfigRegistrar.getFactory(options)); + config.putAll(ExternalConfigRegistrar.getConfig(options)); return config; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java index ef258560c086..f2cffd52e2e5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java @@ -8,11 +8,21 @@ import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; +/** + * A LinkedIn factory interface for runner-specific runtime to load external configs. + */ @Experimental public interface ExternalConfigRegistrar { Map getExternalConfig(PipelineOptions options); - static Map getFactory(PipelineOptions options) { + /** + * Find the {@link ExternalConfigRegistrar} to load external configs for different runners. + * @param options the pipeline options + * @return a map contains external configs + * @param the type of the Key + * @param the type of the Value + */ + static Map getConfig(PipelineOptions options) { final Iterator factories = ServiceLoader.load(ExternalConfigRegistrar.class).iterator(); From 59db6de1f745fa44102cc878f858f09abebafbd3 Mon Sep 17 00:00:00 2001 From: MabelYC Date: Mon, 6 Jan 2025 14:34:58 -0800 Subject: [PATCH 08/10] address comments --- .../flink/FlinkExecutionEnvironmentsTest.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 73698a222693..75d0c4940402 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -508,13 +508,15 @@ public void testGetFlinkConfiguration() { public void testGetFlinkConfigurationWithExternalConfigs() { // mock a ExternConfigRegistrar. @AutoService(ExternalConfigRegistrar.class) - class Config implements ExternalConfigRegistrar{ + class Config implements ExternalConfigRegistrar { @Override public Map getExternalConfig(PipelineOptions options) { // insert flink related configs - return new HashMap() {{ + return new HashMap() { + { put("key", "value"); - }}; + } + }; } } FlinkPipelineOptions options = getDefaultPipelineOptions(); @@ -526,9 +528,12 @@ public Map getExternalConfig(PipelineOptions options) { @Test public void testGetFlinkConfigurationWithConfigMap() { FlinkPipelineOptions options = getDefaultPipelineOptions(); - options.setFlinkConfMap(new HashMap() {{ - put("mapKey", "mapValue"); - }}); + options.setFlinkConfMap( + new HashMap() { + { + put("mapKey", "mapValue"); + } + }); Configuration configuration = getFlinkConfiguration(null, options); assertTrue(configuration.containsKey("mapKey")); assertEquals(configuration.getString("mapKey", ""), "mapValue"); From 62e3ee64a3f3b3cade7b03047635e5fd9a852f9b Mon Sep 17 00:00:00 2001 From: MabelYC Date: Mon, 6 Jan 2025 14:38:39 -0800 Subject: [PATCH 09/10] address comments --- .../beam/runners/flink/FlinkExecutionEnvironmentsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 75d0c4940402..6b0a6a5b2ee4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -514,7 +514,7 @@ public Map getExternalConfig(PipelineOptions options) { // insert flink related configs return new HashMap() { { - put("key", "value"); + put("key", "value"); } }; } From 247d77b8c9482de5b0567e5bf7106a8621a733bc Mon Sep 17 00:00:00 2001 From: MabelYC Date: Tue, 
7 Jan 2025 09:15:41 -0800 Subject: [PATCH 10/10] add unit tests --- .../flink/FlinkExecutionEnvironmentsTest.java | 41 ++++--------------- 1 file changed, 8 insertions(+), 33 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 6b0a6a5b2ee4..48350f5436ce 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.*; +import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.getFlinkConfiguration; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; -import com.google.auto.service.AutoService; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -33,9 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.expansion.ExternalConfigRegistrar; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.RemoteEnvironment; @@ -504,39 +504,14 @@ public void testGetFlinkConfiguration() { assertNotNull(configuration); } - @Test - public void testGetFlinkConfigurationWithExternalConfigs() { - // mock a ExternConfigRegistrar. - @AutoService(ExternalConfigRegistrar.class) - class Config implements ExternalConfigRegistrar { - @Override - public Map getExternalConfig(PipelineOptions options) { - // insert flink related configs - return new HashMap() { - { - put("key", "value"); - } - }; - } - } - FlinkPipelineOptions options = getDefaultPipelineOptions(); - Configuration configuration = getFlinkConfiguration(null, options); - assertTrue(configuration.containsKey("key")); - assertEquals(configuration.getString("key", ""), "value"); - } - @Test public void testGetFlinkConfigurationWithConfigMap() { FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setFlinkConfMap( - new HashMap() { - { - put("mapKey", "mapValue"); - } - }); + new HashMap<>(ImmutableMap.builder().put("mapKey", "mapValue").build())); Configuration configuration = getFlinkConfiguration(null, options); assertTrue(configuration.containsKey("mapKey")); - assertEquals(configuration.getString("mapKey", ""), "mapValue"); + assertEquals("mapValue", configuration.getString("mapKey", "")); } private void checkHostAndPort(Object env, String expectedHost, int expectedPort) {
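
Note on usage: the ExternalConfigRegistrar SPI introduced in this series is discovered via ServiceLoader, so a runner-specific module only needs to publish one implementation on the submission classpath — the same @AutoService(ExternalConfigRegistrar.class) pattern the short-lived test added in PATCH 07/10 (and removed again in PATCH 10/10) relied on. Below is a minimal sketch of such an implementation; the package, class name, and config key are hypothetical, and it assumes the registrar returns a Map<String, String> of runner configuration.

    package com.example.beam.config; // hypothetical package, not part of this patch series

    import com.google.auto.service.AutoService;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
    import org.apache.beam.sdk.options.PipelineOptions;

    /** Hypothetical registrar that injects externally managed runner settings at submission time. */
    @AutoService(ExternalConfigRegistrar.class)
    public class ExampleExternalConfigRegistrar implements ExternalConfigRegistrar {

      @Override
      public Map<String, String> getExternalConfig(PipelineOptions options) {
        Map<String, String> config = new HashMap<>();
        // Illustrative value only; a real registrar would derive entries from the
        // pipeline options or from an external configuration source.
        config.put("taskmanager.memory.network.fraction", "0.2");
        return config;
      }
    }

With such a registrar on the classpath, FlinkExecutionEnvironments.getFlinkConfiguration merges the returned entries into the --flinkConfMap map (registrar entries win on duplicate keys, since they are putAll'd last), and ConfigBuilder.createUserConfig does the same for the Samza runner after applying getConfigOverride. Because ExternalConfigRegistrar.getConfig resolves the implementation with Iterators.getOnlyElement, at most one registrar may be present on the classpath; with none present it falls back to an empty map.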