Update to flink 1.11.1 (#63)
Update to Flink 1.11.1
tsreaper authored and godfreyhe committed Aug 28, 2020
1 parent df507dc commit 63cc437
Showing 19 changed files with 88 additions and 98 deletions.
4 changes: 2 additions & 2 deletions bin/sql-gateway.sh
@@ -95,7 +95,7 @@ FULL_CLASSPATH="`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLIN

# build log config
log=$FLINK_SQL_GATEWAY_LOG/flink-sql-gateway-$FLINK_IDENT_STRING-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_SQL_GATEWAY_CONF"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_SQL_GATEWAY_CONF"/logback.xml)
log_setting=(-Dlog.file="$log" -Dlog4j.configurationFile=file:"$FLINK_SQL_GATEWAY_CONF"/log4j.properties -Dlog4j.configuration=file:"$FLINK_SQL_GATEWAY_CONF"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_SQL_GATEWAY_CONF"/logback.xml)

# read jvm args from config
#
@@ -106,7 +106,7 @@ if [[ $? -ne 0 ]]; then
echo "$jvm_args_output" 1>&2
exit 1
fi
JVM_ARGS=`extractExecutionParams "$jvm_args_output"`
JVM_ARGS=`extractExecutionResults "$jvm_args_output" 1`


if [ -n "$FLINK_SQL_GATEWAY_JAR" ]; then
2 changes: 1 addition & 1 deletion pom.xml
@@ -34,7 +34,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<flink.version>1.10.0</flink.version>
<flink.version>1.11.1</flink.version>
<java.version>1.8</java.version>
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>1.2.17</log4j.version>
@@ -151,9 +151,12 @@ public static void main(String[] args) {

private static void checkFlinkVersion() {
String flinkVersion = EnvironmentInformation.getVersion();
if (!flinkVersion.startsWith("1.10")) {
LOG.error("Only Flink-1.10 is supported now!");
throw new SqlGatewayException("Only Flink-1.10 is supported now!");
if (!flinkVersion.startsWith("1.11")) {
LOG.error("Only Flink-1.11 is supported now!");
throw new SqlGatewayException("Only Flink-1.11 is supported now!");
} else if (flinkVersion.startsWith("1.11.0")) {
LOG.error("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
throw new SqlGatewayException("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
}
}
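
As a standalone illustration of the new guard, a minimal sketch (the class name and exception type are placeholders; EnvironmentInformation is the Flink runtime utility used above):

import org.apache.flink.runtime.util.EnvironmentInformation;

public class FlinkVersionGate {

    // Rejects anything outside the 1.11 line, and 1.11.0 itself, matching the check above.
    public static void check() {
        String flinkVersion = EnvironmentInformation.getVersion();
        if (!flinkVersion.startsWith("1.11") || flinkVersion.startsWith("1.11.0")) {
            throw new IllegalStateException(
                "flink-sql-gateway requires Flink >= 1.11.1, but found " + flinkVersion);
        }
    }
}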

@@ -29,9 +29,9 @@
import com.ververica.flink.table.gateway.utils.SqlExecutionException;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
@@ -40,22 +40,17 @@
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.BatchQueryConfig;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
@@ -81,10 +76,10 @@
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -95,6 +90,7 @@

import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -145,9 +141,11 @@ private ExecutionContext(
this.flinkConfig = flinkConfig;

// create class loader
classLoader = FlinkUserCodeClassLoaders.parentFirst(
dependencies.toArray(new URL[0]),
this.getClass().getClassLoader());
classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
this.getClass().getClassLoader(),
flinkConfig);

// Initialize the TableEnvironment.
initializeTableEnvironment(sessionState);
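
A minimal sketch of the replacement call, assuming the Flink 1.11 client jars are on the classpath (the wrapper class and method are illustrative, not part of this repository):

import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.Configuration;

public class UserClassLoaderSketch {

    // ClientUtils.buildUserCodeClassLoader replaces the direct
    // FlinkUserCodeClassLoaders.parentFirst(...) call and also honours classloading
    // options (e.g. classloader.resolve-order) from the given Configuration.
    static ClassLoader forDependencies(List<URL> dependencyJars, Configuration flinkConfig) {
        return ClientUtils.buildUserCodeClassLoader(
            dependencyJars,             // user jars
            Collections.emptyList(),    // extra classpath entries
            UserClassLoaderSketch.class.getClassLoader(),
            flinkConfig);
    }
}
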
@@ -200,7 +198,7 @@ public SessionState getSessionState() {
* Executes the given supplier using the execution context's classloader as thread classloader.
*/
public <R> R wrapClassLoader(Supplier<R> supplier) {
try (TemporaryClassLoaderContext ignored = new TemporaryClassLoaderContext(classLoader)) {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
return supplier.get();
}
}
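
In 1.11 the class lives in org.apache.flink.util and is obtained through the static of(...) factory instead of a constructor. A small usage sketch (the wrapper class is invented for illustration):

import java.util.function.Supplier;

import org.apache.flink.util.TemporaryClassLoaderContext;

public class ClassLoaderScopeSketch {

    // Runs the supplier with the given classloader as the thread's context classloader;
    // the previous context classloader is restored when the try-with-resources block closes.
    static <R> R runWith(ClassLoader classLoader, Supplier<R> supplier) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            return supplier.get();
        }
    }
}
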
@@ -209,23 +207,11 @@ public <R> R wrapClassLoader(Supplier<R> supplier) {
* Executes the given Runnable using the execution context's classloader as thread classloader.
*/
void wrapClassLoader(Runnable runnable) {
try (TemporaryClassLoaderContext ignored = new TemporaryClassLoaderContext(classLoader)) {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
runnable.run();
}
}

public QueryConfig getQueryConfig() {
if (streamExecEnv != null) {
final StreamQueryConfig config = new StreamQueryConfig();
final long minRetention = environment.getExecution().getMinStateRetention();
final long maxRetention = environment.getExecution().getMaxStateRetention();
config.withIdleStateRetentionTime(Time.milliseconds(minRetention), Time.milliseconds(maxRetention));
return config;
} else {
return new BatchQueryConfig();
}
}

public TableEnvironment getTableEnvironment() {
return tableEnv;
}
@@ -243,16 +229,15 @@ public ClusterClientFactory<ClusterID> getClusterClientFactory() {
}

public Pipeline createPipeline(String name) {
if (streamExecEnv != null) {
// special case for Blink planner to apply batch optimizations
// note: it also modifies the ExecutionConfig!
if (executor instanceof ExecutorBase) {
return ((ExecutorBase) executor).getStreamGraph(name);
return wrapClassLoader(() -> {
if (streamExecEnv != null) {
StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
return streamTableEnv.getPipeline(name);
} else {
BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
return batchTableEnv.getPipeline(name);
}
return streamExecEnv.getStreamGraph(name);
} else {
return execEnv.createProgramPlan(name);
}
});
}
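
A compact sketch of the streaming branch above (wrapper class invented for illustration; like the change above, it relies on the internal StreamTableEnvironmentImpl bridge class):

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;

public class PipelineExtractionSketch {

    // Streaming case only: the internal table environment returns the Pipeline for the
    // operations buffered so far, so the gateway no longer has to reach into the Blink
    // planner's ExecutorBase to build a StreamGraph.
    static Pipeline toPipeline(TableEnvironment tableEnv, String jobName) {
        return ((StreamTableEnvironmentImpl) tableEnv).getPipeline(jobName);
    }
}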


@@ -300,7 +285,7 @@ private static Configuration createExecutionConfig(
commandLine);

try {
final ProgramOptions programOptions = new ProgramOptions(commandLine);
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor
.fromProgramOptions(programOptions, dependencies);
executionConfigAccessor.applyToConfiguration(executionConfig);
@@ -370,7 +355,7 @@ private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String
throw new SqlExecutionException("Unsupported execution type for sinks.");
}

private static TableEnvironment createStreamTableEnvironment(
private TableEnvironment createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
TableConfig config,
@@ -390,7 +375,8 @@ private static TableEnvironment createStreamTableEnvironment(
env,
planner,
executor,
settings.isStreamingMode());
settings.isStreamingMode(),
classLoader);
}

private static Executor lookupExecutor(
@@ -423,14 +409,18 @@ private void initializeTableEnvironment(@Nullable SessionState sessionState) {
//--------------------------------------------------------------------------------------------------------------
// Step.1 Create environments
//--------------------------------------------------------------------------------------------------------------
// Step 1.0 Initialize the CatalogManager if required.
final CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()));
// Step 1.1 Initialize the ModuleManager if required.
// Step 1.0 Initialize the ModuleManager if required.
final ModuleManager moduleManager = new ModuleManager();
// Step 1.1 Initialize the CatalogManager if required.
final CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(config.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
// Step 1.2 Initialize the FunctionCatalog if required.
final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
// Step 1.4 Set up session state.
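
A minimal sketch of the builder-based construction introduced in 1.11 (the catalog and database names are illustrative defaults, not read from the gateway configuration):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogManagerSketch {

    // The two-argument CatalogManager constructor was replaced by a builder that
    // additionally needs the user classloader and the table configuration.
    static CatalogManager create(ClassLoader userClassLoader, Configuration tableConfiguration) {
        return CatalogManager.newBuilder()
            .classLoader(userClassLoader)
            .config(tableConfiguration)
            .defaultCatalog(
                "default_catalog",
                new GenericInMemoryCatalog("default_catalog", "default_database"))
            .build();
    }
}
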
@@ -68,7 +68,7 @@ public CompletableFuture<JobClient> deploy() {
throw new RuntimeException("No execution.target specified in your configuration file.");
}

PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
PipelineExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
final PipelineExecutorFactory executorFactory;
try {
executorFactory = executorServiceLoader.getExecutorFactory(configuration);
@@ -24,9 +24,7 @@
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;

import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
* Operation for CREATE/DROP/ALTER TABLE/DATABASE command.
Expand All @@ -48,11 +46,7 @@ public ResultSet execute() {
// parse and validate statement
try {
context.wrapClassLoader(() -> {
if (tEnv instanceof StreamTableEnvironment) {
((StreamTableEnvironment) tEnv).sqlUpdate(ddl, (StreamQueryConfig) context.getQueryConfig());
} else {
tEnv.sqlUpdate(ddl);
}
tEnv.sqlUpdate(ddl);
return null;
});
} catch (Throwable t) {
@@ -34,9 +34,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -125,12 +123,7 @@ private <C> JobID executeUpdateInternal(ExecutionContext<C> executionContext) {
// parse and validate statement
try {
executionContext.wrapClassLoader(() -> {
if (tableEnv instanceof StreamTableEnvironment) {
((StreamTableEnvironment) tableEnv)
.sqlUpdate(statement, (StreamQueryConfig) executionContext.getQueryConfig());
} else {
tableEnv.sqlUpdate(statement);
}
tableEnv.sqlUpdate(statement);
return null;
});
} catch (Throwable t) {
@@ -212,7 +212,7 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> executionC
// writing to a sink requires an optimization step that might reference UDFs during code compilation
executionContext.wrapClassLoader(() -> {
executionContext.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
table.insertInto(executionContext.getQueryConfig(), tableName);
table.insertInto(tableName);
return null;
});
pipeline = executionContext.createPipeline(jobName);
@@ -32,7 +32,10 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider;

import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -76,6 +79,11 @@ public SessionContext getContext() {
}

public Tuple2<ResultSet, SqlCommandParser.SqlCommand> runStatement(String statement) {
// TODO: This is a temporary fix to avoid NPE.
// In SQL gateway, TableEnvironment is created and used by different threads, thus causing this problem.
RelMetadataQuery.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));

LOG.info("Session: {}, run statement: {}", sessionId, statement);
boolean isBlinkPlanner = context.getExecutionContext().getEnvironment().getExecution().getPlanner()
.equalsIgnoreCase(ExecutionEntry.EXECUTION_PLANNER_VALUE_BLINK);
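
For reference, the same workaround isolated into a helper (sketch only; the class name is made up). Calcite keeps the active metadata provider in the thread-local RelMetadataQuery.THREAD_PROVIDERS, so a TableEnvironment shared across gateway threads can hit an NPE on a thread that never initialized it; re-seeding the thread-local before each statement avoids that:

import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider;

public final class PlannerMetadataWorkaround {

    // Installs Flink's default rel-metadata provider into Calcite's thread-local so that
    // metadata queries issued from this thread do not fail with a NullPointerException.
    static void install() {
        RelMetadataQuery.THREAD_PROVIDERS.set(
            JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
    }
}
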
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.OutputFormatTableSink;
@@ -68,8 +69,8 @@ public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[]
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet
public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
return dataSet
.output(new Utils.CollectHelper<>(accumulatorName, serializer))
.name("SQL Gateway Batch Collect Sink");
}
@@ -66,11 +66,6 @@ public TypeInformation<Row> getRecordType() {
return getTableSchema().toRowType();
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
consumeDataStream(stream);
}

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
// add sink
@@ -77,7 +77,6 @@
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Mainly for testing classloading of dependencies.
@@ -260,10 +259,6 @@ public List<String> supportedProperties() {

@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
// Test HiveCatalogFactory.createCatalog
// But not use it for testing purpose
assertTrue(super.createCatalog(name, properties) != null);

// Developers may already have their own production/testing hive-site.xml set in their environment,
// and Flink tests should avoid using those hive-site.xml.
// Thus, explicitly create a testing HiveConf for unit tests here
(The remaining changed files were not rendered in this view.)
