diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 6b0a6a5b2ee4..48350f5436ce 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.*; +import static org.apache.beam.runners.flink.FlinkExecutionEnvironments.getFlinkConfiguration; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; -import com.google.auto.service.AutoService; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -33,9 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.expansion.ExternalConfigRegistrar; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.RemoteEnvironment; @@ -504,39 +504,14 @@ public void testGetFlinkConfiguration() { assertNotNull(configuration); } - @Test - public void testGetFlinkConfigurationWithExternalConfigs() { - // mock a ExternConfigRegistrar. - @AutoService(ExternalConfigRegistrar.class) - class Config implements ExternalConfigRegistrar { - @Override - public Map getExternalConfig(PipelineOptions options) { - // insert flink related configs - return new HashMap() { - { - put("key", "value"); - } - }; - } - } - FlinkPipelineOptions options = getDefaultPipelineOptions(); - Configuration configuration = getFlinkConfiguration(null, options); - assertTrue(configuration.containsKey("key")); - assertEquals(configuration.getString("key", ""), "value"); - } - @Test public void testGetFlinkConfigurationWithConfigMap() { FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setFlinkConfMap( - new HashMap() { - { - put("mapKey", "mapValue"); - } - }); + new HashMap<>(ImmutableMap.builder().put("mapKey", "mapValue").build())); Configuration configuration = getFlinkConfiguration(null, options); assertTrue(configuration.containsKey("mapKey")); - assertEquals(configuration.getString("mapKey", ""), "mapValue"); + assertEquals("mapValue", configuration.getString("mapKey", "")); } private void checkHostAndPort(Object env, String expectedHost, int expectedPort) {