Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][sql-gateway] Complete FactoryService #634

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static YesOrNo of(String value) {
.findAny().orElseThrow(() -> new EnumConstantNotPresentException(YesOrNo.class, value));
}

public static YesOrNo ofBoolean(boolean bool) {
return bool ? YesOrNo.YES : YesOrNo.NO;
}

@EnumValue
private String value;
private String label;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory;

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.HtmlFormatter;

import java.lang.reflect.Field;

@Data
@EqualsAndHashCode
@Schema(name = "Flink Factory 初始化参数选项信息")
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class FactoryOptionDTO {

@Schema(description = "参数名称")
private String parameterName;
@Schema(description = "参数类型")
private String parameterType;
@Schema(description = "是否是列表")
private YesOrNo isList;
@Schema(description = "参数是否含有默认值")
private YesOrNo hasDefaultValue;
@Schema(description = "默认值")
private Object defaultValue;
@Schema(description = "参数描述")
private String description;

public static FactoryOptionDTO from(ConfigOption<?> configOption) {
FactoryOptionDTOBuilder builder = FactoryOptionDTO.builder();
builder.parameterName(configOption.key());
Class<? extends ConfigOption> configOptionClass = configOption.getClass();
try {
Field parameterClassField = configOptionClass.getDeclaredField("clazz");
Class<?> parameterClass = (Class<?>) parameterClassField.get(configOption);
builder.parameterType(parameterClass.getTypeName());
Field isListField = configOptionClass.getDeclaredField("isList");
isListField.setAccessible(true);
boolean isList = (boolean) isListField.get(configOption);
builder.isList(YesOrNo.ofBoolean(isList));
} catch (Exception e) {
throw new RuntimeException(e);
}
builder.hasDefaultValue(YesOrNo.ofBoolean(configOption.hasDefaultValue()));
builder.defaultValue(configOption.defaultValue());
builder.description(new HtmlFormatter().format(configOption.description()));
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory;

import cn.sliew.scaleph.common.dict.DictInstance;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.flink.table.factories.Factory;

import java.util.Arrays;

@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum FactoryType implements DictInstance {

CatalogFactory(
"CatalogFactory",
org.apache.flink.table.factories.CatalogFactory.class),
FormatFactory("FormatFactory",
org.apache.flink.table.factories.FormatFactory.class),
DynamicTableSourceFactory("DynamicTableSourceFactory",
org.apache.flink.table.factories.DynamicTableSourceFactory.class),
DynamicTableSinkFactory("DynamicTableSinkFactory",
org.apache.flink.table.factories.DynamicTableSinkFactory.class);

private final String value;
private final Class<? extends Factory>[] factoryClasses;

FactoryType(String value, Class<? extends Factory>... factoryClasses) {
this.value = value;
this.factoryClasses = factoryClasses;
}

@JsonCreator
public static FactoryType of(String value) {
return Arrays.stream(values())
.filter(instance -> instance.getValue().equals(value))
.findAny().orElseThrow(() -> new EnumConstantNotPresentException(FactoryType.class, value));
}

public Class<? extends Factory>[] getFactoryClasses() {
return factoryClasses;
}

@Override
public String getValue() {
return value;
}

@Override
public String getLabel() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package cn.sliew.scaleph.engine.sql.gateway.services.dto.factory;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.apache.flink.table.factories.Factory;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

@Data
@EqualsAndHashCode
@Schema(name = "Flink Factory 信息包装")
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class FlinkFactoryDTO {

@Schema(description = "Factory的唯一标识,Flink根据此标识来识别Factory")
private String factoryIdentifier;
@Schema(description = "Factory类型",
allowableValues = {"CatalogFactory", "FormatFactory", "DynamicTableSourceFactory", "DynamicTableSinkFactory"})
private FactoryType factoryType;
@Schema(description = "Factory初始化时的必填选项")
private Set<FactoryOptionDTO> requiredOptions;
@Schema(description = "Factory初始化时的可选选项")
private Set<FactoryOptionDTO> optionalOptions;

public static <T extends Factory> FlinkFactoryDTO fromFactory(T factory) {
FlinkFactoryDTOBuilder builder = FlinkFactoryDTO.builder();
Class<? extends Factory> factoryClass = factory.getClass();
Arrays.stream(FactoryType.values())
.filter(factoryType ->
Arrays.stream(factoryType.getFactoryClasses())
.anyMatch(factoryTypeClass -> factoryTypeClass.isAssignableFrom(factoryClass))
)
.findAny()
.ifPresent(builder::factoryType);
Set<FactoryOptionDTO> requiredOptions = factory.requiredOptions()
.stream()
.map(FactoryOptionDTO::from)
.collect(Collectors.toSet());
builder.requiredOptions(requiredOptions);
Set<FactoryOptionDTO> optionalOptions = factory.optionalOptions()
.stream()
.map(FactoryOptionDTO::from)
.collect(Collectors.toSet());
builder.optionalOptions(optionalOptions);
return builder.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package cn.sliew.scaleph.engine.sql.gateway.services.impl;

import cn.sliew.scaleph.common.nio.FileUtil;
import cn.sliew.scaleph.common.util.SystemUtil;
import cn.sliew.scaleph.engine.sql.gateway.services.FlinkFactoryService;
import cn.sliew.scaleph.resource.service.JarService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.table.factories.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;

@Service
@Slf4j
public class FlinkFactoryServiceImpl implements FlinkFactoryService {

private final JarService jarService;

@Autowired
public FlinkFactoryServiceImpl(JarService jarService) {
this.jarService = jarService;
}

/**
* A common method to find factories in the given jar.
*
* @param id Jar file id
* @param tClass Class of the factory
* @param <T> Factory
* @return A factory list
*/
private <T extends Factory> List<T> findFactories(Long id, Class<T> tClass) {
Path randomWorkspace = null;
try {
randomWorkspace = SystemUtil.getRandomWorkspace();
String randomFileName = RandomStringUtils.random(8, 'A', 'z', true, false);
Path randomFilePath = Paths.get(randomFileName, randomFileName + ".jar");
OutputStream outputStream = Files.newOutputStream(randomFilePath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
String fileName = jarService.download(id, outputStream);
log.info("Downloaded {} to {}", fileName, randomFileName);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try (URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{randomFilePath.toUri().toURL()}, contextClassLoader)) {
List<T> list = new ArrayList<>();
ServiceLoader<T> tServiceLoader = ServiceLoader.load(tClass, urlClassLoader);
for (T t : tServiceLoader) {
list.add(t);
}
return list;
}
} catch (Exception e) {
log.error("Error in finding factories!", e);
return Collections.emptyList();
} finally {
if (randomWorkspace != null) {
try {
FileUtil.deleteDir(randomWorkspace);
log.info("Deleted tmp dir {}", randomWorkspace);
} catch (IOException e) {
log.error("Error delete tmp dir {}", randomWorkspace);
}
}
}
}

@Override
public List<CatalogFactory> findCatalogs(Long id) {
return findFactories(id, CatalogFactory.class);
}

@Override
public List<FormatFactory> findFormats(Long id) {
return findFactories(id, FormatFactory.class);
}

@Override
public List<DynamicTableSourceFactory> findSources(Long id) {
return findFactories(id, DynamicTableSourceFactory.class);
}

@Override
public List<DynamicTableSinkFactory> findSinks(Long id) {
return findFactories(id, DynamicTableSinkFactory.class);
}
}