From 963696d54dbd29280025f190f3255318a1674276 Mon Sep 17 00:00:00 2001 From: LiuBodong Date: Fri, 27 Oct 2023 14:38:23 +0800 Subject: [PATCH] [feature][sql-gateway] Complete sql-gateway services (#629) * [Feature][sql-gateway] Add CatalogService * [Feature][sql-gateway] Complete sql gateway services * [Feature][sql-gateway] Add listOptions in OperationService * [Bugfix][sql-gateway] Fix sql-gateway errors * [Feature][sql-gateway] Update session configuration after configureSession operation * [Format] Format mysql script * [Feature][sql-gateway] Add previewStatement method * [Feature][sql-gateway] Add limitation previewStatement method * [Feature][sql-gateway] Add limitation previewStatement method * [Feature][sql-gateway] Simplify previewStatement method * [Bugfix][sql-gateway] Fix flink sql gateway dependency error * [Bugfix][sql-gateway] Fix flink sql gateway dependency error * [Bugfix][sql-gateway] Fix flink sql gateway startup error when default catalog was set. format code --- pom.xml | 18 +- .../master/ws/WsFlinkSqlGatewayCatalog.java | 45 ++++ .../ws/WsFlinkSqlGatewayCatalogMapper.java | 33 +++ .../ws/WsFlinkSqlGatewayCatalogMapper.xml | 46 +++++ .../scaleph-engine-sql-gateway/pom.xml | 2 +- .../environment/TableEnvironmentProvider.java | 3 +- .../TableEnvironmentProviderImpl.java | 3 +- .../exception/ScalephSqlGatewayException.java | 6 +- .../ScalephSqlGatewayNotFoundException.java | 3 +- .../internal/ScalephCatalogManager.java | 168 ++++----------- .../internal/ScalephSqlGatewayService.java | 58 +++--- .../sql/gateway/services/CatalogService.java | 21 +- .../gateway/services/FlinkFactoryService.java | 6 +- .../gateway/services/OperationService.java | 17 +- .../services/ResultFetcherService.java | 7 +- .../sql/gateway/services/SessionService.java | 17 +- .../sql/gateway/services/SqlService.java | 34 ++- .../services/WsFlinkSqlGatewayService.java | 19 +- .../services/dto/FlinkSqlGatewaySession.java | 10 +- .../dto/WsFlinkSqlGatewayQueryResultDTO.java | 112 ++++++---- .../services/dto/catalog/CatalogInfo.java | 6 +- .../services/dto/catalog/DatabaseInfo.java | 6 +- .../services/dto/catalog/FunctionInfo.java | 10 +- .../services/dto/catalog/TableInfo.java | 13 +- .../services/impl/CatalogServiceImpl.java | 194 ++++++++++++++++++ .../services/impl/OperationServiceImpl.java | 107 ++++++++++ .../impl/ResultFetcherServiceImpl.java | 53 +++++ .../services/impl/SessionServiceImpl.java | 166 ++++++++++----- .../gateway/services/impl/SqlServiceImpl.java | 163 +++++++++++++++ .../impl/WsFlinkSqlGatewayServiceImpl.java | 73 +++---- .../WsFlinkSqlGatewayCreateCatalogParam.java | 8 +- .../param/WsFlinkSqlGatewayQueryParam.java | 9 +- .../sql/gateway/store/JdbcCatalogStore.java | 138 +++++++++++++ .../store/JdbcCatalogStoreOptions.java | 37 ++++ .../gateway/store/ScalephCatalogStore.java | 86 ++++++++ .../store/ScalephCatalogStoreFactory.java | 89 ++++++++ .../store/ScalephCatalogStoreOptions.java | 28 +++ .../util/CatalogStoreDataSourceUtil.java | 94 +++++++++ .../engine/sql/gateway/util/CatalogUtil.java | 167 +++++++++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../docker/mysql/init.d/scaleph-ws-mysql.sql | 18 +- 41 files changed, 1753 insertions(+), 356 deletions(-) create mode 100644 scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewayCatalog.java create mode 100644 scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.java create mode 100644 scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.xml create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/CatalogServiceImpl.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/ResultFetcherServiceImpl.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SqlServiceImpl.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStore.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStoreOptions.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStore.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreFactory.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreOptions.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogStoreDataSourceUtil.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogUtil.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/pom.xml b/pom.xml index 5da6f3dfd..c7e5e061f 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,17 @@ Sonatype Nexus Snapshots https://s01.oss.sonatype.org/content/repositories/snapshots/ + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + true + + + true + + @@ -160,7 +171,7 @@ 2.5.21 3.21.5 4.1.82.Final - 1.17.1 + 1.19-SNAPSHOT 1.17 3.1.1 0.4.0-incubating @@ -557,6 +568,11 @@ flink-table-planner-loader ${flink.version} + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + org.apache.flink diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewayCatalog.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewayCatalog.java new file mode 100644 index 000000000..66c8bf542 --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewayCatalog.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.entity.master.ws; + +import cn.sliew.scaleph.dao.entity.BaseDO; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@TableName("ws_flink_sql_gateway_catalog") +public class WsFlinkSqlGatewayCatalog extends BaseDO { + + @TableField("session_handler") + private String sessionHandler; + + @TableField("catalog_name") + private String catalogName; + + @TableField("catalog_options") + private String catalogOptions; + + @TableField("catalog_description") + private String catalogDescription; + + @TableField("catalog_dependencies") + private String catalogDependencies; + +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.java new file mode 100644 index 000000000..7c027b1c4 --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.mapper.master.ws; + +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewayCatalog; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.springframework.stereotype.Repository; + +/** + *

+ * flink sql gateway catalog mapper + *

+ */ +@Repository +public interface WsFlinkSqlGatewayCatalogMapper extends BaseMapper { + +} diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.xml new file mode 100644 index 000000000..99be9a79a --- /dev/null +++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + id, + creator, + create_time, + editor, + update_time, + session_handler, + `catalog_name`, + catalog_options, + catalog_description + + diff --git a/scaleph-engine/scaleph-engine-sql-gateway/pom.xml b/scaleph-engine/scaleph-engine-sql-gateway/pom.xml index 5d4dca0da..d898f2bdc 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/pom.xml +++ b/scaleph-engine/scaleph-engine-sql-gateway/pom.xml @@ -57,7 +57,7 @@
org.apache.flink - flink-table-planner-loader + flink-table-planner_2.12 org.apache.flink diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java index 100ebd31b..c5389a181 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java @@ -16,9 +16,10 @@ package cn.sliew.scaleph.engine.sql.gateway.environment; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; + public interface TableEnvironmentProvider { TableEnvironmentInternal getTableEnvironment(FlinkSqlGatewaySession session); diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java index 72f274ec5..43ce9ee0b 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java @@ -16,11 +16,12 @@ package cn.sliew.scaleph.engine.sql.gateway.environment; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.gateway.service.operation.OperationExecutor; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; + /** * @see OperationExecutor#getTableEnvironment() * @see org.apache.flink.table.api.internal.TableEnvironmentInternal#create(EnvironmentSettings) diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayException.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayException.java index 77f419d70..c0193f831 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayException.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayException.java @@ -18,8 +18,7 @@ public class ScalephSqlGatewayException extends RuntimeException { - public ScalephSqlGatewayException() { - } + public ScalephSqlGatewayException() {} public ScalephSqlGatewayException(String message) { super(message); @@ -33,7 +32,8 @@ public ScalephSqlGatewayException(Throwable cause) { super(cause); } - public ScalephSqlGatewayException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public ScalephSqlGatewayException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayNotFoundException.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayNotFoundException.java index a6fc842dd..d2e89fa20 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayNotFoundException.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/exception/ScalephSqlGatewayNotFoundException.java @@ -16,10 +16,9 @@ package cn.sliew.scaleph.engine.sql.gateway.exception; -public class ScalephSqlGatewayNotFoundException extends ScalephSqlGatewayException{ +public class ScalephSqlGatewayNotFoundException extends ScalephSqlGatewayException { public ScalephSqlGatewayNotFoundException() { super("Flink sql gateway not found!"); } - } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephCatalogManager.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephCatalogManager.java index f4082670a..02d674b2f 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephCatalogManager.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephCatalogManager.java @@ -16,20 +16,19 @@ package cn.sliew.scaleph.engine.sql.gateway.internal; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.DatabaseInfo; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.FunctionInfo; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.TableInfo; +import java.io.IOException; +import java.net.URI; +import java.net.URLClassLoader; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; + import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.data.StringData; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.gateway.api.operation.OperationHandle; @@ -47,16 +46,9 @@ import org.apache.flink.table.resource.ResourceUri; import org.springframework.util.CollectionUtils; -import java.io.IOException; -import java.net.URI; -import java.net.URLClassLoader; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.FunctionInfo; +import cn.sliew.scaleph.engine.sql.gateway.util.CatalogUtil; import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; @@ -80,123 +72,50 @@ public static ScalephCatalogManager create(Configuration configuration) { configuration.get(SQL_GATEWAY_WORKER_THREADS_MAX), configuration.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(), "scaleph-catalog-manager-pool"); - SessionContext sessionContext = SessionContext.create(defaultContext, - SessionHandle.create(), sessionEnvironment, executor); + SessionContext sessionContext = + SessionContext.create(defaultContext, SessionHandle.create(), sessionEnvironment, executor); return new ScalephCatalogManager(sessionContext); } - private TableInfo getTableInfo(CatalogManager catalogManager, ObjectIdentifier tableName) { - TableInfo.TableInfoBuilder tableInfoBuilder = TableInfo.builder(); - tableInfoBuilder.tableName(tableName.getObjectName()); - catalogManager.getTable(tableName).ifPresent(contextResolvedTable -> { - CatalogBaseTable table = contextResolvedTable.getTable(); - tableInfoBuilder.tableKind(table.getTableKind()); - table.getDescription().ifPresent(tableInfoBuilder::description); - tableInfoBuilder.comment(table.getComment()); - tableInfoBuilder.properties(table.getOptions()); - Schema unresolvedSchema = table.getUnresolvedSchema(); - ResolvedSchema schema = unresolvedSchema.resolve(catalogManager.getSchemaResolver()); - List columns = schema.getColumns().stream().map(column -> { - ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder() - .columnName(column.getName()) - .dataType(column.getDataType().getLogicalType().toString()) - .isPersist(column.isPersisted()) - .isPhysical(column.isPhysical()); - column.getComment().ifPresent(columnInfoBuilder::comment); - return columnInfoBuilder.build(); - }).collect(Collectors.toList()); - tableInfoBuilder.schema(columns); - }); - return tableInfoBuilder.build(); - } - public Set getCatalogInfo(boolean includeSystemFunctions) { SessionContext.SessionState sessionState = sessionContext.getSessionState(); CatalogManager catalogManager = sessionState.catalogManager; ModuleManager moduleManager = sessionState.moduleManager; - Set systemFunctions = moduleManager.listFunctions() - .stream() - .map(functionName -> { - FunctionInfo.FunctionInfoBuilder functionInfoBuilder = FunctionInfo.builder(); - functionInfoBuilder.functionName(functionName); - moduleManager.getFunctionDefinition(functionName) - .ifPresent(functionDefinition -> functionInfoBuilder.functionKind(functionDefinition.getKind())); - return functionInfoBuilder.build(); - }).collect(Collectors.toSet()); - return catalogManager.listCatalogs() - .stream() + Set systemFunctions = moduleManager.listFunctions().stream() + .flatMap(functionName -> moduleManager.getFunctionDefinition(functionName).stream() + .map(functionDefinition -> CatalogUtil.createFunctionInfo(functionName, functionDefinition))) + .collect(Collectors.toSet()); + return catalogManager.listCatalogs().stream() .map(catalogName -> { - CatalogInfo.CatalogInfoBuilder catalogInfoBuilder = CatalogInfo.builder(); + CatalogInfo catalogInfo = CatalogUtil.createCatalogInfo(sessionContext, catalogName); if (includeSystemFunctions) { - catalogInfoBuilder.systemFunctions(systemFunctions); + catalogInfo.setSystemFunctions(systemFunctions); } - catalogInfoBuilder.catalogName(catalogName); - catalogManager.getCatalog(catalogName).ifPresent(catalog -> { - catalog.getFactory().ifPresent(factory -> { - Map properties = new HashMap<>(); - properties.put("factory", factory.factoryIdentifier()); - catalogInfoBuilder.properties(properties); - }); - Set databaseInfos = catalog.listDatabases().stream().map(databaseName -> { - DatabaseInfo.DatabaseInfoBuilder databaseInfoBuilder = DatabaseInfo.builder(); - databaseInfoBuilder.databaseName(databaseName); - try { - CatalogDatabase database = catalog.getDatabase(databaseName); - database.getDescription().ifPresent(databaseInfoBuilder::description); - databaseInfoBuilder.comment(database.getComment()); - Set views = catalogManager.listViews().stream() - .map(viewName -> getTableInfo(catalogManager, ObjectIdentifier.of(catalogName, databaseName, viewName))) - .collect(Collectors.toSet()); - Set tables = catalogManager.listTables(catalogName, databaseName).stream() - .map(tableName -> getTableInfo(catalogManager, ObjectIdentifier.of(catalogName, databaseName, tableName))) - .collect(Collectors.toSet()); - databaseInfoBuilder.tables(tables); - databaseInfoBuilder.views(views); - databaseInfoBuilder.properties(database.getProperties()); - } catch (DatabaseNotExistException e) { - throw new RuntimeException(e); - } - return databaseInfoBuilder.build(); - }) - .collect(Collectors.toSet()); - catalogInfoBuilder.databases(databaseInfos); - }); - - return catalogInfoBuilder.build(); - }).collect(Collectors.toSet()); + return catalogInfo; + }) + .collect(Collectors.toSet()); } public ResultSet fetchResults(OperationHandle operationHandle, FetchOrientation fetchOrientation, int maxRows) { - return sessionContext - .getOperationManager() - .fetchResults(operationHandle, fetchOrientation, maxRows); + return sessionContext.getOperationManager().fetchResults(operationHandle, fetchOrientation, maxRows); } public ResultSet fetchResults(OperationHandle operationHandle, Long token, int maxRows) { - return sessionContext - .getOperationManager() - .fetchResults(operationHandle, token, maxRows); + return sessionContext.getOperationManager().fetchResults(operationHandle, token, maxRows); } public void cancelOperation(OperationHandle operationHandle) { - sessionContext - .getOperationManager() - .cancelOperation(operationHandle); + sessionContext.getOperationManager().cancelOperation(operationHandle); } public List completeStatement(String statement, int position) throws Exception { OperationManager operationManager = sessionContext.getOperationManager(); Configuration sessionConf = sessionContext.getSessionConf(); - OperationHandle operationHandle = - operationManager.submitOperation( - handle -> - sessionContext - .createOperationExecutor(sessionConf) - .getCompletionHints(handle, statement, position)); + OperationHandle operationHandle = operationManager.submitOperation(handle -> + sessionContext.createOperationExecutor(sessionConf).getCompletionHints(handle, statement, position)); operationManager.awaitOperationTermination(operationHandle); - ResultSet resultSet = - fetchResults(operationHandle, 0L, Integer.MAX_VALUE); + ResultSet resultSet = fetchResults(operationHandle, 0L, Integer.MAX_VALUE); return resultSet.getData().stream() .map(data -> data.getString(0)) .map(StringData::toString) @@ -208,23 +127,20 @@ public String executeStatement(String sql, Map configuration) { if (!CollectionUtils.isEmpty(configuration)) { configuration.forEach(sessionConf::setString); } - return sessionContext.getOperationManager() + return sessionContext + .getOperationManager() .submitOperation(handle -> - sessionContext.createOperationExecutor(sessionConf) - .executeStatement(handle, sql) - ) + sessionContext.createOperationExecutor(sessionConf).executeStatement(handle, sql)) .getIdentifier() .toString(); } public void addDependencies(List jars) { - List uris = jars.stream().map(uri -> new ResourceUri(ResourceType.JAR, uri.toString())) + List uris = jars.stream() + .map(uri -> new ResourceUri(ResourceType.JAR, uri.toString())) .collect(Collectors.toList()); try { - sessionContext - .getSessionState() - .resourceManager - .registerJarResources(uris); + sessionContext.getSessionState().resourceManager.registerJarResources(uris); } catch (IOException e) { throw new RuntimeException(e); } @@ -232,18 +148,14 @@ public void addDependencies(List jars) { public void addCatalog(String catalogName, Map options) { Configuration sessionConf = sessionContext.getSessionConf(); - SessionContext.SessionState sessionState = sessionContext - .getSessionState(); + SessionContext.SessionState sessionState = sessionContext.getSessionState(); URLClassLoader userClassLoader = sessionState.resourceManager.getUserClassLoader(); Catalog catalog = FactoryUtil.createCatalog(catalogName, options, sessionConf, userClassLoader); - sessionState - .catalogManager - .registerCatalog(catalogName, catalog); + sessionState.catalogManager.registerCatalog(catalogName, catalog); } public void removeCatalog(String catalogName) { - sessionContext.getSessionState() - .catalogManager.unregisterCatalog(catalogName, true); + sessionContext.getSessionState().catalogManager.unregisterCatalog(catalogName, true); } @Override diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java index 5f4aee449..7df328d90 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java @@ -18,8 +18,11 @@ package cn.sliew.scaleph.engine.sql.gateway.internal; -import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; -import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.*; import org.apache.flink.table.functions.FunctionDefinition; @@ -31,10 +34,8 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import lombok.extern.slf4j.Slf4j; @Slf4j public class ScalephSqlGatewayService implements SqlGatewayService { @@ -56,7 +57,8 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException } @Override - public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException { + public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) + throws SqlGatewayException { sessionService.configureSession(sessionHandle, statement, executionTimeoutMs); } @@ -71,42 +73,47 @@ public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) th } @Override - public OperationHandle submitOperation(SessionHandle sessionHandle, Callable callable) throws SqlGatewayException { + public OperationHandle submitOperation(SessionHandle sessionHandle, Callable callable) + throws SqlGatewayException { return null; } @Override - public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { - - } + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException {} @Override - public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { - - } + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException {} @Override - public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { return null; } @Override - public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { return null; } @Override - public OperationHandle executeStatement(SessionHandle sessionHandle, String s, long l, Configuration configuration) throws SqlGatewayException { + public OperationHandle executeStatement(SessionHandle sessionHandle, String s, long l, Configuration configuration) + throws SqlGatewayException { return null; } @Override - public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long l, int i) throws SqlGatewayException { + public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long l, int i) + throws SqlGatewayException { return null; } @Override - public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i) throws SqlGatewayException { + public ResultSet fetchResults( + SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i) + throws SqlGatewayException { return null; } @@ -126,17 +133,21 @@ public Set listDatabases(SessionHandle sessionHandle, String s) throws S } @Override - public Set listTables(SessionHandle sessionHandle, String s, String s1, Set set) throws SqlGatewayException { + public Set listTables( + SessionHandle sessionHandle, String s, String s1, Set set) + throws SqlGatewayException { return null; } @Override - public ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier) throws SqlGatewayException { + public ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier) + throws SqlGatewayException { return null; } @Override - public Set listUserDefinedFunctions(SessionHandle sessionHandle, String s, String s1) throws SqlGatewayException { + public Set listUserDefinedFunctions(SessionHandle sessionHandle, String s, String s1) + throws SqlGatewayException { return null; } @@ -146,7 +157,8 @@ public Set listSystemFunctions(SessionHandle sessionHandle) throws } @Override - public FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException { + public FunctionDefinition getFunctionDefinition( + SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException { return null; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/CatalogService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/CatalogService.java index a4c3b3680..f754e61f9 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/CatalogService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/CatalogService.java @@ -18,7 +18,8 @@ package cn.sliew.scaleph.engine.sql.gateway.services; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import java.util.Set; + import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; @@ -29,7 +30,7 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import java.util.Set; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; /** * 使用用户 id 关联的 session 获取 @@ -73,7 +74,12 @@ public interface CatalogService { * @param tableKinds used to specify the type of return values. * @return table info of the registered tables/views. */ - Set listTables(SessionHandle sessionHandle, String catalogName, String databaseName, Set tableKinds) throws SqlGatewayException; + Set listTables( + SessionHandle sessionHandle, + String catalogName, + String databaseName, + Set tableKinds) + throws SqlGatewayException; /** * Return table of the given fully qualified name. @@ -81,7 +87,8 @@ public interface CatalogService { * @param tableIdentifier fully qualified name of the table. * @return information of the table. */ - ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException; + ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) + throws SqlGatewayException; /** * List all user defined functions. @@ -90,7 +97,8 @@ public interface CatalogService { * @param databaseName name string of the given database. * @return user defined functions info. */ - Set listUserDefinedFunctions(SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException; + Set listUserDefinedFunctions(SessionHandle sessionHandle, String catalogName, String databaseName) + throws SqlGatewayException; /** * List all available system functions. @@ -107,5 +115,6 @@ public interface CatalogService { * @param functionIdentifier identifier of the function. * @return the definition of the function. */ - FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException; + FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) + throws SqlGatewayException; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/FlinkFactoryService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/FlinkFactoryService.java index 02d473a2b..73b60e976 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/FlinkFactoryService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/FlinkFactoryService.java @@ -18,13 +18,13 @@ package cn.sliew.scaleph.engine.sql.gateway.services; +import java.util.List; + import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FormatFactory; -import java.util.List; - public interface FlinkFactoryService { List findCatalogs(Long id); @@ -32,6 +32,6 @@ public interface FlinkFactoryService { List findFormats(Long id); List findSources(Long id); - + List findSinks(Long id); } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java index 00db6feec..8327c92c7 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java @@ -18,6 +18,9 @@ package cn.sliew.scaleph.engine.sql.gateway.services; +import java.util.Set; +import java.util.concurrent.Callable; + import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.operation.OperationHandle; @@ -27,10 +30,10 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import java.util.concurrent.Callable; - public interface OperationService { + Set listOperations(SessionHandle sessionHandle) throws SqlGatewayException; + /** * Submit an operation and execute. The {@link SqlGatewayService} will take care of the * execution and assign the {@link OperationHandle} for later to retrieve the results. @@ -39,7 +42,8 @@ public interface OperationService { * @param executor the main logic to get the execution results. * @return Returns the handle for later retrieve results. */ - OperationHandle submitOperation(SessionHandle sessionHandle, Callable executor) throws SqlGatewayException; + OperationHandle submitOperation(SessionHandle sessionHandle, Callable executor) + throws SqlGatewayException; /** * Cancel the operation when it is not in terminal status. @@ -65,7 +69,8 @@ public interface OperationService { * @param sessionHandle handle to identify the session. * @param operationHandle handle to identify the operation. */ - OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException; + OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException; /** * Get the result schema for the specified Operation. @@ -76,6 +81,6 @@ public interface OperationService { * @param sessionHandle handle to identify the session. * @param operationHandle handle to identify the operation. */ - ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException; - + ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) + throws Exception; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/ResultFetcherService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/ResultFetcherService.java index 8667cffdb..9b045ba88 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/ResultFetcherService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/ResultFetcherService.java @@ -36,7 +36,8 @@ public interface ResultFetcherService { * @param maxRows max number of rows to fetch. * @return Returns the results. */ - ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException; + ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) + throws SqlGatewayException; /** * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, it means to fetch @@ -49,5 +50,7 @@ public interface ResultFetcherService { * @param maxRows max number of rows to fetch. * @return Returns the results. */ - ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException; + ResultSet fetchResults( + SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) + throws SqlGatewayException; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java index 23d0f13dd..cf988e948 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java @@ -18,17 +18,19 @@ package cn.sliew.scaleph.engine.sql.gateway.services; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import java.util.Map; + import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import org.apache.flink.table.gateway.service.session.Session; -import java.util.Map; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; public interface SessionService { + FlinkSqlGatewaySession getSession(SessionHandle sessionHandle) throws SqlGatewayException; + /** * Open the {@code Session}. * @@ -50,10 +52,10 @@ public interface SessionService { * *

It returns until the execution finishes. * - * @param sessionHandle handle to identify the session. - * @param statement the statement used to configure the session. + * @param sessionHandle handle to identify the session. + * @param statement the statement used to configure the session. * @param executionTimeoutMs the execution timeout. Please use non-positive value to forbid the - * timeout mechanism. + * timeout mechanism. */ void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException; @@ -72,6 +74,5 @@ void configureSession(SessionHandle sessionHandle, String statement, long execut * @param sessionHandle handle to identify the session. * @return Returns the version. */ - EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) - throws SqlGatewayException; + EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SqlService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SqlService.java index c63adbd44..5427ea69e 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SqlService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SqlService.java @@ -18,13 +18,13 @@ package cn.sliew.scaleph.engine.sql.gateway.services; +import java.util.List; + import org.apache.flink.configuration.Configuration; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; -import java.util.List; - public interface SqlService { Object validate(SessionHandle sessionHandle, String statement, Configuration executionConfig) throws Exception; @@ -37,7 +37,8 @@ public interface SqlService { * @param position position of where need completion hints. * @return completion hints. */ - List completeStatement(SessionHandle sessionHandle, String statement, int position) throws SqlGatewayException; + List completeStatement(SessionHandle sessionHandle, String statement, int position) + throws SqlGatewayException; /** * Execute the submitted statement. @@ -49,7 +50,30 @@ public interface SqlService { * @param executionConfig execution config for the statement. * @return handle to identify the operation. */ - OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException; + OperationHandle executeStatement( + SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) + throws SqlGatewayException; + + OperationHandle previewStatement( + SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) + throws SqlGatewayException; - OperationHandle previewStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException; + /** + * Preview the submitted statement. + * + *

NOTE

+ *

+ * Only `INSERT` or `SELECT` statement is supported + *

+ * + * @param sessionHandle handle to identify the session. + * @param statement the SQL to execute. + * @param executionConfig execution config for the statement. + * @param limit limitation of the rows to statement + * @return handle to identify the operation. + * @throws SqlGatewayException + */ + OperationHandle previewStatement( + SessionHandle sessionHandle, String statement, Configuration executionConfig, long limit) + throws SqlGatewayException; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java index b27ac0a55..88ef38e8b 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java @@ -18,17 +18,18 @@ package cn.sliew.scaleph.engine.sql.gateway.services; -import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; -import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; -import org.apache.flink.table.gateway.api.results.GatewayInfo; -import org.apache.flink.table.gateway.api.results.ResultSet; - import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.flink.table.gateway.api.results.GatewayInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; + +import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; + public interface WsFlinkSqlGatewayService { /** @@ -94,9 +95,7 @@ public interface WsFlinkSqlGatewayService { * @param maxRows Max rows to fetch * @return Operation handle id {@link org.apache.flink.table.gateway.api.results.ResultSet} */ - ResultSet fetchResults(String clusterId, - String operationHandleId, - Long token, int maxRows); + ResultSet fetchResults(String clusterId, String operationHandleId, Long token, int maxRows); /** * Cancel running jobs @@ -125,7 +124,7 @@ ResultSet fetchResults(String clusterId, * @param statement * @throws Exception */ -// void validStatement(String clusterId, String statement) throws Exception; + // void validStatement(String clusterId, String statement) throws Exception; /** * Add dependency jars to the sql-gateway diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java index 846932418..74d250b20 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java @@ -16,13 +16,15 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto; -import cn.sliew.scaleph.system.model.BaseDTO; -import com.fasterxml.jackson.annotation.JsonIgnore; -import lombok.Data; +import java.util.Map; + import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.service.context.SessionContext; -import java.util.Map; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import cn.sliew.scaleph.system.model.BaseDTO; +import lombok.Data; /** * org.apache.flink.table.gateway.service.session.Session diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java index cb418fd15..9cd459942 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java @@ -18,9 +18,12 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.apache.commons.codec.binary.Hex; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.catalog.Column; @@ -28,11 +31,9 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.types.logical.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; @Data @EqualsAndHashCode @@ -42,26 +43,34 @@ @Builder public class WsFlinkSqlGatewayQueryResultDTO { - @Schema(description = "SQL 执行状态。NOT_READY: 未就绪,需轮询重试, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据,不在调用", + @Schema( + description = "SQL 执行状态。NOT_READY: 未就绪,需轮询重试, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据,不在调用", allowableValues = {"PAYLOAD", "NOT_READY", "EOS"}) private ResultSet.ResultType resultType; - @Schema(description = "结果类型。SUCCESS: 执行成功, SUCCESS_WITH_CONTENT: 执行成功并可获取数据列表", + + @Schema( + description = "结果类型。SUCCESS: 执行成功, SUCCESS_WITH_CONTENT: 执行成功并可获取数据列表", allowableValues = {"SUCCESS", "SUCCESS_WITH_CONTENT"}) private ResultKind resultKind; + @Schema(description = "任务 id") private String jobID; + @Schema(description = "分页参数。查询下一页数据参数") private Long nextToken; + @Schema(description = "是否支持查询数据") private Boolean isQueryResult; + @Schema(description = "数据类型信息") private List columns; + @Schema(description = "数据") private List> data; public static WsFlinkSqlGatewayQueryResultDTO fromResultSet(ResultSet resultSet) { - WsFlinkSqlGatewayQueryResultDTOBuilder builder = WsFlinkSqlGatewayQueryResultDTO.builder() - .resultType(resultSet.getResultType()); + WsFlinkSqlGatewayQueryResultDTOBuilder builder = + WsFlinkSqlGatewayQueryResultDTO.builder().resultType(resultSet.getResultType()); if (resultSet.getResultType() != ResultSet.ResultType.NOT_READY) { if (resultSet.getJobID() != null) { builder.jobID(resultSet.getJobID().toHexString()); @@ -69,28 +78,38 @@ public static WsFlinkSqlGatewayQueryResultDTO fromResultSet(ResultSet resultSet) builder.resultKind(resultSet.getResultKind()); if (resultSet.isQueryResult()) { List columns = resultSet.getResultSchema().getColumns(); - builder - .columns(columns.stream().map(column -> { - ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder() - .columnName(column.getName()) - .dataType(column.getDataType().getLogicalType().toString()) - .isPersist(column.isPersisted()) - .isPhysical(column.isPhysical()); - column.getComment().ifPresent(columnInfoBuilder::comment); - return columnInfoBuilder.build(); - }).collect(Collectors.toList())) - .data(resultSet.getData().stream().map(rowData -> { - Map map = new HashMap<>(); - for (int i = 0; i < columns.size(); i++) { - Column column = columns.get(i); - try { - map.put(column.getName(), getDataFromRow(rowData, column.getDataType().getLogicalType(), i)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return map; - }).collect(Collectors.toList())); + builder.columns(columns.stream() + .map(column -> { + ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder() + .columnName(column.getName()) + .dataType(column.getDataType() + .getLogicalType() + .toString()) + .isPersist(column.isPersisted()) + .isPhysical(column.isPhysical()); + column.getComment().ifPresent(columnInfoBuilder::comment); + return columnInfoBuilder.build(); + }) + .collect(Collectors.toList())) + .data(resultSet.getData().stream() + .map(rowData -> { + Map map = new HashMap<>(); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + try { + map.put( + column.getName(), + getDataFromRow( + rowData, + column.getDataType().getLogicalType(), + i)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return map; + }) + .collect(Collectors.toList())); builder.nextToken(resultSet.getNextToken()); } } @@ -114,7 +133,10 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in switch (logicalType.getTypeRoot()) { case VARCHAR: case CHAR: - return dataClass.getDeclaredMethod("getString", int.class).invoke(rowData, index).toString(); + return dataClass + .getDeclaredMethod("getString", int.class) + .invoke(rowData, index) + .toString(); case TINYINT: case SMALLINT: return dataClass.getDeclaredMethod("getShort", int.class).invoke(rowData, index); @@ -129,7 +151,8 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return dataClass.getDeclaredMethod("getDouble", int.class).invoke(rowData, index); case DECIMAL: DecimalType decimalType = (DecimalType) logicalType; - DecimalData decimalData = (DecimalData) dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class) + DecimalData decimalData = (DecimalData) dataClass + .getDeclaredMethod("getDecimal", int.class, int.class, int.class) .invoke(rowData, index, decimalType.getPrecision(), decimalType.getScale()); return decimalData.toBigDecimal().doubleValue(); case BIGINT: @@ -141,12 +164,15 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return null; case BINARY: case VARBINARY: - byte[] binary = (byte[]) dataClass.getDeclaredMethod("getBinary", int.class).invoke(rowData, index); + byte[] binary = (byte[]) + dataClass.getDeclaredMethod("getBinary", int.class).invoke(rowData, index); return Hex.encodeHexString(binary); case ROW: case STRUCTURED_TYPE: RowType rowType = (RowType) logicalType; - RowData row = (RowData) dataClass.getDeclaredMethod("getRow", int.class, int.class).invoke(rowData, index, rowType.getFieldCount()); + RowData row = (RowData) dataClass + .getDeclaredMethod("getRow", int.class, int.class) + .invoke(rowData, index, rowType.getFieldCount()); Map mapInRow = new HashMap<>(); for (RowType.RowField rowField : rowType.getFields()) { String fieldName = rowField.getName(); @@ -159,7 +185,8 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in MapType mapType = (MapType) logicalType; LogicalType keyValueType = mapType.getKeyType(); LogicalType valueValueType = mapType.getValueType(); - MapData mapData = (MapData) dataClass.getDeclaredMethod("getMap", int.class).invoke(rowData, index); + MapData mapData = (MapData) + dataClass.getDeclaredMethod("getMap", int.class).invoke(rowData, index); ArrayData keyArray = mapData.keyArray(); ArrayData valueArray = mapData.valueArray(); Map mapInMap = new HashMap<>(); @@ -171,7 +198,8 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return mapInMap; case ARRAY: ArrayType arrayType = (ArrayType) logicalType; - ArrayData arrayData = (ArrayData) dataClass.getDeclaredMethod("getArray", int.class).invoke(rowData, index); + ArrayData arrayData = (ArrayData) + dataClass.getDeclaredMethod("getArray", int.class).invoke(rowData, index); LogicalType elementType = arrayType.getElementType(); List list = new ArrayList<>(); for (int i = 0; i < arrayData.size(); i++) { @@ -180,7 +208,9 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return list; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - TimestampData timestampData = (TimestampData) dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision()); + TimestampData timestampData = (TimestampData) dataClass + .getDeclaredMethod("getTimestamp", int.class, int.class) + .invoke(rowData, index, ((TimestampType) logicalType).getPrecision()); return timestampData.toTimestamp(); case DISTINCT_TYPE: DistinctType distinctType = (DistinctType) logicalType; diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/CatalogInfo.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/CatalogInfo.java index 1c55be073..e3ab855d0 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/CatalogInfo.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/CatalogInfo.java @@ -18,12 +18,12 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; - import java.util.Map; import java.util.Set; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; + @Data @EqualsAndHashCode @Schema(name = "SqlGateway Catalog信息", description = "SqlGateway Catalog信息") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/DatabaseInfo.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/DatabaseInfo.java index a7b9e7ba3..e6e13ed05 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/DatabaseInfo.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/DatabaseInfo.java @@ -18,12 +18,12 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; - import java.util.Map; import java.util.Set; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; + @Data @EqualsAndHashCode @Schema(name = "SqlGateway 数据库信息", description = "SqlGateway 数据库信息") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/FunctionInfo.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/FunctionInfo.java index e24c8c4b7..381efb619 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/FunctionInfo.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/FunctionInfo.java @@ -18,11 +18,12 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; +import java.util.Map; + import org.apache.flink.table.functions.FunctionKind; -import java.util.Map; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; @Data @EqualsAndHashCode @@ -35,7 +36,8 @@ public class FunctionInfo { @Schema(description = "函数 名称") private String functionName; - @Schema(description = "函数 类型", + @Schema( + description = "函数 类型", allowableValues = {"SCALAR", "TABLE", "ASYNC_TABLE", "AGGREGATE", "TABLE_AGGREGATE", "OTHER"}) private FunctionKind functionKind; diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/TableInfo.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/TableInfo.java index 100a6df91..207dafc6a 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/TableInfo.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/catalog/TableInfo.java @@ -18,13 +18,14 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import org.apache.flink.table.catalog.CatalogBaseTable; - import java.util.List; import java.util.Map; +import org.apache.flink.table.catalog.CatalogBaseTable; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; + @Data @EqualsAndHashCode @Schema(name = "SqlGateway 表信息", description = "SqlGateway 表信息") @@ -36,7 +37,9 @@ public class TableInfo { @Schema(description = "表 名称") private String tableName; - @Schema(description = "表 类型,TABLE: 表, VIEW: 视图", allowableValues = {"TABLE", "VIEW"}) + @Schema( + description = "表 类型,TABLE: 表, VIEW: 视图", + allowableValues = {"TABLE", "VIEW"}) private CatalogBaseTable.TableKind tableKind; @Schema(description = "表 结构") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/CatalogServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/CatalogServiceImpl.java new file mode 100644 index 000000000..1571b01bc --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/CatalogServiceImpl.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.gateway.api.results.FunctionInfo; +import org.apache.flink.table.gateway.api.results.TableInfo; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import cn.sliew.scaleph.engine.sql.gateway.services.CatalogService; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import cn.sliew.scaleph.engine.sql.gateway.util.CatalogUtil; + +@Service +public class CatalogServiceImpl implements CatalogService { + + private final SessionService sessionService; + + @Autowired + public CatalogServiceImpl(SessionService sessionService) { + this.sessionService = sessionService; + } + + @Override + public Set getCatalogs(SessionHandle sessionHandle) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + SessionContext sessionContext = flinkSqlGatewaySession.getSessionContext(); + SessionContext.SessionState sessionState = sessionContext.getSessionState(); + CatalogManager catalogManager = sessionState.catalogManager; + return catalogManager.listCatalogs().stream() + .map(catalogName -> CatalogUtil.createCatalogInfo(sessionContext, catalogName)) + .collect(Collectors.toSet()); + } + + @Override + public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + return flinkSqlGatewaySession + .getSessionContext() + .getSessionState() + .catalogManager + .getCurrentCatalog(); + } + + @Override + public Set listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + return flinkSqlGatewaySession + .getSessionContext() + .getSessionState() + .catalogManager + .listCatalogs(); + } + + @Override + public Set listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + List databases = flinkSqlGatewaySession + .getSessionContext() + .getSessionState() + .catalogManager + .getCatalogOrThrowException(catalogName) + .listDatabases(); + return new HashSet<>(databases); + } + + @Override + public Set listTables( + SessionHandle sessionHandle, + String catalogName, + String databaseName, + Set tableKinds) + throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + CatalogManager catalogManager = + flinkSqlGatewaySession.getSessionContext().getSessionState().catalogManager; + HashSet tableInfoSet = new HashSet<>(); + if (tableKinds.contains(CatalogBaseTable.TableKind.TABLE)) { + Set tables = catalogManager.listTables(catalogName, databaseName).stream() + .map(tableName -> { + ContextResolvedTable table = catalogManager.getTableOrError( + ObjectIdentifier.of(catalogName, databaseName, tableName)); + return new TableInfo(table.getIdentifier(), CatalogBaseTable.TableKind.TABLE); + }) + .collect(Collectors.toSet()); + tableInfoSet.addAll(tables); + } + if (tableKinds.contains(CatalogBaseTable.TableKind.VIEW)) { + Set views = catalogManager.listViews(catalogName, databaseName).stream() + .map(tableName -> { + ContextResolvedTable table = catalogManager.getTableOrError( + ObjectIdentifier.of(catalogName, databaseName, tableName)); + return new TableInfo(table.getIdentifier(), CatalogBaseTable.TableKind.VIEW); + }) + .collect(Collectors.toSet()); + tableInfoSet.addAll(views); + } + return tableInfoSet; + } + + @Override + public ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) + throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + CatalogManager catalogManager = + flinkSqlGatewaySession.getSessionContext().getSessionState().catalogManager; + Catalog catalog = catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName()); + try { + ObjectPath tablePath = new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()); + CatalogBaseTable table = catalog.getTable(tablePath); + return catalogManager.resolveCatalogBaseTable(table); + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public Set listUserDefinedFunctions( + SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + SessionContext.SessionState sessionState = + flinkSqlGatewaySession.getSessionContext().getSessionState(); + FunctionCatalog functionCatalog = sessionState.functionCatalog; + return functionCatalog.getUserDefinedFunctions(catalogName, databaseName).stream() + .map(FunctionInfo::new) + .collect(Collectors.toSet()); + } + + @Override + public Set listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + SessionContext.SessionState sessionState = + flinkSqlGatewaySession.getSessionContext().getSessionState(); + return sessionState.moduleManager.listFunctions().stream() + .map(functionName -> sessionState + .moduleManager + .getFunctionDefinition(functionName) + .map(functionDefinition -> + new FunctionInfo(FunctionIdentifier.of(functionName), functionDefinition.getKind())) + .orElse(new FunctionInfo(FunctionIdentifier.of(functionName)))) + .collect(Collectors.toSet()); + } + + @Override + public FunctionDefinition getFunctionDefinition( + SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException { + FlinkSqlGatewaySession flinkSqlGatewaySession = sessionService.getSession(sessionHandle); + SessionContext.SessionState sessionState = + flinkSqlGatewaySession.getSessionContext().getSessionState(); + Optional contextResolvedFunction = + sessionState.functionCatalog.lookupFunction(functionIdentifier); + return contextResolvedFunction + .map(ContextResolvedFunction::getDefinition) + .orElse(null); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java new file mode 100644 index 000000000..a556f7ee3 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import cn.sliew.scaleph.engine.sql.gateway.services.OperationService; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; + +@Service +public class OperationServiceImpl implements OperationService { + + private final SessionService sessionService; + + @Autowired + public OperationServiceImpl(SessionService sessionService) { + this.sessionService = sessionService; + } + + @Override + public Set listOperations(SessionHandle sessionHandle) throws SqlGatewayException { + SessionContext sessionContext = sessionService.getSession(sessionHandle).getSessionContext(); + try { + OperationManager operationManager = sessionContext.getOperationManager(); + Class operationManagerClass = operationManager.getClass(); + Field field = operationManagerClass.getDeclaredField("submittedOperations"); + field.setAccessible(true); + Map map = + (Map) field.get(operationManager); + return map.values().stream() + .map(OperationManager.Operation::getOperationInfo) + .collect(Collectors.toSet()); + } catch (Exception e) { + throw new SqlGatewayException(e); + } + } + + @Override + public OperationHandle submitOperation(SessionHandle sessionHandle, Callable executor) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + return sessionContext.getOperationManager().submitOperation(executor); + } + + @Override + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + sessionContext.getOperationManager().cancelOperation(operationHandle); + } + + @Override + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + sessionContext.getOperationManager().closeOperation(operationHandle); + } + + @Override + public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + return sessionContext.getOperationManager().getOperationInfo(operationHandle); + } + + @Override + public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) + throws Exception { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + return sessionContext.getOperationManager().getOperationResultSchema(operationHandle); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/ResultFetcherServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/ResultFetcherServiceImpl.java new file mode 100644 index 000000000..638686bb8 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/ResultFetcherServiceImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.FetchOrientation; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.springframework.stereotype.Service; + +import cn.sliew.scaleph.engine.sql.gateway.services.ResultFetcherService; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; + +@Service +public class ResultFetcherServiceImpl implements ResultFetcherService { + + private final SessionService sessionService; + + public ResultFetcherServiceImpl(SessionService sessionService) { + this.sessionService = sessionService; + } + + @Override + public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) + throws SqlGatewayException { + SessionContext sessionContext = sessionService.getSession(sessionHandle).getSessionContext(); + return sessionContext.getOperationManager().fetchResults(operationHandle, token, maxRows); + } + + @Override + public ResultSet fetchResults( + SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) + throws SqlGatewayException { + SessionContext sessionContext = sessionService.getSession(sessionHandle).getSessionContext(); + return sessionContext.getOperationManager().fetchResults(operationHandle, orientation, maxRows); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java index 3d3365b73..e6fff4d76 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java @@ -18,19 +18,17 @@ package cn.sliew.scaleph.engine.sql.gateway.services.impl; -import cn.sliew.milky.common.util.JacksonUtil; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession; -import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewaySessionMapper; -import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.fasterxml.jackson.core.type.TypeReference; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.RemovalCause; -import lombok.extern.slf4j.Slf4j; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.session.SessionEnvironment; @@ -47,13 +45,32 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.zaxxer.hikari.HikariDataSource; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewayCatalog; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewayCatalogMapper; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewaySessionMapper; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import cn.sliew.scaleph.engine.sql.gateway.util.CatalogUtil; +import lombok.extern.slf4j.Slf4j; + +import static cn.sliew.scaleph.engine.sql.gateway.store.JdbcCatalogStoreOptions.DRIVER; +import static cn.sliew.scaleph.engine.sql.gateway.store.JdbcCatalogStoreOptions.JDBC_URL; +import static cn.sliew.scaleph.engine.sql.gateway.store.JdbcCatalogStoreOptions.PASSWORD; +import static cn.sliew.scaleph.engine.sql.gateway.store.JdbcCatalogStoreOptions.USERNAME; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.IDENTIFIER; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.SESSION_HANDLE; +import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; +import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX; /** * org.apache.flink.table.gateway.service.session.SessionManager @@ -65,15 +82,22 @@ public class SessionServiceImpl implements SessionService, InitializingBean, Dis private ExecutorService operationExecutorService; private DefaultContext defaultContext; - private LoadingCache sessions = Caffeine.newBuilder() - .maximumSize(100) - .expireAfterAccess(Duration.ofDays(3L)) - .evictionListener((SessionHandle sessionHandle, FlinkSqlGatewaySession session, RemovalCause removalCause) -> doCloseSession(sessionHandle, session, removalCause)) - .build(sessionHandle -> doGetSession(sessionHandle)); - @Autowired private WsFlinkSqlGatewaySessionMapper wsFlinkSqlGatewaySessionMapper; + @Autowired + private WsFlinkSqlGatewayCatalogMapper wsFlinkSqlGatewayCatalogMapper; + + @Autowired + private HikariDataSource dataSource; + + // TODO Initialize cache by settings + private final LoadingCache sessions = Caffeine.newBuilder() + .maximumSize(100) + .expireAfterAccess(Duration.ofDays(3L)) + .evictionListener(this::doCloseSession) + .build(this::doGetSession); + /** * session 的配置 2 级:sql gateway 实例级和 session 级别 * DefaultContext 相当于实例级别,而 sessionconfig 相当于 session 级别 @@ -81,10 +105,13 @@ public class SessionServiceImpl implements SessionService, InitializingBean, Dis @Override public void afterPropertiesSet() throws Exception { this.operationExecutorService = Executors.newFixedThreadPool(4); - this.defaultContext = DefaultContext.load(new Configuration(), Collections.emptyList(), false, false); + this.defaultContext = new DefaultContext(new Configuration(), Collections.emptyList()); // 加载所有的 session - List wsFlinkSqlGatewaySessions = wsFlinkSqlGatewaySessionMapper.selectList(Wrappers.emptyWrapper()); - wsFlinkSqlGatewaySessions.stream().map(this::convertSession).forEach(session -> sessions.put(session.getSessionHandle(), session)); + List wsFlinkSqlGatewaySessions = + wsFlinkSqlGatewaySessionMapper.selectList(Wrappers.emptyWrapper()); + wsFlinkSqlGatewaySessions.stream() + .map(this::convertSession) + .forEach(session -> sessions.put(session.getSessionHandle(), session)); } @Override @@ -92,6 +119,7 @@ public void destroy() throws Exception { sessions.cleanUp(); } + @Override public FlinkSqlGatewaySession getSession(SessionHandle sessionHandle) throws SqlGatewayException { return sessions.get(sessionHandle); } @@ -127,9 +155,9 @@ public FlinkSqlGatewaySession doOpenSession(SessionEnvironment environment) thro WsFlinkSqlGatewaySession record = new WsFlinkSqlGatewaySession(); record.setSessionHandler(sessionId.toString()); - environment.getSessionName().ifPresent(sessionName -> record.setSessionName(sessionName)); - environment.getDefaultCatalog().ifPresent(defaultCatalog -> record.setDefaultCatalog(defaultCatalog)); - if (CollectionUtils.isEmpty(environment.getSessionConfig()) == false) { + environment.getSessionName().ifPresent(record::setSessionName); + environment.getDefaultCatalog().ifPresent(record::setDefaultCatalog); + if (CollectionUtils.isEmpty(environment.getSessionConfig())) { record.setSessionConfig(JacksonUtil.toJsonString(environment.getSessionConfig())); } wsFlinkSqlGatewaySessionMapper.insert(record); @@ -143,7 +171,8 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException sessions.invalidate(sessionHandle); } - public void doCloseSession(SessionHandle sessionHandle, FlinkSqlGatewaySession session, RemovalCause removalCause) throws SqlGatewayException { + public void doCloseSession(SessionHandle sessionHandle, FlinkSqlGatewaySession session, RemovalCause removalCause) + throws SqlGatewayException { switch (removalCause) { case EXPLICIT: case SIZE: @@ -152,7 +181,8 @@ public void doCloseSession(SessionHandle sessionHandle, FlinkSqlGatewaySession s return; case EXPIRED: session.close(); - LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkSqlGatewaySession.class) + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery( + WsFlinkSqlGatewaySession.class) .eq(WsFlinkSqlGatewaySession::getSessionHandler, sessionHandle.toString()); wsFlinkSqlGatewaySessionMapper.delete(queryWrapper); log.info("Session: {} is closed.", sessionHandle); @@ -165,22 +195,32 @@ public void doCloseSession(SessionHandle sessionHandle, FlinkSqlGatewaySession s * forked from SqlGatewayServiceImpl */ @Override - public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException { + public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) + throws SqlGatewayException { try { if (executionTimeoutMs > 0) { // TODO: support the feature in FLINK-27838 - throw new UnsupportedOperationException( - "SqlGatewayService doesn't support timeout mechanism now."); + throw new UnsupportedOperationException("SqlGatewayService doesn't support timeout mechanism now."); } - OperationManager operationManager = getSession(sessionHandle).getSessionContext().getOperationManager(); - OperationHandle operationHandle = - operationManager.submitOperation( - handle -> - getSession(sessionHandle).getSessionContext().createOperationExecutor(getSession(sessionHandle).getSessionContext().getSessionConf()) - .configureSession(handle, statement)); + OperationManager operationManager = + getSession(sessionHandle).getSessionContext().getOperationManager(); + OperationHandle operationHandle = operationManager.submitOperation(handle -> getSession(sessionHandle) + .getSessionContext() + .createOperationExecutor( + getSession(sessionHandle).getSessionContext().getSessionConf()) + .configureSession(handle, statement)); operationManager.awaitOperationTermination(operationHandle); operationManager.closeOperation(operationHandle); + + WsFlinkSqlGatewaySession session = + wsFlinkSqlGatewaySessionMapper.selectOne(Wrappers.lambdaQuery(WsFlinkSqlGatewaySession.class) + .eq(WsFlinkSqlGatewaySession::getSessionHandler, sessionHandle.toString())); + session.setSessionConfig(JacksonUtil.toJsonString(getSession(sessionHandle) + .getSessionContext() + .getSessionConf() + .toMap())); + wsFlinkSqlGatewaySessionMapper.updateById(session); } catch (Throwable t) { log.error("Failed to configure session.", t); throw new SqlGatewayException("Failed to configure session.", t); @@ -202,21 +242,45 @@ private FlinkSqlGatewaySession convertSession(WsFlinkSqlGatewaySession record) { SessionHandle sessionId = new SessionHandle(UUID.fromString(record.getSessionHandler())); session.setSessionHandle(sessionId); - Map sessionConfig = Collections.emptyMap(); + Map sessionConfig = new HashMap<>(); if (StringUtils.hasText(record.getSessionConfig())) { - sessionConfig = JacksonUtil.parseJsonString(record.getSessionConfig(), new TypeReference>() { - }); + sessionConfig = JacksonUtil.parseJsonString( + record.getSessionConfig(), new TypeReference>() {}); } + // Set catalog store configuration + final String catalogStoreOptionPrefix = TABLE_CATALOG_STORE_OPTION_PREFIX + IDENTIFIER + "."; + sessionConfig.put(TABLE_CATALOG_STORE_KIND.key(), IDENTIFIER); + sessionConfig.put(catalogStoreOptionPrefix + SESSION_HANDLE.key(), sessionId.toString()); + sessionConfig.put(catalogStoreOptionPrefix + DRIVER.key(), dataSource.getDriverClassName()); + sessionConfig.put(catalogStoreOptionPrefix + JDBC_URL.key(), dataSource.getJdbcUrl()); + sessionConfig.put(catalogStoreOptionPrefix + USERNAME.key(), dataSource.getUsername()); + sessionConfig.put(catalogStoreOptionPrefix + PASSWORD.key(), dataSource.getPassword()); + session.setSessionConfig(sessionConfig); - SessionEnvironment environment = SessionEnvironment.newBuilder() + SessionEnvironment.Builder sessionEnvBuilder = SessionEnvironment.newBuilder() .setSessionName(record.getSessionName()) - .setDefaultCatalog(record.getDefaultCatalog()) - .addSessionConfig(sessionConfig) - .build(); - SessionContext sessionContext = SessionContext.create(defaultContext, sessionId, environment, operationExecutorService); + .addSessionConfig(sessionConfig); + + // NOTE: If default catalog is set but not registered, + // error will occurs in SessionContext creation. + String defaultCatalog = record.getDefaultCatalog(); + if (StringUtils.hasText(defaultCatalog)) { + sessionEnvBuilder.setDefaultCatalog(defaultCatalog); + WsFlinkSqlGatewayCatalog wsFlinkSqlGatewayCatalog = + wsFlinkSqlGatewayCatalogMapper.selectOne(Wrappers.lambdaQuery(WsFlinkSqlGatewayCatalog.class) + .eq(WsFlinkSqlGatewayCatalog::getSessionHandler, record.getSessionHandler()) + .eq(WsFlinkSqlGatewayCatalog::getCatalogName, defaultCatalog)); + if (wsFlinkSqlGatewayCatalog != null) { + Catalog catalog = CatalogUtil.createCatalog(wsFlinkSqlGatewayCatalog, sessionConfig, null); + sessionEnvBuilder.registerCatalog(wsFlinkSqlGatewayCatalog.getCatalogName(), catalog); + } + } + + SessionEnvironment environment = sessionEnvBuilder.build(); + SessionContext sessionContext = + SessionContext.create(defaultContext, sessionId, environment, operationExecutorService); session.setSessionContext(sessionContext); return session; } - } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SqlServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SqlServiceImpl.java new file mode 100644 index 000000000..eca5046ba --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SqlServiceImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import java.util.List; + +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationExecutor; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.SinkModifyOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import cn.sliew.scaleph.engine.sql.gateway.services.SqlService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import lombok.extern.slf4j.Slf4j; + +@Service +@Slf4j +public class SqlServiceImpl implements SqlService { + + private final SessionService sessionService; + + @Autowired + public SqlServiceImpl(SessionService sessionService) { + this.sessionService = sessionService; + } + + @Override + public Object validate(SessionHandle sessionHandle, String statement, Configuration executionConfig) + throws Exception { + TableEnvironmentInternal tableEnvironment = sessionService + .getSession(sessionHandle) + .getSessionContext() + .createOperationExecutor(executionConfig) + .getTableEnvironment(); + Parser parser = tableEnvironment.getParser(); + List operations = parser.parse(statement); + return !operations.isEmpty(); + } + + @Override + public List completeStatement(SessionHandle sessionHandle, String statement, int position) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + OperationExecutor operationExecutor = sessionContext.createOperationExecutor(sessionContext.getSessionConf()); + return List.of(operationExecutor.getTableEnvironment().getParser().getCompletionHints(statement, position)); + } + + @Override + public OperationHandle executeStatement( + SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) + throws SqlGatewayException { + try { + if (executionTimeoutMs > 0) { + // TODO: support the feature in FLINK-27838 + throw new UnsupportedOperationException("SqlGatewayService doesn't support timeout mechanism now."); + } + + return sessionService + .getSession(sessionHandle) + .getSessionContext() + .getOperationManager() + .submitOperation(handle -> sessionService + .getSession(sessionHandle) + .getSessionContext() + .createOperationExecutor(executionConfig) + .executeStatement(handle, statement)); + } catch (Throwable t) { + log.error("Failed to execute statement.", t); + throw new SqlGatewayException("Failed to execute statement.", t); + } + } + + @Override + public OperationHandle previewStatement( + SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) + throws SqlGatewayException { + if (executionTimeoutMs > 0) { + throw new UnsupportedOperationException("SqlGatewayService doesn't support timeout mechanism now."); + } + return previewStatement(sessionHandle, statement, executionConfig, -1); + } + + @Override + public OperationHandle previewStatement( + SessionHandle sessionHandle, String statement, Configuration executionConfig, long limit) + throws SqlGatewayException { + FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle); + SessionContext sessionContext = session.getSessionContext(); + Configuration sessionConf = new Configuration(sessionContext.getSessionConf()); + sessionConf.addAll(sessionConf); + TableEnvironmentInternal tableEnvironment = + sessionContext.createOperationExecutor(sessionConf).getTableEnvironment(); + QueryOperation queryOperation = parseSqlToQueryOperation(tableEnvironment, statement, limit); + return sessionContext.getOperationManager().submitOperation(operationHandle -> { + TableResultInternal tableResultInternal = tableEnvironment.executeInternal(queryOperation); + return ResultFetcher.fromTableResult(operationHandle, tableResultInternal, true); + }); + } + + private QueryOperation parseSqlToQueryOperation(TableEnvironmentInternal tEnv, String sql, long limitation) { + Parser parser = tEnv.getParser(); + List operations = parser.parse(sql); + if (operations.size() == 1) { + Operation operation = operations.get(0); + QueryOperation queryOperation; + if (operation instanceof SinkModifyOperation) { + queryOperation = ((SinkModifyOperation) operation).getChild(); + } else if (operation instanceof QueryOperation) { + queryOperation = (QueryOperation) operation; + } else { + throw new IllegalArgumentException("Only `SELECT` and `INSERT` statement is supported!"); + } + if (limitation > 0 && queryOperation instanceof PlannerQueryOperation) { + PlannerQueryOperation plannerQueryOperation = (PlannerQueryOperation) queryOperation; + RelNode calciteTree = plannerQueryOperation.getCalciteTree(); + RexNode fetch = new RexBuilder( + new FlinkTypeFactory(tEnv.getClass().getClassLoader(), RelDataTypeSystem.DEFAULT)) + .makeLiteral(limitation, new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DECIMAL)); + LogicalSort logicalSort = LogicalSort.create(calciteTree, RelCollations.EMPTY, null, fetch); + return new PlannerQueryOperation(logicalSort); + } + return queryOperation; + } + throw new IllegalArgumentException("Only one statement should appear"); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java index 6b8ff9c1c..2471a52af 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java @@ -18,30 +18,6 @@ package cn.sliew.scaleph.engine.sql.gateway.services.impl; -import cn.sliew.scaleph.common.util.SystemUtil; -import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; -import cn.sliew.scaleph.engine.sql.gateway.exception.ScalephSqlGatewayNotFoundException; -import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; -import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; -import cn.sliew.scaleph.kubernetes.service.KubernetesService; -import cn.sliew.scaleph.resource.service.ClusterCredentialService; -import cn.sliew.scaleph.resource.service.JarService; -import cn.sliew.scaleph.resource.service.dto.ClusterCredentialDTO; -import cn.sliew.scaleph.resource.service.dto.JarDTO; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.table.gateway.api.operation.OperationHandle; -import org.apache.flink.table.gateway.api.results.FetchOrientation; -import org.apache.flink.table.gateway.api.results.GatewayInfo; -import org.apache.flink.table.gateway.api.results.ResultSet; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; - import java.io.IOException; import java.io.OutputStream; import java.net.URI; @@ -55,6 +31,31 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.FetchOrientation; +import org.apache.flink.table.gateway.api.results.GatewayInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import cn.sliew.scaleph.common.util.SystemUtil; +import cn.sliew.scaleph.engine.sql.gateway.exception.ScalephSqlGatewayNotFoundException; +import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; +import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; +import cn.sliew.scaleph.kubernetes.service.KubernetesService; +import cn.sliew.scaleph.resource.service.ClusterCredentialService; +import cn.sliew.scaleph.resource.service.JarService; +import cn.sliew.scaleph.resource.service.dto.ClusterCredentialDTO; +import cn.sliew.scaleph.resource.service.dto.JarDTO; +import lombok.extern.slf4j.Slf4j; + @Slf4j @Service public class WsFlinkSqlGatewayServiceImpl implements WsFlinkSqlGatewayService { @@ -63,16 +64,17 @@ public class WsFlinkSqlGatewayServiceImpl implements WsFlinkSqlGatewayService { * Store {@link ScalephCatalogManager}s in this map.
* In case multi {@link ScalephCatalogManager}s can be enabled in the future */ - private static final Map CATALOG_MANAGER_MAP = - new ConcurrentHashMap<>(); + private static final Map CATALOG_MANAGER_MAP = new ConcurrentHashMap<>(); + private final KubernetesService kubernetesService; private final ClusterCredentialService clusterCredentialService; private final JarService jarService; @Autowired - public WsFlinkSqlGatewayServiceImpl(KubernetesService kubernetesService, - ClusterCredentialService clusterCredentialService, - JarService jarService) { + public WsFlinkSqlGatewayServiceImpl( + KubernetesService kubernetesService, + ClusterCredentialService clusterCredentialService, + JarService jarService) { this.kubernetesService = kubernetesService; this.clusterCredentialService = clusterCredentialService; this.jarService = jarService; @@ -178,11 +180,10 @@ public String executeSql(String clusterId, WsFlinkSqlGatewayQueryParam params) { * @return */ @Override - public ResultSet fetchResults(String clusterId, - String operationHandleId, - Long token, int maxRows) { + public ResultSet fetchResults(String clusterId, String operationHandleId, Long token, int maxRows) { OperationHandle operationHandle = new OperationHandle(UUID.fromString(operationHandleId)); - ScalephCatalogManager catalogManager = getCatalogManager(clusterId).orElseThrow(ScalephSqlGatewayNotFoundException::new); + ScalephCatalogManager catalogManager = + getCatalogManager(clusterId).orElseThrow(ScalephSqlGatewayNotFoundException::new); ResultSet resultSet; if (token == null || token < 0) { resultSet = catalogManager.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT, maxRows); @@ -214,7 +215,8 @@ public List completeStatement(String clusterId, String statement, int po @Override public Boolean addDependencies(String clusterId, List jarIdList) { try { - List jars = jarIdList.stream().map(jarId -> { + List jars = jarIdList.stream() + .map(jarId -> { JarDTO jarDTO = jarService.getRaw(jarId); try { Path localPath = SystemUtil.getLocalStorageDir().resolve("jars"); @@ -233,7 +235,8 @@ public Boolean addDependencies(String clusterId, List jarIdList) { } catch (IOException e) { throw new RuntimeException(e); } - }).map(Path::toUri) + }) + .map(Path::toUri) .collect(Collectors.toList()); getCatalogManager(clusterId) .orElseThrow(ScalephSqlGatewayNotFoundException::new) diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java index a25145c10..73868d985 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java @@ -18,14 +18,14 @@ package cn.sliew.scaleph.engine.sql.gateway.services.param; +import java.util.Map; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; + import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotEmpty; -import java.util.Map; - @Data @EqualsAndHashCode @Schema(name = "SqlGateway创建Catalog的参数", description = "SqlGateway创建Catalog的参数") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java index 7551c27d5..74d18ce45 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java @@ -18,20 +18,19 @@ package cn.sliew.scaleph.engine.sql.gateway.services.param; +import java.util.Map; +import javax.validation.constraints.NotNull; + import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; -import javax.validation.constraints.NotNull; -import java.util.Map; - @Data @EqualsAndHashCode @Schema(name = "SqlGateway执行Sql的参数", description = "SqlGateway执行Sql的参数") public class WsFlinkSqlGatewayQueryParam { - @NotNull - @Schema(description = "sql") + @NotNull @Schema(description = "sql") private String sql; @Schema(description = "配置参数") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStore.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStore.java new file mode 100644 index 000000000..c20259d10 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStore.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.store; + +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; + +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.zaxxer.hikari.HikariDataSource; + +import cn.sliew.scaleph.engine.sql.gateway.util.CatalogStoreDataSourceUtil; + +public abstract class JdbcCatalogStore> implements CatalogStore { + + private final String driver; + private final String jdbcUrl; + private final String username; + + private final String password; + + private HikariDataSource dataSource; + + private SqlSessionFactory sqlSessionFactory; + + public JdbcCatalogStore(String driver, String jdbcUrl, String username, String password) { + this.driver = driver; + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + } + + public abstract String[] getMapperPackages(); + + public abstract Class getMapperClass(); + + public abstract String getCatalogName(DTO dto); + + public abstract DTO fromDescriptor(String catalogName, CatalogDescriptor catalogDescriptor); + + public abstract CatalogDescriptor toDescriptor(DTO dto); + + public abstract Wrapper buildWrapper(String catalogName); + + protected R doAction(Function action) { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + R result = action.apply(sqlSession.getMapper(getMapperClass())); + sqlSession.commit(); + return result; + } + } + + protected void doAction(Consumer consumer) { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + MAPPER mapper = sqlSession.getMapper(getMapperClass()); + consumer.accept(mapper); + sqlSession.commit(); + } + } + + @Override + public void open() throws CatalogException { + this.dataSource = CatalogStoreDataSourceUtil.createDataSource(driver, jdbcUrl, username, password); + this.sqlSessionFactory = CatalogStoreDataSourceUtil.createSqlSessionFactory(dataSource, getMapperPackages()); + } + + @Override + public void close() throws CatalogException { + if (this.dataSource != null) { + this.dataSource.close(); + } + } + + @Override + public void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException { + doAction(mapper -> { + mapper.insert(fromDescriptor(catalogName, catalog)); + }); + } + + @Override + public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { + if (!contains(catalogName) && !ignoreIfNotExists) { + throw new CatalogException("Catalog " + catalogName + " not exists!"); + } + doAction(mapper -> { + Wrapper wrapper = buildWrapper(catalogName); + mapper.delete(wrapper); + }); + } + + @Override + public Optional getCatalog(String catalogName) throws CatalogException { + DTO dto = doAction((Function) mapper -> mapper.selectOne(buildWrapper(catalogName))); + if (dto == null) { + return Optional.empty(); + } + return Optional.ofNullable(toDescriptor(dto)); + } + + @Override + public Set listCatalogs() throws CatalogException { + return doAction(mapper -> { + Wrapper wrapper = buildWrapper(null); + return mapper.selectList(wrapper).stream().map(this::getCatalogName).collect(Collectors.toSet()); + }); + } + + @Override + public boolean contains(String catalogName) throws CatalogException { + return doAction(mapper -> { + return mapper.exists(buildWrapper(catalogName)); + }); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStoreOptions.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStoreOptions.java new file mode 100644 index 000000000..8a2884008 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/JdbcCatalogStoreOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.store; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class JdbcCatalogStoreOptions { + + public static final ConfigOption DRIVER = + ConfigOptions.key("driver").stringType().noDefaultValue(); + + public static final ConfigOption JDBC_URL = + ConfigOptions.key("jdbcUrl").stringType().noDefaultValue(); + + public static final ConfigOption USERNAME = + ConfigOptions.key("username").stringType().noDefaultValue(); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password").stringType().noDefaultValue(); +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStore.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStore.java new file mode 100644 index 000000000..09646f4e6 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStore.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.store; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.gateway.api.session.SessionHandle; + +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewayCatalog; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewayCatalogMapper; + +public class ScalephCatalogStore extends JdbcCatalogStore { + + private final SessionHandle sessionHandle; + + public ScalephCatalogStore( + SessionHandle sessionHandle, String driver, String jdbcUrl, String username, String password) { + super(driver, jdbcUrl, username, password); + this.sessionHandle = sessionHandle; + } + + @Override + public String[] getMapperPackages() { + return new String[] {"cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewayCatalogMapper.xml"}; + } + + @Override + public Class getMapperClass() { + return WsFlinkSqlGatewayCatalogMapper.class; + } + + @Override + public String getCatalogName(WsFlinkSqlGatewayCatalog wsFlinkSqlGatewayCatalog) { + return wsFlinkSqlGatewayCatalog.getCatalogName(); + } + + @Override + public WsFlinkSqlGatewayCatalog fromDescriptor(String catalogName, CatalogDescriptor catalogDescriptor) { + Map map = catalogDescriptor.getConfiguration().toMap(); + return WsFlinkSqlGatewayCatalog.builder() + .sessionHandler(this.sessionHandle.toString()) + .catalogDescription(JacksonUtil.toJsonString(map)) + .build(); + } + + @Override + public CatalogDescriptor toDescriptor(WsFlinkSqlGatewayCatalog wsFlinkSqlGatewayCatalog) { + String catalogOptions = wsFlinkSqlGatewayCatalog.getCatalogOptions(); + Map optionsMap = + JacksonUtil.parseJsonString(catalogOptions, new TypeReference>() {}); + return CatalogDescriptor.of(wsFlinkSqlGatewayCatalog.getCatalogName(), Configuration.fromMap(optionsMap)); + } + + @Override + public Wrapper buildWrapper(String catalogName) { + LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(WsFlinkSqlGatewayCatalog.class) + .eq(WsFlinkSqlGatewayCatalog::getSessionHandler, sessionHandle.toString()); + if (catalogName != null) { + wrapper.eq(WsFlinkSqlGatewayCatalog::getCatalogName, catalogName); + } + return wrapper; + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreFactory.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreFactory.java new file mode 100644 index 000000000..b6327055b --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.store; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.gateway.api.session.SessionHandle; + +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.DRIVER; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.IDENTIFIER; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.JDBC_URL; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.PASSWORD; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.SESSION_HANDLE; +import static cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreOptions.USERNAME; + +public class ScalephCatalogStoreFactory implements CatalogStoreFactory { + + private SessionHandle sessionHandle; + private String driver; + private String jdbcUrl; + private String username; + private String password; + + @Override + public CatalogStore createCatalogStore() { + return new ScalephCatalogStore(sessionHandle, driver, jdbcUrl, username, password); + } + + @Override + public void open(Context context) throws CatalogException { + FactoryUtil.CatalogStoreFactoryHelper catalogStoreFactoryHelper = + FactoryUtil.createCatalogStoreFactoryHelper(this, context); + catalogStoreFactoryHelper.validate(); + ReadableConfig options = catalogStoreFactoryHelper.getOptions(); + String sessionHandleStr = options.get(SESSION_HANDLE); + this.sessionHandle = new SessionHandle(UUID.fromString(sessionHandleStr)); + this.driver = options.get(DRIVER); + this.jdbcUrl = options.get(JDBC_URL); + this.username = options.get(USERNAME); + this.password = options.get(PASSWORD); + } + + @Override + public void close() throws CatalogException {} + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(SESSION_HANDLE); + options.add(DRIVER); + options.add(JDBC_URL); + options.add(USERNAME); + options.add(PASSWORD); + return options; + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreOptions.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreOptions.java new file mode 100644 index 000000000..5acbdefa6 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/store/ScalephCatalogStoreOptions.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.store; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class ScalephCatalogStoreOptions extends JdbcCatalogStoreOptions { + + public static final String IDENTIFIER = "scaleph"; + + public static final ConfigOption SESSION_HANDLE = + ConfigOptions.key("session-handle").stringType().noDefaultValue(); +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogStoreDataSourceUtil.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogStoreDataSourceUtil.java new file mode 100644 index 000000000..47d7e34a4 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogStoreDataSourceUtil.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.util; + +import java.io.InputStream; +import java.util.Date; +import javax.sql.DataSource; + +import org.apache.ibatis.builder.xml.XMLMapperBuilder; +import org.apache.ibatis.io.Resources; +import org.apache.ibatis.logging.slf4j.Slf4jImpl; +import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.reflection.MetaObject; +import org.apache.ibatis.scripting.LanguageDriverRegistry; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.defaults.DefaultSqlSessionFactory; +import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; + +import com.baomidou.mybatisplus.core.MybatisConfiguration; +import com.baomidou.mybatisplus.core.MybatisXMLLanguageDriver; +import com.baomidou.mybatisplus.core.config.GlobalConfig; +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import com.baomidou.mybatisplus.core.handlers.MybatisEnumTypeHandler; +import com.baomidou.mybatisplus.core.toolkit.GlobalConfigUtils; +import com.zaxxer.hikari.HikariDataSource; + +public class CatalogStoreDataSourceUtil { + + private CatalogStoreDataSourceUtil() {} + + public static HikariDataSource createDataSource(String driver, String jdbcUrl, String username, String password) { + HikariDataSource dataSource = new HikariDataSource(); + dataSource.setDriverClassName(driver); + dataSource.setJdbcUrl(jdbcUrl); + dataSource.setUsername(username); + dataSource.setPassword(password); + dataSource.setMaximumPoolSize(20); + dataSource.setConnectionTimeout(100000); + dataSource.setMinimumIdle(1); + dataSource.setIdleTimeout(60000); + dataSource.setConnectionInitSql("SELECT 1 FROM DUAL"); + dataSource.setConnectionTestQuery("SELECT 1 FROM DUAL"); + return dataSource; + } + + public static SqlSessionFactory createSqlSessionFactory(DataSource dataSource, String... mapperPackages) { + try { + MybatisConfiguration configuration = new MybatisConfiguration(); + LanguageDriverRegistry languageRegistry = configuration.getLanguageRegistry(); + languageRegistry.register(MybatisXMLLanguageDriver.class); + languageRegistry.setDefaultDriverClass(MybatisXMLLanguageDriver.class); + configuration.setDefaultEnumTypeHandler(MybatisEnumTypeHandler.class); + configuration.setMapUnderscoreToCamelCase(true); + configuration.setLogImpl(Slf4jImpl.class); + Environment environment = new Environment("CatalogStore", new JdbcTransactionFactory(), dataSource); + configuration.setEnvironment(environment); + configuration.setCacheEnabled(false); + for (String mapperPackage : mapperPackages) { + try (InputStream inputStream = Resources.getResourceAsStream(mapperPackage)) { + new XMLMapperBuilder(inputStream, configuration, mapperPackage, configuration.getSqlFragments()) + .parse(); + } + } + GlobalConfig globalConfig = GlobalConfigUtils.getGlobalConfig(configuration); + globalConfig.setMetaObjectHandler(new MetaObjectHandler() { + public void insertFill(MetaObject metaObject) { + this.strictInsertFill(metaObject, "createTime", Date::new, Date.class); + this.strictInsertFill(metaObject, "updateTime", Date::new, Date.class); + } + + public void updateFill(MetaObject metaObject) { + this.strictUpdateFill(metaObject, "updateTime", Date::new, Date.class); + } + }); + return new DefaultSqlSessionFactory(configuration); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogUtil.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogUtil.java new file mode 100644 index 000000000..bd81fa7c5 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/util/CatalogUtil.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.gateway.service.context.SessionContext; + +import com.fasterxml.jackson.core.type.TypeReference; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewayCatalog; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.DatabaseInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.FunctionInfo; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.TableInfo; + +public class CatalogUtil { + + public static Catalog createCatalog( + WsFlinkSqlGatewayCatalog wsFlinkSqlGatewayCatalog, + Map sessionConfig, + ClassLoader classLoader) { + String catalogName = wsFlinkSqlGatewayCatalog.getCatalogName(); + Map options = JacksonUtil.parseJsonString( + wsFlinkSqlGatewayCatalog.getCatalogOptions(), new TypeReference>() {}); + return FactoryUtil.createCatalog( + catalogName, + options, + Configuration.fromMap(sessionConfig), + // TODO Classloader should load session added jars. + classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader); + } + + public static CatalogInfo createCatalogInfo(SessionContext sessionContext, String catalogName) { + Catalog catalog = sessionContext.getSessionState().catalogManager.getCatalogOrThrowException(catalogName); + CatalogInfo.CatalogInfoBuilder catalogInfoBuilder = CatalogInfo.builder(); + catalogInfoBuilder.catalogName(catalogName); + catalog.getFactory().ifPresent(factory -> { + Map properties = new HashMap<>(); + properties.put("factory", factory.factoryIdentifier()); + catalogInfoBuilder.properties(properties); + }); + Set databaseInfos = catalog.listDatabases().stream() + .map(databaseName -> createDatabaseInfo(sessionContext, catalogName, databaseName)) + .collect(Collectors.toSet()); + catalogInfoBuilder.databases(databaseInfos); + return catalogInfoBuilder.build(); + } + + public static DatabaseInfo createDatabaseInfo( + SessionContext sessionContext, String catalogName, String databaseName) { + SessionContext.SessionState sessionState = sessionContext.getSessionState(); + CatalogManager catalogManager = sessionState.catalogManager; + FunctionCatalog functionCatalog = sessionState.functionCatalog; + Catalog catalog = catalogManager.getCatalogOrThrowException(catalogName); + DatabaseInfo.DatabaseInfoBuilder databaseInfoBuilder = DatabaseInfo.builder(); + databaseInfoBuilder.databaseName(databaseName); + try { + CatalogDatabase database = catalog.getDatabase(databaseName); + database.getDescription().ifPresent(databaseInfoBuilder::description); + databaseInfoBuilder.comment(database.getComment()); + Set views = catalogManager.listViews().stream() + .map(viewName -> createTableInfo(sessionContext, catalogName, databaseName, viewName)) + .collect(Collectors.toSet()); + Set tables = catalogManager.listTables(catalogName, databaseName).stream() + .map(tableName -> createTableInfo(sessionContext, catalogName, databaseName, tableName)) + .collect(Collectors.toSet()); + Set userDefinedFunctions = + functionCatalog.getUserDefinedFunctions(catalogName, databaseName).stream() + .flatMap(functionIdentifier -> { + ObjectIdentifier objectIdentifier = functionIdentifier + .getIdentifier() + .orElse(ObjectIdentifier.of( + catalogName, databaseName, functionIdentifier.getFunctionName())); + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(objectIdentifier); + return functionCatalog + .lookupFunction(unresolvedIdentifier) + .map(contextResolvedFunction -> { + String functionName = objectIdentifier.getObjectName(); + FunctionDefinition functionDefinition = + contextResolvedFunction.getDefinition(); + return createFunctionInfo(functionName, functionDefinition); + }) + .stream(); + }) + .collect(Collectors.toSet()); + databaseInfoBuilder.tables(tables); + databaseInfoBuilder.views(views); + databaseInfoBuilder.userDefinedFunctions(userDefinedFunctions); + databaseInfoBuilder.properties(database.getProperties()); + } catch (DatabaseNotExistException e) { + throw new RuntimeException(e); + } + return databaseInfoBuilder.build(); + } + + public static TableInfo createTableInfo( + SessionContext sessionContext, String catalogName, String databaseName, String tableName) { + SessionContext.SessionState sessionState = sessionContext.getSessionState(); + CatalogManager catalogManager = sessionState.catalogManager; + TableInfo.TableInfoBuilder tableInfoBuilder = TableInfo.builder(); + tableInfoBuilder.tableName(tableName); + ObjectIdentifier tableId = ObjectIdentifier.of(catalogName, databaseName, tableName); + catalogManager.getTable(tableId).ifPresent(contextResolvedTable -> { + CatalogBaseTable table = contextResolvedTable.getTable(); + tableInfoBuilder.tableKind(table.getTableKind()); + table.getDescription().ifPresent(tableInfoBuilder::description); + tableInfoBuilder.comment(table.getComment()); + tableInfoBuilder.properties(table.getOptions()); + Schema unresolvedSchema = table.getUnresolvedSchema(); + ResolvedSchema schema = unresolvedSchema.resolve(catalogManager.getSchemaResolver()); + List columns = schema.getColumns().stream() + .map(column -> { + ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder() + .columnName(column.getName()) + .dataType(column.getDataType().getLogicalType().toString()) + .isPersist(column.isPersisted()) + .isPhysical(column.isPhysical()); + column.getComment().ifPresent(columnInfoBuilder::comment); + return columnInfoBuilder.build(); + }) + .collect(Collectors.toList()); + tableInfoBuilder.schema(columns); + }); + return tableInfoBuilder.build(); + } + + public static FunctionInfo createFunctionInfo(String functionName, FunctionDefinition functionDefinition) { + FunctionInfo.FunctionInfoBuilder functionBuilder = FunctionInfo.builder(); + functionBuilder.functionName(functionName); + functionBuilder.functionKind(functionDefinition.getKind()); + return functionBuilder.build(); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/scaleph-engine/scaleph-engine-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..6ca21470b --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cn.sliew.scaleph.engine.sql.gateway.store.ScalephCatalogStoreFactory \ No newline at end of file diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index e8b7b472f..90863ae00 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -504,4 +504,20 @@ create table ws_flink_sql_gateway_session update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), unique key uniq_session (session_handler) -) engine = innodb comment = 'flink sql gateway session'; \ No newline at end of file +) engine = innodb comment = 'flink sql gateway session'; + +drop table if exists ws_flink_sql_gateway_catalog; +create table ws_flink_sql_gateway_catalog +( + id bigint not null auto_increment comment '自增主键', + session_handler varchar(64) not null comment 'session handler', + `catalog_name` varchar(255) not null comment 'catalog name', + catalog_options text comment 'catalog config options', + catalog_description varchar(255) comment 'catalog description', + creator varchar(32) comment '创建人', + create_time timestamp default current_timestamp comment '创建时间', + editor varchar(32) comment '修改人', + update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', + primary key (id), + unique key uniq_catalog (session_handler, catalog_name) +) engine = innodb comment = 'flink sql gateway catalog'; \ No newline at end of file