Commit
Merge pull request #135 from MabelYC/configSet
Add support to inject external user configs
yananhao12 authored Jan 7, 2025
2 parents c3ba8b3 + 247d77b commit d5efd17
Showing 12 changed files with 100 additions and 29 deletions.
@@ -14,7 +14,7 @@
   },
   "JavaTestProperties": {
     "SUPPORTED_VERSIONS": ["8", "11", "17", "21"],
-    "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"],
+    "FLINK_VERSIONS": ["1.16", "1.17", "1.18"],
     "SPARK_VERSIONS": ["2", "3"]
   },
   "GoTestProperties": {
auto-elr/build.gradle (3 additions, 2 deletions)
@@ -31,8 +31,8 @@ dependencies {
     implementation project(":runners:core-java")
     implementation project(":runners:java-fn-execution")
     implementation project(":runners:samza")
-    implementation project(":runners:flink:1.15")
     implementation project(":runners:flink:1.16")
     implementation project(":runners:flink:1.17")
+    implementation project(":runners:flink:1.18")
     implementation project(":runners:spark:2")
     implementation project(":runners:spark:3")
@@ -60,8 +60,9 @@ tasks.all { task ->
     task.mustRunAfter(":runners:core-java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:java-fn-execution:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:samza:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
-    task.mustRunAfter(":runners:flink:1.15:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:flink:1.16:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
+    task.mustRunAfter(":runners:flink:1.17:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
+    task.mustRunAfter(":runners:flink:1.18:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:spark:2:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:spark:3:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
     task.mustRunAfter(":runners:portability:java:publishMavenJavaPublicationToLinkedin.jfrog.httpsRepository")
build.gradle.kts (6 additions, 12 deletions)
@@ -234,16 +234,12 @@ tasks.register("javaPreCommit") {
dependsOn(":runners:core-java:build")
dependsOn(":runners:direct-java:build")
dependsOn(":runners:extensions-java:metrics:build")
dependsOn(":runners:flink:1.12:build")
dependsOn(":runners:flink:1.12:job-server:build")
dependsOn(":runners:flink:1.13:build")
dependsOn(":runners:flink:1.13:job-server:build")
dependsOn(":runners:flink:1.14:build")
dependsOn(":runners:flink:1.14:job-server:build")
dependsOn(":runners:flink:1.15:build")
dependsOn(":runners:flink:1.15:job-server:build")
dependsOn(":runners:flink:1.16:build")
dependsOn(":runners:flink:1.16:job-server:build")
dependsOn(":runners:flink:1.17:build")
dependsOn(":runners:flink:1.17:job-server:build")
dependsOn(":runners:flink:1.18:build")
dependsOn(":runners:flink:1.18:job-server:build")
dependsOn(":runners:google-cloud-dataflow-java:build")
dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
dependsOn(":runners:google-cloud-dataflow-java:examples:build")
@@ -339,11 +335,9 @@ tasks.register("javaPostCommit") {

tasks.register("javaPostCommitSickbay") {
dependsOn(":runners:samza:validatesRunnerSickbay")
dependsOn(":runners:flink:1.12:validatesRunnerSickbay")
dependsOn(":runners:flink:1.13:validatesRunnerSickbay")
dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
dependsOn(":runners:flink:1.17:validatesRunnerSickbay")
dependsOn(":runners:flink:1.18:validatesRunnerSickbay")
dependsOn(":runners:spark:2:job-server:validatesRunnerSickbay")
dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
dependsOn(":runners:direct-java:validatesRunnerSickbay")
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

   // Automatically use the official release version if we are performing a release
   // otherwise append '-SNAPSHOT'
-  project.version = '2.45.32'
+  project.version = '2.45.33'
   if (isLinkedin(project)) {
     project.ext.mavenGroupId = 'com.linkedin.beam'
   }
@@ -1276,6 +1276,9 @@ class BeamModulePlugin implements Plugin<Project> {
       permitTestUnusedDeclared dep
     }
     permitUnusedDeclared "org.checkerframework:checker-qual:$checkerframework_version"
+    permitUnusedDeclared "org.apache.flink:flink-table-api-java-bridge:1.18.0"
+    permitUnusedDeclared "org.apache.flink:flink-table-api-java:1.18.0"
+    permitUnusedDeclared "org.apache.flink:flink-table-common:1.18.0"
   }

if (configuration.enableStrictDependencies) {
@@ -1658,12 +1661,20 @@ class BeamModulePlugin implements Plugin<Project> {
     }

     project.task('sourcesJar', type: Jar) {
+      if (it.getProject().toString().contains("runners:") ||
+          it.getProject().toString().contains("sdks:java:") ||
+          it.getProject().toString().contains("examples:")) {
+        duplicatesStrategy = 'exclude'
+      }
       from project.sourceSets.main.allSource
       classifier = 'sources'
     }
     project.artifacts.archives project.sourcesJar

     project.task('testSourcesJar', type: Jar) {
+      if (it.getProject().toString().contains("runners:")) {
+        duplicatesStrategy = 'exclude'
+      }
       from project.sourceSets.test.allSource
       classifier = 'test-sources'
     }
gradle.properties (3 additions, 3 deletions)
@@ -30,15 +30,15 @@ signing.gnupg.useLegacyGpg=true
 # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
 # To build a custom Beam version make sure you change it in both places, see
 # https://github.com/apache/beam/issues/21302.
-version=2.45.32
-sdk_version=2.45.32
+version=2.45.33
+sdk_version=2.45.33

 javaVersion=1.8

 docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_

 # supported flink versions
-flink_versions=1.15,1.16,1.17,1.18
+flink_versions=1.16,1.17,1.18
 # supported python versions
 python_versions=3.8,3.9,3.10,3.11
@@ -111,7 +111,7 @@ Additionally, you can read [here](https://beam.apache.org/documentation/runners/
 #### Run example

 ##### Portable
-1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.15`, `Flink 1.16`, `Flink 1.17`, `Flink 1.18`.
+1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.16`, `Flink 1.17`, `Flink 1.18`.
 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.10_job_server:latest`
 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example:
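(The example that follows step 3 is cut off by the diff view. A minimal Java sketch of such a submission, assuming the portable runner's standard --jobEndpoint and --defaultEnvironmentType options rather than anything specific to this fork, might look like the following; the transform is a placeholder.)

    // Hedged sketch: submit a trivial pipeline to the JobService at localhost:8099.
    // Assumes beam-runners-portability-java is on the classpath.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class PortableSubmit {
      public static void main(String[] args) {
        PipelineOptions opts =
            PipelineOptionsFactory.fromArgs(
                    "--runner=PortableRunner",
                    "--jobEndpoint=localhost:8099",
                    "--defaultEnvironmentType=LOOPBACK")
                .create();
        Pipeline p = Pipeline.create(opts);
        p.apply(Create.of(1, 2, 3)); // placeholder transform
        p.run().waitUntilFinish();
      }
    }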
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
@@ -78,7 +79,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(

   // Although Flink uses Rest, it expects the address not to contain a http scheme
   String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster());
-  Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
+  Configuration flinkConfiguration = getFlinkConfiguration(confDir, options);
   ExecutionEnvironment flinkBatchEnv;

   // depending on the master, create the right environment.
@@ -164,7 +165,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(

   // Although Flink uses Rest, it expects the address not to contain a http scheme
   String masterUrl = stripHttpSchema(options.getFlinkMaster());
-  Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
+  Configuration flinkConfiguration = getFlinkConfiguration(confDir, options);
   StreamExecutionEnvironment flinkStreamEnv;

   // depending on the master, create the right environment.
@@ -377,10 +378,13 @@ private static int determineParallelism(
     return 1;
   }

-  private static Configuration getFlinkConfiguration(
-      @Nullable String flinkConfDir, @Nullable Map<String, String> flinkConfMap) {
+  @VisibleForTesting
+  static Configuration getFlinkConfiguration(
+      @Nullable String flinkConfDir, FlinkPipelineOptions flinkPipelineOptions) {
     Configuration dynamicProperties = null;
-    if (flinkConfMap != null && !flinkConfMap.isEmpty()) {
+    final Map<String, String> flinkConfMap = flinkPipelineOptions.getFlinkConfMap();
+    flinkConfMap.putAll(ExternalConfigRegistrar.getConfig(flinkPipelineOptions));
+    if (!flinkConfMap.isEmpty()) {
       dynamicProperties = Configuration.fromMap(flinkConfMap);
     }
     if (flinkConfDir != null && !flinkConfDir.isEmpty()) {
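(With this change, the dynamic properties handed to Flink are built from the options' config map, augmented via putAll with whatever an ExternalConfigRegistrar supplies, so registrar entries overwrite same-keyed user entries. A minimal sketch of exercising the merged path, mirroring the new tests further down; the config key is illustrative.)

    // Same-package sketch, since getFlinkConfiguration is package-private (@VisibleForTesting).
    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
    options.getFlinkConfMap().put("taskmanager.numberOfTaskSlots", "2"); // illustrative key
    Configuration conf = FlinkExecutionEnvironments.getFlinkConfiguration(null, options);
    // conf now contains the map entry plus any externally registered config.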
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.runners.flink;

+import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.FileStagingOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -307,10 +309,19 @@ public interface FlinkPipelineOptions
   void setFlinkConfDir(String confDir);

   @Description("Map containing Flink configurations")
+  @Default.InstanceFactory(FlinkConfMapFactory.class)
   Map<String, String> getFlinkConfMap();

   void setFlinkConfMap(Map<String, String> flinkConfMap);

+  /** Returns an empty map, to avoid handling null. */
+  class FlinkConfMapFactory implements DefaultValueFactory<Map<String, String>> {
+    @Override
+    public Map<String, String> create(PipelineOptions options) {
+      return new HashMap<>();
+    }
+  }
+
   static FlinkPipelineOptions defaults() {
     return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   }
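(@Default.InstanceFactory points Beam's options machinery at a DefaultValueFactory whose create() runs when the property is first read without an explicit value, so getFlinkConfMap() now returns a mutable empty map rather than null. A short sketch of the same null-safe pattern on a hypothetical options interface; names are invented for illustration.)

    // Illustrative only; not part of this commit.
    public interface MyOptions extends PipelineOptions {
      @Default.InstanceFactory(TagsFactory.class)
      Map<String, String> getTags();

      void setTags(Map<String, String> tags);

      /** Supplies a fresh mutable map when no value was set. */
      class TagsFactory implements DefaultValueFactory<Map<String, String>> {
        @Override
        public Map<String, String> create(PipelineOptions options) {
          return new HashMap<>();
        }
      }
    }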
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.flink;

+import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.getFlinkConfiguration;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;

 import java.io.File;
 import java.io.IOException;
@@ -31,6 +34,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.RemoteEnvironment;
@@ -493,6 +498,22 @@ public void shouldCreateRocksDbStateBackend() {
     assertThat(sev.getStateBackend(), instanceOf(RocksDBStateBackend.class));
   }

+  @Test
+  public void testGetFlinkConfiguration() {
+    Configuration configuration = getFlinkConfiguration(null, getDefaultPipelineOptions());
+    assertNotNull(configuration);
+  }
+
+  @Test
+  public void testGetFlinkConfigurationWithConfigMap() {
+    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    options.setFlinkConfMap(
+        new HashMap<>(ImmutableMap.<String, String>builder().put("mapKey", "mapValue").build()));
+    Configuration configuration = getFlinkConfiguration(null, options);
+    assertTrue(configuration.containsKey("mapKey"));
+    assertEquals("mapValue", configuration.getString("mapKey", ""));
+  }
+
   private void checkHostAndPort(Object env, String expectedHost, int expectedPort) {
     String host =
         ((Configuration) Whitebox.getInternalState(env, "configuration"))
runners/samza (Samza runner config setup)
@@ -40,6 +40,7 @@
 import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner;
 import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
 import org.apache.beam.runners.samza.util.ConfigUtils;
+import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.ApplicationConfig;
@@ -174,6 +175,7 @@ private static Map<String, String> createUserConfig(SamzaPipelineOptions options
     if (options.getConfigOverride() != null) {
       config.putAll(options.getConfigOverride());
     }
+    config.putAll(ExternalConfigRegistrar.getConfig(options));

     return config;
   }
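(Because the external config is applied after the configOverride copy, a registrar entry wins whenever both define the same key. That follows directly from Map.putAll semantics, as this illustrative snippet shows; the key and values are invented.)

    Map<String, String> config = new HashMap<>();
    config.put("job.container.count", "1");                      // from configOverride
    config.putAll(ImmutableMap.of("job.container.count", "4"));  // from the registrar
    config.get("job.container.count");                           // -> "4"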
sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalConfigRegistrar.java (new file)
@@ -0,0 +1,31 @@
+package org.apache.beam.sdk.expansion;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+
+/**
+ * A LinkedIn factory interface for runner-specific runtimes to load external configs.
+ */
+@Experimental
+public interface ExternalConfigRegistrar {
+  <K, V> Map<K, V> getExternalConfig(PipelineOptions options);
+
+  /**
+   * Finds the {@link ExternalConfigRegistrar} used to load external configs for different runners.
+   * @param options the pipeline options
+   * @return a map containing external configs
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   */
+  static <K, V> Map<K, V> getConfig(PipelineOptions options) {
+    final Iterator<ExternalConfigRegistrar> factories =
+        ServiceLoader.load(ExternalConfigRegistrar.class).iterator();
+
+    return factories.hasNext() ? Iterators.getOnlyElement(factories).getExternalConfig(options) : new HashMap<>();
+  }
+}
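(Implementations are discovered through ServiceLoader, and getConfig uses Iterators.getOnlyElement, so at most one registrar may be on the classpath; a second would make the lookup throw. Wiring one up takes an implementing class plus a provider-configuration file. The sketch below is hypothetical: class name, package, environment variable, and config key are all invented.)

    // Hypothetical registrar; not part of this commit.
    package com.example.beam;

    import java.util.Map;
    import org.apache.beam.sdk.expansion.ExternalConfigRegistrar;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

    public class EnvVarConfigRegistrar implements ExternalConfigRegistrar {
      @Override
      @SuppressWarnings("unchecked")
      public <K, V> Map<K, V> getExternalConfig(PipelineOptions options) {
        // Illustrative source: read one Flink setting from the environment if present.
        String slots = System.getenv("FLINK_TASK_SLOTS");
        return slots == null
            ? (Map<K, V>) ImmutableMap.<String, String>of()
            : (Map<K, V>) ImmutableMap.of("taskmanager.numberOfTaskSlots", slots);
      }
    }

and, in the same jar, a provider-configuration file naming the implementation:

    # src/main/resources/META-INF/services/org.apache.beam.sdk.expansion.ExternalConfigRegistrar
    com.example.beam.EnvVarConfigRegistrar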
settings.gradle.kts (0 additions, 4 deletions)
@@ -87,10 +87,6 @@ include(":runners:extensions-java:metrics")
  * verify versions in website/www/site/content/en/documentation/runners/flink.md
  * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py
  */
-// Flink 1.15
-include(":runners:flink:1.15")
-include(":runners:flink:1.15:job-server")
-include(":runners:flink:1.15:job-server-container")
 // Flink 1.16
 include(":runners:flink:1.16")
 include(":runners:flink:1.16:job-server")