diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/common/YesOrNo.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/common/YesOrNo.java index 558b1dfe3..55bd8d9c7 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/common/YesOrNo.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/common/YesOrNo.java @@ -39,6 +39,10 @@ public static YesOrNo of(String value) { .findAny().orElseThrow(() -> new EnumConstantNotPresentException(YesOrNo.class, value)); } + public static YesOrNo ofBoolean(boolean bool) { + return bool ? YesOrNo.YES : YesOrNo.NO; + } + @EnumValue private String value; private String label; diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryOptionDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryOptionDTO.java new file mode 100644 index 000000000..849dec18b --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryOptionDTO.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory; + +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.description.HtmlFormatter; + +import java.lang.reflect.Field; + +@Data +@EqualsAndHashCode +@Schema(name = "Flink Factory 初始化参数选项信息") +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class FactoryOptionDTO { + + @Schema(description = "参数名称") + private String parameterName; + @Schema(description = "参数类型") + private String parameterType; + @Schema(description = "是否是列表") + private YesOrNo isList; + @Schema(description = "参数是否含有默认值") + private YesOrNo hasDefaultValue; + @Schema(description = "默认值") + private Object defaultValue; + @Schema(description = "参数描述") + private String description; + + public static FactoryOptionDTO from(ConfigOption configOption) { + FactoryOptionDTOBuilder builder = FactoryOptionDTO.builder(); + builder.parameterName(configOption.key()); + Class configOptionClass = configOption.getClass(); + try { + Field parameterClassField = configOptionClass.getDeclaredField("clazz"); + Class parameterClass = (Class) parameterClassField.get(configOption); + builder.parameterType(parameterClass.getTypeName()); + Field isListField = configOptionClass.getDeclaredField("isList"); + isListField.setAccessible(true); + boolean isList = (boolean) isListField.get(configOption); + builder.isList(YesOrNo.ofBoolean(isList)); + } catch (Exception e) { + throw new RuntimeException(e); + } + builder.hasDefaultValue(YesOrNo.ofBoolean(configOption.hasDefaultValue())); + builder.defaultValue(configOption.defaultValue()); + builder.description(new HtmlFormatter().format(configOption.description())); + return builder.build(); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryType.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryType.java new file mode 100644 index 000000000..53d013a84 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FactoryType.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory; + +import cn.sliew.scaleph.common.dict.DictInstance; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; +import org.apache.flink.table.factories.Factory; + +import java.util.Arrays; + +@JsonFormat(shape = JsonFormat.Shape.OBJECT) +public enum FactoryType implements DictInstance { + + CatalogFactory( + "CatalogFactory", + org.apache.flink.table.factories.CatalogFactory.class), + FormatFactory("FormatFactory", + org.apache.flink.table.factories.FormatFactory.class), + DynamicTableSourceFactory("DynamicTableSourceFactory", + org.apache.flink.table.factories.DynamicTableSourceFactory.class), + DynamicTableSinkFactory("DynamicTableSinkFactory", + org.apache.flink.table.factories.DynamicTableSinkFactory.class); + + private final String value; + private final Class[] factoryClasses; + + FactoryType(String value, Class... factoryClasses) { + this.value = value; + this.factoryClasses = factoryClasses; + } + + @JsonCreator + public static FactoryType of(String value) { + return Arrays.stream(values()) + .filter(instance -> instance.getValue().equals(value)) + .findAny().orElseThrow(() -> new EnumConstantNotPresentException(FactoryType.class, value)); + } + + public Class[] getFactoryClasses() { + return factoryClasses; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getLabel() { + return value; + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FlinkFactoryDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FlinkFactoryDTO.java new file mode 100644 index 000000000..21dafcd13 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/factory/FlinkFactoryDTO.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import org.apache.flink.table.factories.Factory; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +@Data +@EqualsAndHashCode +@Schema(name = "Flink Factory 信息包装") +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class FlinkFactoryDTO { + + @Schema(description = "Factory的唯一标识,Flink根据此标识来识别Factory") + private String factoryIdentifier; + @Schema(description = "Factory类型", + allowableValues = {"CatalogFactory", "FormatFactory", "DynamicTableSourceFactory", "DynamicTableSinkFactory"}) + private FactoryType factoryType; + @Schema(description = "Factory初始化时的必填选项") + private Set requiredOptions; + @Schema(description = "Factory初始化时的可选选项") + private Set optionalOptions; + + public static FlinkFactoryDTO fromFactory(T factory) { + FlinkFactoryDTOBuilder builder = FlinkFactoryDTO.builder(); + Class factoryClass = factory.getClass(); + Arrays.stream(FactoryType.values()) + .filter(factoryType -> + Arrays.stream(factoryType.getFactoryClasses()) + .anyMatch(factoryTypeClass -> factoryTypeClass.isAssignableFrom(factoryClass)) + ) + .findAny() + .ifPresent(builder::factoryType); + Set requiredOptions = factory.requiredOptions() + .stream() + .map(FactoryOptionDTO::from) + .collect(Collectors.toSet()); + builder.requiredOptions(requiredOptions); + Set optionalOptions = factory.optionalOptions() + .stream() + .map(FactoryOptionDTO::from) + .collect(Collectors.toSet()); + builder.optionalOptions(optionalOptions); + return builder.build(); + } + +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/FlinkFactoryServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/FlinkFactoryServiceImpl.java new file mode 100644 index 000000000..51eba4286 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/FlinkFactoryServiceImpl.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import cn.sliew.scaleph.common.nio.FileUtil; +import cn.sliew.scaleph.common.util.SystemUtil; +import cn.sliew.scaleph.engine.sql.gateway.services.FlinkFactoryService; +import cn.sliew.scaleph.resource.service.JarService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.flink.table.factories.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ServiceLoader; + +@Service +@Slf4j +public class FlinkFactoryServiceImpl implements FlinkFactoryService { + + private final JarService jarService; + + @Autowired + public FlinkFactoryServiceImpl(JarService jarService) { + this.jarService = jarService; + } + + /** + * A common method to find factories in the given jar. + * + * @param id Jar file id + * @param tClass Class of the factory + * @param Factory + * @return A factory list + */ + private List findFactories(Long id, Class tClass) { + Path randomWorkspace = null; + try { + randomWorkspace = SystemUtil.getRandomWorkspace(); + String randomFileName = RandomStringUtils.random(8, 'A', 'z', true, false); + Path randomFilePath = Paths.get(randomFileName, randomFileName + ".jar"); + OutputStream outputStream = Files.newOutputStream(randomFilePath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + String fileName = jarService.download(id, outputStream); + log.info("Downloaded {} to {}", fileName, randomFileName); + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try (URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{randomFilePath.toUri().toURL()}, contextClassLoader)) { + List list = new ArrayList<>(); + ServiceLoader tServiceLoader = ServiceLoader.load(tClass, urlClassLoader); + for (T t : tServiceLoader) { + list.add(t); + } + return list; + } + } catch (Exception e) { + log.error("Error in finding factories!", e); + return Collections.emptyList(); + } finally { + if (randomWorkspace != null) { + try { + FileUtil.deleteDir(randomWorkspace); + log.info("Deleted tmp dir {}", randomWorkspace); + } catch (IOException e) { + log.error("Error delete tmp dir {}", randomWorkspace); + } + } + } + } + + @Override + public List findCatalogs(Long id) { + return findFactories(id, CatalogFactory.class); + } + + @Override + public List findFormats(Long id) { + return findFactories(id, FormatFactory.class); + } + + @Override + public List findSources(Long id) { + return findFactories(id, DynamicTableSourceFactory.class); + } + + @Override + public List findSinks(Long id) { + return findFactories(id, DynamicTableSinkFactory.class); + } +}