Skip to content

Commit

Permalink
[Feature][scaleph-engine-doris] update doris operator instance status…
Browse files Browse the repository at this point in the history
… by scheduler (#666)

* feature: add doris status sync job

* feature: add doris status sync job

* feature: add doris status sync job

* feature: update doris instance status

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Dec 19, 2023
1 parent a6466e5 commit eb4ace1
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
Expand Down Expand Up @@ -66,16 +67,16 @@ public class WsDorisOperatorInstance extends BaseDO {
@TableField("deployed")
private YesOrNo deployed;

@TableField("fe_status")
@TableField(value = "fe_status", updateStrategy = FieldStrategy.IGNORED)
private String feStatus;

@TableField("be_status")
@TableField(value = "be_status", updateStrategy = FieldStrategy.IGNORED)
private String beStatus;

@TableField("cn_status")
@TableField(value = "cn_status", updateStrategy = FieldStrategy.IGNORED)
private String cnStatus;

@TableField("broker_status")
@TableField(value = "broker_status", updateStrategy = FieldStrategy.IGNORED)
private String brokerStatus;

@TableField("remark")
Expand Down
5 changes: 5 additions & 0 deletions scaleph-engine/scaleph-engine-doris/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>scaleph-project</artifactId>
</dependency>

<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>scaleph-workflow-quartz</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.doris.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.DorisOperatorService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

@Slf4j
@Service
public class DorisOperatorInstanceStatusSyncJob extends AbstractWorkFlow {

@Autowired
private WsDorisOperatorInstanceService wsDorisOperatorInstanceService;
@Autowired
private DorisOperatorService dorisOperatorService;

public DorisOperatorInstanceStatusSyncJob() {
super("DORIS_OPERATOR_INSTANCE_STATUS_SYNC_JOB");
}

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
}

private void process() {
List<Long> ids = wsDorisOperatorInstanceService.listAll();
ids.forEach(this::doProcess);
log.debug("update doris operator instance status success! update size: {}", ids.size());
}

private void doProcess(Long id) {
try {
WsDorisOperatorInstanceDTO instanceDTO = wsDorisOperatorInstanceService.selectOne(id);
Optional<GenericKubernetesResource> optional = dorisOperatorService.get(instanceDTO);
if (optional.isPresent()) {
String json = JacksonUtil.toJsonString(optional.get().get("status"));
DorisClusterStatus status = JacksonUtil.parseJsonString(json, DorisClusterStatus.class);
wsDorisOperatorInstanceService.updateStatus(id, status);
} else {
wsDorisOperatorInstanceService.clearStatus(id);
}
} catch (Exception e) {
log.error("update doris operator instance status error! id: {}", id, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.engine.doris.service;

import cn.sliew.scaleph.engine.doris.operator.DorisCluster;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceAddParam;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceListParam;
Expand All @@ -31,6 +32,8 @@ public interface WsDorisOperatorInstanceService {

Page<WsDorisOperatorInstanceDTO> list(WsDorisOperatorInstanceListParam param);

List<Long> listAll();

WsDorisOperatorInstanceDTO selectOne(Long id);

WsDorisOperatorInstanceDTO fromTemplate(Long templateId);
Expand All @@ -50,4 +53,8 @@ public interface WsDorisOperatorInstanceService {
void apply(Long id);

void shutdown(Long id);

int updateStatus(Long id, DorisClusterStatus status);

int clearStatus(Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cn.sliew.scaleph.dao.entity.master.ws.WsDorisOperatorInstance;
import cn.sliew.scaleph.dao.mapper.master.ws.WsDorisOperatorInstanceMapper;
import cn.sliew.scaleph.engine.doris.operator.DorisCluster;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.DorisOperatorService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorTemplateService;
Expand All @@ -45,6 +46,7 @@
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.stream.Collectors;

import static cn.sliew.milky.common.check.Ensures.checkState;

Expand Down Expand Up @@ -73,6 +75,15 @@ public Page<WsDorisOperatorInstanceDTO> list(WsDorisOperatorInstanceListParam pa
return result;
}

@Override
public List<Long> listAll() {
LambdaQueryWrapper<WsDorisOperatorInstance> queryWrapper = Wrappers.lambdaQuery(WsDorisOperatorInstance.class)
.eq(WsDorisOperatorInstance::getDeployed, YesOrNo.YES)
.select(WsDorisOperatorInstance::getId);
List<WsDorisOperatorInstance> wsDorisOperatorInstances = wsDorisOperatorInstanceMapper.selectList(queryWrapper);
return wsDorisOperatorInstances.stream().map(WsDorisOperatorInstance::getId).collect(Collectors.toList());
}

@Override
public WsDorisOperatorInstanceDTO selectOne(Long id) {
WsDorisOperatorInstance record = wsDorisOperatorInstanceMapper.selectById(id);
Expand Down Expand Up @@ -146,12 +157,15 @@ public int update(WsDorisOperatorInstanceUpdateParam param) {

@Override
public int deleteById(Long id) {
WsDorisOperatorInstanceDTO instanceDTO = selectOne(id);
checkState(instanceDTO.getDeployed() == YesOrNo.NO, () -> "doris instance already deployed! can't delete");
return wsDorisOperatorInstanceMapper.deleteById(id);
}

@Override
public int deleteBatch(List<Long> ids) {
return wsDorisOperatorInstanceMapper.deleteBatchIds(ids);
ids.forEach(this::deleteById);
return ids.size();
}

@Override
Expand All @@ -164,6 +178,10 @@ public void deploy(Long id) {
}
String yaml = Serialization.asYaml(dorisCluster);
dorisOperatorService.deploy(instanceDTO.getClusterCredentialId(), yaml);
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(instanceDTO.getId());
record.setDeployed(YesOrNo.YES);
wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
Expand All @@ -180,5 +198,42 @@ public void shutdown(Long id) {
DorisCluster dorisCluster = asYaml(instanceDTO);
String yaml = Serialization.asYaml(dorisCluster);
dorisOperatorService.shutdown(instanceDTO.getClusterCredentialId(), yaml);
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(instanceDTO.getId());
wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
public int updateStatus(Long id, DorisClusterStatus status) {
if (status == null) {
return -1;
}
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(id);
if (status.getFeStatus() != null) {
record.setFeStatus(JacksonUtil.toJsonString(status.getFeStatus()));
}
if (status.getBeStatus() != null) {
record.setBeStatus(JacksonUtil.toJsonString(status.getBeStatus()));
}
if (status.getCnStatus() != null) {
record.setCnStatus(JacksonUtil.toJsonString(status.getCnStatus()));
}
if (status.getBrokerStatus() != null) {
record.setBrokerStatus(JacksonUtil.toJsonString(status.getBrokerStatus()));
}
return wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
public int clearStatus(Long id) {
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(id);
record.setDeployed(YesOrNo.NO);
record.setFeStatus(null);
record.setBeStatus(null);
record.setCnStatus(null);
record.setBrokerStatus(null);
return wsDorisOperatorInstanceMapper.updateById(record);
}
}
8 changes: 8 additions & 0 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,14 @@ export default {

'pages.project.doris.instance.detail': '实例详情',
'pages.project.doris.instance.detail.component': '集群组件',
'pages.project.doris.instance.detail.component.replicas': '全部',
'pages.project.doris.instance.detail.component.creating': '创建中',
'pages.project.doris.instance.detail.component.running': '运行中',
'pages.project.doris.instance.detail.component.failed': '已失败',
'pages.project.doris.instance.detail.component.fe': 'FE节点',
'pages.project.doris.instance.detail.component.be': 'BE节点',
'pages.project.doris.instance.detail.component.cn': '计算节点',
'pages.project.doris.instance.detail.component.broker': 'Broker节点',
'pages.project.doris.instance.detail.access': '连接信息',
'pages.project.doris.instance.detail.yaml': 'YAML',
'pages.project.doris.instance.detail.deploy': 'Deploy',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
<div>
<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
disabled={data.deployed?.value == '1'}
onConfirm={() => {
WsDorisOperatorInstanceService.deploy(data.id).then(response => {
if (response.success) {
Expand All @@ -23,14 +24,19 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
})
}}
>
<Button type="default" icon={<CaretRightOutlined/>}>
<Button
type="default"
icon={<CaretRightOutlined/>}
disabled={data.deployed?.value == '1'}
>
{intl.formatMessage({id: 'pages.project.doris.instance.detail.deploy'})}
</Button>
</Popconfirm>


<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
disabled={data.deployed?.value == '0'}
onConfirm={() => {
WsDorisOperatorInstanceService.shutdown(data.id).then(response => {
if (response.success) {
Expand All @@ -39,7 +45,10 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
})
}}
>
<Button icon={<CloseOutlined/>}>
<Button
icon={<CloseOutlined/>}
disabled={data.deployed?.value == '0'}
>
{intl.formatMessage({id: 'pages.project.doris.instance.detail.shutdown'})}
</Button>
</Popconfirm>
Expand Down
Loading

0 comments on commit eb4ace1

Please sign in to comment.