Skip to content

Commit

Permalink
[Feature][scaleph-engine-doris] update doris instance service expose (#…
Browse files Browse the repository at this point in the history
…668)

* feature: add service form item for doris fe component

* feature: add service form item for doris fe component

* fix: kubernetes service error

* feature: add doris service

* feature: add doris service

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Dec 21, 2023
1 parent bc44bf8 commit 7e790dd
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import cn.sliew.scaleph.api.annotation.Logging;
import cn.sliew.scaleph.engine.doris.operator.DorisCluster;
import cn.sliew.scaleph.engine.doris.service.DorisClusterEndpointService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceAddParam;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceListParam;
Expand All @@ -46,6 +48,8 @@ public class WsDorisOperatorInstanceController {

@Autowired
private WsDorisOperatorInstanceService wsDorisInstanceService;
@Autowired
private DorisClusterEndpointService dorisClusterEndpointService;

@Logging
@GetMapping
Expand Down Expand Up @@ -135,4 +139,12 @@ public ResponseEntity<ResponseVO<GenericKubernetesResource>> getStatus(@PathVari
return new ResponseEntity<>(ResponseVO.success(sessionCluster.orElse(null)), HttpStatus.OK);
}

@Logging
@GetMapping("endpoint/fe/{id}")
@Operation(summary = "获取 FE endpoint", description = "获取 FE endpoint")
public ResponseEntity<ResponseVO<DorisClusterFeEndpoint>> getFlinkUI(@PathVariable("id") Long id) {
DorisClusterFeEndpoint feEndpoint = dorisClusterEndpointService.getFEEndpoint(id);
return new ResponseEntity<>(ResponseVO.success(feEndpoint), HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public enum DictType implements DictDefinition {
FLINK_JOB_STATUS("flink_job_status", "Flink 任务状态", FlinkJobState.class),
FLINK_JOB_TYPE("flink_job_type", "Flink 任务类型", FlinkJobType.class),
FLINK_SAVEPOINT_TYPE("flink_savepoint_type", "Flink Savepoint 类型", FlinkSavepointType.class),
FLINK_SERVICE_EXPOSED_TYPE("flink_service_exposed_type", "Flink Service Expose 类型", ServiceExposedType.class),

FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class),
FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

/**
* ExportService consisting of expose ports for user access to software service.
*/
@Data
public class ExportService {

/**
* Annotations for using function on different cloud platform.
*/
@Nullable
private Map<String, String> annotations;

/**
* fixme k8s 的 type
* https://github.com/selectdb/doris-operator/blob/master/doc/api.md#exportservice
* type of service,the possible value for the service type are : ClusterIP, NodePort, LoadBalancer,ExternalName. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
*/
@Nullable
private ServiceExposedType type;
private String type;

/**
* ServicePort config service for NodePort access mode.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.doris.service;

import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint;

public interface DorisClusterEndpointService {

DorisClusterFeEndpoint getFEEndpoint(Long dorisInstanceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.doris.service.dto;

import lombok.Data;

import java.net.URI;

@Data
public class DorisClusterFeEndpoint {

private URI http;
private URI rpc;
private URI query;
private URI editLog;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.doris.service.impl;

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import cn.sliew.scaleph.engine.doris.service.DorisClusterEndpointService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.dto.DorisClusterFeEndpoint;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.kubernetes.service.ServiceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.URI;
import java.util.Map;
import java.util.Optional;

@Service
public class DorisClusterEndpointServiceImpl implements DorisClusterEndpointService {

@Autowired
private WsDorisOperatorInstanceService wsDorisOperatorInstanceService;
@Autowired
private ServiceService serviceService;

@Override
public DorisClusterFeEndpoint getFEEndpoint(Long dorisInstanceId) {
WsDorisOperatorInstanceDTO instanceDTO = wsDorisOperatorInstanceService.selectOne(dorisInstanceId);
if (instanceDTO.getDeployed() == YesOrNo.NO) {
return null;
}

Optional<Map<String, URI>> optional = serviceService.getService(instanceDTO.getClusterCredentialId(), instanceDTO.getNamespace(), instanceDTO.getFeStatus().getAccessService());
if (optional.isEmpty()) {
return null;
}
DorisClusterFeEndpoint result = new DorisClusterFeEndpoint();
Map<String, URI> uris = optional.get();
for (Map.Entry<String, URI> entry : uris.entrySet()) {
switch (entry.getKey()) {
case "http-port":
result.setHttp(entry.getValue());
break;
case "rpc-port":
result.setRpc(entry.getValue());
break;
case "query-port":
result.setQuery(entry.getValue());
break;
case "edit-log-port":
result.setEditLog(entry.getValue());
break;
default:
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.service.impl;

import cn.sliew.scaleph.common.dict.flink.ServiceExposedType;
import cn.sliew.scaleph.engine.flink.kubernetes.factory.FlinkTemplateFactory;
import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkJobManagerEndpointService;
import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService;
Expand All @@ -26,15 +26,9 @@
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesSessionClusterDTO;
import cn.sliew.scaleph.kubernetes.service.KubernetesService;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.apache.commons.lang3.text.StrSubstitutor;
import cn.sliew.scaleph.kubernetes.service.ServiceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.net.URI;
import java.util.Map;
Expand All @@ -48,7 +42,7 @@ public class FlinkJobManagerEndpointServiceImpl implements FlinkJobManagerEndpoi
@Autowired
private WsFlinkKubernetesJobInstanceService wsFlinkKubernetesJobInstanceService;
@Autowired
private KubernetesService kubernetesService;
private ServiceService serviceService;

@Override
public URI getSessionClusterJobManagerEndpoint(Long sessionClusterId) {
Expand All @@ -74,115 +68,26 @@ public URI getJobManagerEndpoint(Long jobInstanceId) {
}

private Optional<URI> getJobManagerEndpoint(String namespace, String name, Long clusterCredentialId) {
KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
return getEndpointByIngress(namespace, name, client).or(() -> getEndpointByService(namespace, name, client));
return getEndpointByIngress(clusterCredentialId, namespace, name).or(() -> getEndpointByService(clusterCredentialId, namespace, name));
}

/**
* @see FlinkTemplateFactory#createIngressSpec()
*/
private Optional<URI> getEndpointByIngress(String namespace, String name, KubernetesClient client) {
Resource<Ingress> ingressResource = client.resources(Ingress.class)
.inNamespace(namespace)
.withName(name);
if (ingressResource != null && ingressResource.isReady()) {
Ingress ingress = ingressResource.get();
String format = "https://${host}/${namespace}/${name}/";
Optional<String> host = formatHost(ingress.getStatus().getLoadBalancer());
if (host.isEmpty()) {
return Optional.empty();
}
Map<String, String> variables = Map.of("host", host.get(), "namespace", namespace, "name", name);
StrSubstitutor substitutor = new StrSubstitutor(variables);
return Optional.of(URI.create(substitutor.replace(format)));
}
return Optional.empty();
private Optional<URI> getEndpointByIngress(Long clusterCredentialId, String namespace, String name) {
return serviceService.getIngress(clusterCredentialId, namespace, name);
}

/**
* @see FlinkTemplateFactory#createServiceConfiguration()
*/
private Optional<URI> getEndpointByService(String namespace, String name, KubernetesClient client) {
Resource<Service> serviceResource = client.services()
.inNamespace(namespace)
.withName(String.format("%s-rest", name));
if (serviceResource != null && serviceResource.isReady()) {
Service service = serviceResource.get();
ServiceExposedType type = ServiceExposedType.of(service.getSpec().getType());
switch (type) {
case NODE_PORT:
return doGetEndpointByNodePort(service);
case LOAD_BALANCER:
return doGetEndpointByLoadBalancer(service);
default:
return Optional.empty();
}
}
return Optional.empty();
}

private Optional<URI> doGetEndpointByLoadBalancer(Service service) {
String format = "http://${host}:${port}/";
Optional<String> host = formatHost(service.getStatus().getLoadBalancer());
if (host.isEmpty()) {
return Optional.empty();
}
Optional<Integer> port = formatPort(service.getSpec());
if (port.isEmpty()) {
return Optional.empty();
}
Map<String, String> variables = Map.of("host", host.get(), "port", port.get().toString());
StrSubstitutor substitutor = new StrSubstitutor(variables);
return Optional.of(URI.create(substitutor.replace(format)));
}

private Optional<URI> doGetEndpointByNodePort(Service service) {
String format = "http://${host}:${nodePort}/";
Optional<String> host = formatHost(service.getStatus().getLoadBalancer());
if (host.isEmpty()) {
return Optional.empty();
}
Optional<Integer> nodePort = formatNodePort(service.getSpec());
if (nodePort.isEmpty()) {
return Optional.empty();
}
Map<String, String> variables = Map.of("host", host.get(), "nodePort", nodePort.get().toString());
StrSubstitutor substitutor = new StrSubstitutor(variables);
return Optional.of(URI.create(substitutor.replace(format)));
}

private Optional<String> formatHost(LoadBalancerStatus loadBalancer) {
if (loadBalancer == null) {
return Optional.empty();
}
for (LoadBalancerIngress ingress : loadBalancer.getIngress()) {
// Get by ip firstly
String hostOrIp = ingress.getIp();
if (!StringUtils.hasText(hostOrIp)) {
// If ip is empty, get by hostname
hostOrIp = ingress.getHostname();
}
for (PortStatus portStatus : ingress.getPorts()) {
return Optional.of(String.format("%s:%d", hostOrIp, portStatus.getPort()));
}
return Optional.ofNullable(hostOrIp);
}
return Optional.empty();
}

private Optional<Integer> formatPort(ServiceSpec service) {
for (ServicePort servicePort : service.getPorts()) {
if (servicePort.getName().equals("rest")) {
return Optional.of(servicePort.getPort());
}
}
return Optional.empty();
}

private Optional<Integer> formatNodePort(ServiceSpec service) {
for (ServicePort servicePort : service.getPorts()) {
if (servicePort.getName().equals("rest")) {
return Optional.of(servicePort.getNodePort());
private Optional<URI> getEndpointByService(Long clusterCredentialId, String namespace, String name) {
String serviceName = String.format("%s-rest", name);
Optional<Map<String, URI>> optional = serviceService.getService(clusterCredentialId, namespace, serviceName);
if (optional.isPresent()) {
Map<String, URI> uris = optional.get();
if (uris.containsKey("rest")) {
return Optional.of(uris.get("rest"));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.kubernetes.service;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface ServiceService {

Optional<URI> getIngress(Long clusterCredentialId, String namespace, String name);

Optional<Map<String, URI>> getService(Long clusterCredentialId, String namespace, String name);
}
Loading

0 comments on commit 7e790dd

Please sign in to comment.