From 7e790ddb795b07c4ffea484d055e3cb1dbb349eb Mon Sep 17 00:00:00 2001 From: kalencaya <1942460489@qq.com> Date: Thu, 21 Dec 2023 22:29:09 +0800 Subject: [PATCH] [Feature][scaleph-engine-doris] update doris instance service expose (#668) * feature: add service form item for doris fe component * feature: add service form item for doris fe component * fix: kubernetes service error * feature: add doris service * feature: add doris service --------- Co-authored-by: wangqi --- .../ws/WsDorisOperatorInstanceController.java | 12 ++ .../sliew/scaleph/common/dict/DictType.java | 1 + .../doris/operator/spec/ExportService.java | 9 +- .../service/DorisClusterEndpointService.java | 26 ++++ .../service/dto/DorisClusterFeEndpoint.java | 32 ++++ .../impl/DorisClusterEndpointServiceImpl.java | 74 +++++++++ .../FlinkJobManagerEndpointServiceImpl.java | 121 ++------------- .../kubernetes/service/ServiceService.java | 31 ++++ .../service/impl/ServiceServiceImpl.java | 145 ++++++++++++++++++ scaleph-ui-react/src/constants/dictType.ts | 1 + .../src/locales/zh-CN/pages/project.ts | 4 + .../Detail/DorisInstanceAction.tsx | 119 +++++++++----- .../Steps/Component/DorisFeComponent.tsx | 10 +- .../project/WsDorisOperatorInstanceService.ts | 9 +- .../project/WsDorisOperatorTemplateService.ts | 7 + .../src/services/project/typings.d.ts | 7 + 16 files changed, 461 insertions(+), 147 deletions(-) create mode 100644 scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/DorisClusterEndpointService.java create mode 100644 scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/dto/DorisClusterFeEndpoint.java create mode 100644 scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/impl/DorisClusterEndpointServiceImpl.java create mode 100644 scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/ServiceService.java create mode 100644 scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/ServiceServiceImpl.java diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsDorisOperatorInstanceController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsDorisOperatorInstanceController.java index 4576543d9..15c0b40f7 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsDorisOperatorInstanceController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsDorisOperatorInstanceController.java @@ -20,7 +20,9 @@ import cn.sliew.scaleph.api.annotation.Logging; import cn.sliew.scaleph.engine.doris.operator.DorisCluster; +import cn.sliew.scaleph.engine.doris.service.DorisClusterEndpointService; import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService; +import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint; import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO; import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceAddParam; import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceListParam; @@ -46,6 +48,8 @@ public class WsDorisOperatorInstanceController { @Autowired private WsDorisOperatorInstanceService wsDorisInstanceService; + @Autowired + private DorisClusterEndpointService dorisClusterEndpointService; @Logging @GetMapping @@ -135,4 +139,12 @@ public ResponseEntity> getStatus(@PathVari return new ResponseEntity<>(ResponseVO.success(sessionCluster.orElse(null)), HttpStatus.OK); } + @Logging + @GetMapping("endpoint/fe/{id}") + @Operation(summary = "获取 FE endpoint", description = "获取 FE endpoint") + public ResponseEntity> getFlinkUI(@PathVariable("id") Long id) { + DorisClusterFeEndpoint feEndpoint = dorisClusterEndpointService.getFEEndpoint(id); + return new ResponseEntity<>(ResponseVO.success(feEndpoint), HttpStatus.OK); + } + } diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java index 13660e978..9aa0b158f 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java @@ -81,6 +81,7 @@ public enum DictType implements DictDefinition { FLINK_JOB_STATUS("flink_job_status", "Flink 任务状态", FlinkJobState.class), FLINK_JOB_TYPE("flink_job_type", "Flink 任务类型", FlinkJobType.class), FLINK_SAVEPOINT_TYPE("flink_savepoint_type", "Flink Savepoint 类型", FlinkSavepointType.class), + FLINK_SERVICE_EXPOSED_TYPE("flink_service_exposed_type", "Flink Service Expose 类型", ServiceExposedType.class), FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class), FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class), diff --git a/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/operator/spec/ExportService.java b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/operator/spec/ExportService.java index a486971ef..c48294194 100644 --- a/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/operator/spec/ExportService.java +++ b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/operator/spec/ExportService.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Map; /** * ExportService consisting of expose ports for user access to software service. @@ -30,13 +31,19 @@ @Data public class ExportService { + /** + * Annotations for using function on different cloud platform. + */ + @Nullable + private Map annotations; + /** * fixme k8s 的 type * https://github.com/selectdb/doris-operator/blob/master/doc/api.md#exportservice * type of service,the possible value for the service type are : ClusterIP, NodePort, LoadBalancer,ExternalName. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types */ @Nullable - private ServiceExposedType type; + private String type; /** * ServicePort config service for NodePort access mode. diff --git a/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/DorisClusterEndpointService.java b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/DorisClusterEndpointService.java new file mode 100644 index 000000000..f3fa7d262 --- /dev/null +++ b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/DorisClusterEndpointService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.doris.service; + +import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint; + +public interface DorisClusterEndpointService { + + DorisClusterFeEndpoint getFEEndpoint(Long dorisInstanceId); +} diff --git a/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/dto/DorisClusterFeEndpoint.java b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/dto/DorisClusterFeEndpoint.java new file mode 100644 index 000000000..695bbb0fb --- /dev/null +++ b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/dto/DorisClusterFeEndpoint.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.doris.service.dto; + +import lombok.Data; + +import java.net.URI; + +@Data +public class DorisClusterFeEndpoint { + + private URI http; + private URI rpc; + private URI query; + private URI editLog; +} diff --git a/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/impl/DorisClusterEndpointServiceImpl.java b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/impl/DorisClusterEndpointServiceImpl.java new file mode 100644 index 000000000..aad772a31 --- /dev/null +++ b/scaleph-engine/scaleph-engine-doris/src/main/java/cn/sliew/scaleph/engine/doris/service/impl/DorisClusterEndpointServiceImpl.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.doris.service.impl; + +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import cn.sliew.scaleph.engine.doris.service.DorisClusterEndpointService; +import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService; +import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint; +import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO; +import cn.sliew.scaleph.kubernetes.service.ServiceService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; + +@Service +public class DorisClusterEndpointServiceImpl implements DorisClusterEndpointService { + + @Autowired + private WsDorisOperatorInstanceService wsDorisOperatorInstanceService; + @Autowired + private ServiceService serviceService; + + @Override + public DorisClusterFeEndpoint getFEEndpoint(Long dorisInstanceId) { + WsDorisOperatorInstanceDTO instanceDTO = wsDorisOperatorInstanceService.selectOne(dorisInstanceId); + if (instanceDTO.getDeployed() == YesOrNo.NO) { + return null; + } + + Optional> optional = serviceService.getService(instanceDTO.getClusterCredentialId(), instanceDTO.getNamespace(), instanceDTO.getFeStatus().getAccessService()); + if (optional.isEmpty()) { + return null; + } + DorisClusterFeEndpoint result = new DorisClusterFeEndpoint(); + Map uris = optional.get(); + for (Map.Entry entry : uris.entrySet()) { + switch (entry.getKey()) { + case "http-port": + result.setHttp(entry.getValue()); + break; + case "rpc-port": + result.setRpc(entry.getValue()); + break; + case "query-port": + result.setQuery(entry.getValue()); + break; + case "edit-log-port": + result.setEditLog(entry.getValue()); + break; + default: + } + } + return result; + } +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkJobManagerEndpointServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkJobManagerEndpointServiceImpl.java index a21556da8..7aa5deeee 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkJobManagerEndpointServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkJobManagerEndpointServiceImpl.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package cn.sliew.scaleph.engine.flink.kubernetes.service.impl; -import cn.sliew.scaleph.common.dict.flink.ServiceExposedType; import cn.sliew.scaleph.engine.flink.kubernetes.factory.FlinkTemplateFactory; import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkJobManagerEndpointService; import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService; @@ -26,15 +26,9 @@ import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesSessionClusterDTO; -import cn.sliew.scaleph.kubernetes.service.KubernetesService; -import io.fabric8.kubernetes.api.model.*; -import io.fabric8.kubernetes.api.model.networking.v1.Ingress; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.Resource; -import org.apache.commons.lang3.text.StrSubstitutor; +import cn.sliew.scaleph.kubernetes.service.ServiceService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; import java.net.URI; import java.util.Map; @@ -48,7 +42,7 @@ public class FlinkJobManagerEndpointServiceImpl implements FlinkJobManagerEndpoi @Autowired private WsFlinkKubernetesJobInstanceService wsFlinkKubernetesJobInstanceService; @Autowired - private KubernetesService kubernetesService; + private ServiceService serviceService; @Override public URI getSessionClusterJobManagerEndpoint(Long sessionClusterId) { @@ -74,115 +68,26 @@ public URI getJobManagerEndpoint(Long jobInstanceId) { } private Optional getJobManagerEndpoint(String namespace, String name, Long clusterCredentialId) { - KubernetesClient client = kubernetesService.getClient(clusterCredentialId); - return getEndpointByIngress(namespace, name, client).or(() -> getEndpointByService(namespace, name, client)); + return getEndpointByIngress(clusterCredentialId, namespace, name).or(() -> getEndpointByService(clusterCredentialId, namespace, name)); } /** * @see FlinkTemplateFactory#createIngressSpec() */ - private Optional getEndpointByIngress(String namespace, String name, KubernetesClient client) { - Resource ingressResource = client.resources(Ingress.class) - .inNamespace(namespace) - .withName(name); - if (ingressResource != null && ingressResource.isReady()) { - Ingress ingress = ingressResource.get(); - String format = "https://${host}/${namespace}/${name}/"; - Optional host = formatHost(ingress.getStatus().getLoadBalancer()); - if (host.isEmpty()) { - return Optional.empty(); - } - Map variables = Map.of("host", host.get(), "namespace", namespace, "name", name); - StrSubstitutor substitutor = new StrSubstitutor(variables); - return Optional.of(URI.create(substitutor.replace(format))); - } - return Optional.empty(); + private Optional getEndpointByIngress(Long clusterCredentialId, String namespace, String name) { + return serviceService.getIngress(clusterCredentialId, namespace, name); } /** * @see FlinkTemplateFactory#createServiceConfiguration() */ - private Optional getEndpointByService(String namespace, String name, KubernetesClient client) { - Resource serviceResource = client.services() - .inNamespace(namespace) - .withName(String.format("%s-rest", name)); - if (serviceResource != null && serviceResource.isReady()) { - Service service = serviceResource.get(); - ServiceExposedType type = ServiceExposedType.of(service.getSpec().getType()); - switch (type) { - case NODE_PORT: - return doGetEndpointByNodePort(service); - case LOAD_BALANCER: - return doGetEndpointByLoadBalancer(service); - default: - return Optional.empty(); - } - } - return Optional.empty(); - } - - private Optional doGetEndpointByLoadBalancer(Service service) { - String format = "http://${host}:${port}/"; - Optional host = formatHost(service.getStatus().getLoadBalancer()); - if (host.isEmpty()) { - return Optional.empty(); - } - Optional port = formatPort(service.getSpec()); - if (port.isEmpty()) { - return Optional.empty(); - } - Map variables = Map.of("host", host.get(), "port", port.get().toString()); - StrSubstitutor substitutor = new StrSubstitutor(variables); - return Optional.of(URI.create(substitutor.replace(format))); - } - - private Optional doGetEndpointByNodePort(Service service) { - String format = "http://${host}:${nodePort}/"; - Optional host = formatHost(service.getStatus().getLoadBalancer()); - if (host.isEmpty()) { - return Optional.empty(); - } - Optional nodePort = formatNodePort(service.getSpec()); - if (nodePort.isEmpty()) { - return Optional.empty(); - } - Map variables = Map.of("host", host.get(), "nodePort", nodePort.get().toString()); - StrSubstitutor substitutor = new StrSubstitutor(variables); - return Optional.of(URI.create(substitutor.replace(format))); - } - - private Optional formatHost(LoadBalancerStatus loadBalancer) { - if (loadBalancer == null) { - return Optional.empty(); - } - for (LoadBalancerIngress ingress : loadBalancer.getIngress()) { - // Get by ip firstly - String hostOrIp = ingress.getIp(); - if (!StringUtils.hasText(hostOrIp)) { - // If ip is empty, get by hostname - hostOrIp = ingress.getHostname(); - } - for (PortStatus portStatus : ingress.getPorts()) { - return Optional.of(String.format("%s:%d", hostOrIp, portStatus.getPort())); - } - return Optional.ofNullable(hostOrIp); - } - return Optional.empty(); - } - - private Optional formatPort(ServiceSpec service) { - for (ServicePort servicePort : service.getPorts()) { - if (servicePort.getName().equals("rest")) { - return Optional.of(servicePort.getPort()); - } - } - return Optional.empty(); - } - - private Optional formatNodePort(ServiceSpec service) { - for (ServicePort servicePort : service.getPorts()) { - if (servicePort.getName().equals("rest")) { - return Optional.of(servicePort.getNodePort()); + private Optional getEndpointByService(Long clusterCredentialId, String namespace, String name) { + String serviceName = String.format("%s-rest", name); + Optional> optional = serviceService.getService(clusterCredentialId, namespace, serviceName); + if (optional.isPresent()) { + Map uris = optional.get(); + if (uris.containsKey("rest")) { + return Optional.of(uris.get("rest")); } } return Optional.empty(); diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/ServiceService.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/ServiceService.java new file mode 100644 index 000000000..470d500e1 --- /dev/null +++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/ServiceService.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.kubernetes.service; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public interface ServiceService { + + Optional getIngress(Long clusterCredentialId, String namespace, String name); + + Optional> getService(Long clusterCredentialId, String namespace, String name); +} diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/ServiceServiceImpl.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/ServiceServiceImpl.java new file mode 100644 index 000000000..c81e2c073 --- /dev/null +++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/ServiceServiceImpl.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.kubernetes.service.impl; + +import cn.sliew.scaleph.common.dict.flink.ServiceExposedType; +import cn.sliew.scaleph.kubernetes.service.KubernetesService; +import cn.sliew.scaleph.kubernetes.service.ServiceService; +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.api.model.networking.v1.Ingress; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.dsl.Resource; +import org.apache.commons.lang3.text.StrSubstitutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Service +public class ServiceServiceImpl implements ServiceService { + + @Autowired + private KubernetesService kubernetesService; + + @Override + public Optional getIngress(Long clusterCredentialId, String namespace, String name) { + NamespacedKubernetesClient client = kubernetesService.getClient(clusterCredentialId, namespace); + Resource ingressResource = client.resources(Ingress.class) + .inNamespace(namespace) + .withName(name); + if (ingressResource != null && ingressResource.isReady()) { + Ingress ingress = ingressResource.get(); + String format = "https://${host}/${namespace}/${name}/"; + Optional host = formatHost(ingress.getStatus().getLoadBalancer()); + if (host.isEmpty()) { + return Optional.empty(); + } + Map variables = Map.of("host", host.get(), "namespace", namespace, "name", name); + StrSubstitutor substitutor = new StrSubstitutor(variables); + return Optional.of(URI.create(substitutor.replace(format))); + } + return Optional.empty(); + } + + @Override + public Optional> getService(Long clusterCredentialId, String namespace, String name) { + NamespacedKubernetesClient client = kubernetesService.getClient(clusterCredentialId, namespace); + Resource serviceResource = client.services() + .inNamespace(namespace) + .withName(name); + if (serviceResource != null && serviceResource.isReady()) { + io.fabric8.kubernetes.api.model.Service service = serviceResource.get(); + ServiceExposedType type = ServiceExposedType.of(service.getSpec().getType()); + switch (type) { + case NODE_PORT: + return getNodePort(service); + case LOAD_BALANCER: + return getLoadBalancer(service); + default: + return Optional.empty(); + } + } + return Optional.empty(); + } + + + private Optional> getLoadBalancer(io.fabric8.kubernetes.api.model.Service service) { + String format = "http://${host}:${port}/"; + Optional host = formatHost(service.getStatus().getLoadBalancer()); + if (host.isEmpty()) { + return Optional.empty(); + } + Map uris = new HashMap<>(); + for (ServicePort servicePort : service.getSpec().getPorts()) { + Map variables = Map.of("host", host.get(), "port", servicePort.getPort().toString()); + StrSubstitutor substitutor = new StrSubstitutor(variables); + URI uri = URI.create(substitutor.replace(format)); + uris.put(servicePort.getName(), uri); + } + return Optional.of(uris); + } + + private Optional> getNodePort(io.fabric8.kubernetes.api.model.Service service) { + String format = "http://${host}:${nodePort}/"; + Optional host = formatHost(service.getStatus().getLoadBalancer()); + if (host.isEmpty()) { + return Optional.empty(); + } + + Map uris = new HashMap<>(); + for (ServicePort servicePort : service.getSpec().getPorts()) { + Map variables = Map.of("host", host.get(), "nodePort", servicePort.getNodePort().toString()); + StrSubstitutor substitutor = new StrSubstitutor(variables); + URI uri = URI.create(substitutor.replace(format)); + uris.put(servicePort.getName(), uri); + } + return Optional.of(uris); + } + + private Optional formatHost(LoadBalancerStatus loadBalancer) { + if (loadBalancer == null) { + return Optional.empty(); + } + for (LoadBalancerIngress ingress : loadBalancer.getIngress()) { + // Get by ip firstly + String hostOrIp = ingress.getIp(); + if (!StringUtils.hasText(hostOrIp)) { + // If ip is empty, get by hostname + hostOrIp = ingress.getHostname(); + } + for (PortStatus portStatus : ingress.getPorts()) { + return Optional.of(String.format("%s:%d", hostOrIp, portStatus.getPort())); + } + return Optional.ofNullable(hostOrIp); + } + return Optional.empty(); + } + + private Map formatPort(ServiceSpec service) { + Map ports = new HashMap<>(); + for (ServicePort servicePort : service.getPorts()) { + ports.put(servicePort.getName(), servicePort.getPort()); + } + return ports; + } +} diff --git a/scaleph-ui-react/src/constants/dictType.ts b/scaleph-ui-react/src/constants/dictType.ts index 81501761f..f9ce0024c 100644 --- a/scaleph-ui-react/src/constants/dictType.ts +++ b/scaleph-ui-react/src/constants/dictType.ts @@ -33,6 +33,7 @@ export const DICT_TYPE = { flinkHA: 'flink_high_availability', flinkJobType: 'flink_job_type', flinkJobStatus: 'flink_job_status', + flinkServiceExposedType: 'flink_service_exposed_type', seatunnelVersion: 'seatunnel_version', seatunnelEngineType: 'seatunnel_engine_type', seatunnelPluginName: 'seatunnel_plugin_name', diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 033c58c22..d8dcd90bf 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -946,6 +946,7 @@ export default { 'pages.project.doris.template.steps.component.base.requests.memory': '内存', 'pages.project.doris.template.steps.component.base.limits.cpu': '最大CPU', 'pages.project.doris.template.steps.component.base.limits.memory': '最大内存', + 'pages.project.doris.template.steps.component.base.service.type': '网络服务类型', 'pages.project.doris.template.steps.yaml': 'YAML', 'pages.project.doris.template.detail': '模版详情', 'pages.project.doris.template.detail.component': '集群组件', @@ -980,6 +981,9 @@ export default { 'pages.project.doris.instance.detail.yaml.status': '状态', 'pages.project.doris.instance.detail.deploy': 'Deploy', 'pages.project.doris.instance.detail.shutdown': 'Shutdown', + 'pages.project.doris.instance.detail.fe': 'FE', + 'pages.project.doris.instance.detail.metrics': 'Metrics', + 'pages.project.doris.instance.detail.logs': 'Logs', 'Run': '运行', 'Save': '保存', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Doris/OperatorInstance/Detail/DorisInstanceAction.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Doris/OperatorInstance/Detail/DorisInstanceAction.tsx index 65ec692bc..fe37573b4 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Doris/OperatorInstance/Detail/DorisInstanceAction.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Doris/OperatorInstance/Detail/DorisInstanceAction.tsx @@ -2,11 +2,18 @@ import {connect, useIntl} from "umi"; import React from "react"; import {ProCard, ProDescriptions} from "@ant-design/pro-components"; import {WsDorisOperatorInstanceService} from "@/services/project/WsDorisOperatorInstanceService"; -import {Button, message, Popconfirm} from "antd"; -import {CaretRightOutlined, CloseOutlined} from "@ant-design/icons"; +import {Button, message, Popconfirm, Space} from "antd"; +import { + AreaChartOutlined, + CaretRightOutlined, + CloseOutlined, + DashboardOutlined, + OrderedListOutlined +} from "@ant-design/icons"; import {YesOrNo} from "@/constants/enum"; import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions"; import {WsFlinkKubernetesJob} from "@/services/project/typings"; +import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; const DorisInstanceDetailAction: React.FC = (props: any) => { const intl = useIntl(); @@ -46,47 +53,87 @@ const DorisInstanceDetailAction: React.FC = (props: any) => { return ( - - { - WsDorisOperatorInstanceService.deploy(props.dorisInstanceDetail.instance?.id).then(response => { - if (response.success) { - message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); - } - }) - }} - > + +
+ { + WsDorisOperatorInstanceService.deploy(props.dorisInstanceDetail.instance?.id).then(response => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + } + }) + }} + > + + + { + WsDorisOperatorInstanceService.shutdown(props.dorisInstanceDetail.instance?.id).then(response => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + } + }) + }} + > + + +
+ +
- - { - WsDorisOperatorInstanceService.shutdown(props.dorisInstanceDetail.instance?.id).then(response => { - if (response.success) { - message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); - } - }) - }} - > +
+ +
- -
- }> + + + + + )}> { const intl = useIntl(); @@ -54,6 +56,12 @@ const DorisFeComponent: React.FC = () => { colProps={{span: 10, offset: 1}} initialValue={"32Gi"} /> + DictDataService.listDictDataByType2(DICT_TYPE.flinkServiceExposedType)} + />
); } diff --git a/scaleph-ui-react/src/services/project/WsDorisOperatorInstanceService.ts b/scaleph-ui-react/src/services/project/WsDorisOperatorInstanceService.ts index cf39c2f67..046b55eb4 100644 --- a/scaleph-ui-react/src/services/project/WsDorisOperatorInstanceService.ts +++ b/scaleph-ui-react/src/services/project/WsDorisOperatorInstanceService.ts @@ -1,10 +1,11 @@ import {PageResponse, ResponseBody} from '@/app.d'; import {request} from 'umi'; import { + DorisClusterFeEndpoint, WsDorisOperatorInstance, WsDorisOperatorInstanceAddParam, WsDorisOperatorInstanceParam, - WsDorisOperatorInstanceUpdateParam, WsFlinkKubernetesJob + WsDorisOperatorInstanceUpdateParam } from './typings'; export const WsDorisOperatorInstanceService = { @@ -90,4 +91,10 @@ export const WsDorisOperatorInstanceService = { method: 'GET', }); }, + + feEndpoint: async (id: number) => { + return request>(`${WsDorisOperatorInstanceService.url}/endpoint/fe/${id}`, { + method: 'GET', + }); + }, }; diff --git a/scaleph-ui-react/src/services/project/WsDorisOperatorTemplateService.ts b/scaleph-ui-react/src/services/project/WsDorisOperatorTemplateService.ts index a6f2fe12f..bff6ccb08 100644 --- a/scaleph-ui-react/src/services/project/WsDorisOperatorTemplateService.ts +++ b/scaleph-ui-react/src/services/project/WsDorisOperatorTemplateService.ts @@ -78,6 +78,12 @@ export const WsDorisOperatorTemplateService = { } feSpec.requests = feSpecRequests feSpec.limits = feSpecLimits + if (value['fe.service.type']) { + const feSpecService: Record = { + type: value['fe.service.type'] + } + feSpec.service = feSpecService + } data.feSpec = feSpec if (value['be.replicas']) { @@ -132,6 +138,7 @@ export const WsDorisOperatorTemplateService = { 'fe.requests.memory': data.feSpec?.requests?.memory, 'fe.limits.cpu': data.feSpec?.limits?.cpu, 'fe.limits.memory': data.feSpec?.limits?.memory, + 'fe.service.type': data.feSpec?.service?.type?.value, 'be.replicas': data.beSpec?.replicas, 'be.image': data.beSpec?.image, diff --git a/scaleph-ui-react/src/services/project/typings.d.ts b/scaleph-ui-react/src/services/project/typings.d.ts index 0969d672d..bf3d99bcb 100644 --- a/scaleph-ui-react/src/services/project/typings.d.ts +++ b/scaleph-ui-react/src/services/project/typings.d.ts @@ -530,3 +530,10 @@ export type WsDorisOperatorInstanceUpdateParam = { brokerSpec?: Record; remark?: string; }; + +export type DorisClusterFeEndpoint = { + http?: string; + rpc?: string; + query?: string; + editLog?: string; +};