Skip to content

Commit

Permalink
data share (#226)
Browse files Browse the repository at this point in the history
* fix grpc psi

* fix grpc psi

* fix psi

* fix psi

* feat local dev envs

* feat organ joining pass

* feat data share

* fix node joining auto pass

* fix apply for connect pull dataresource

* feat event log

* feat log

* feat log

* feat log

* feat log to roll back

* feat log to roll back

* feat log to roll back

* fix data_fusion_task entity fill id key automatically

* fix select resource by resourceId (string and long)

* feat add data_share address

* feat update menu auth ddl sql

---------

Co-authored-by: Like habits <17801066560@163.com>
Co-authored-by: Unknown <unknown@example.com>
Co-authored-by: primi <wy19283745@163.com>
Co-authored-by: terrence <terrence>
  • Loading branch information
4 people authored Dec 1, 2023
1 parent 0421b91 commit c7efe56
Show file tree
Hide file tree
Showing 15 changed files with 452 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,34 @@ public BaseResultEntity joiningPartners(String gateway,String publicKey){
if (StringUtils.isBlank(publicKey)) {
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"publicKey");
}
// 解决 "+" 在传输中替换为 " " 的问题
if (publicKey.contains(" ")) {
publicKey = publicKey.replace(" ", "+");
}
return sysOrganService.joiningPartners(gateway,publicKey);
}

/**
* 加入合作方
* @param gateway
* @param publicKey
* @return
*/
@RequestMapping("joiningPartnersForResource")
public BaseResultEntity joiningPartnersForResource(String gateway,String publicKey){
if (StringUtils.isBlank(gateway)) {
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"gateway");
}
if (StringUtils.isBlank(publicKey)) {
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"publicKey");
}
// 解决 "+" 在传输中替换为 " " 的问题
if (publicKey.contains(" ")) {
publicKey = publicKey.replace(" ", "+");
}
return sysOrganService.joiningPartnersForResource(gateway,publicKey);
}

/**
* 查询合作列表
* @param param
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
server:
port: 8091
spring:
application:
name: platform
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: dev1
nacos:
config:
server-addr: localhost:8848
namespace: dev1
max-retry: 3
config-long-poll-timeout: 1000
config-retry-time: 1000
auto-refresh: true
username: nacos
password: nacos
context-path: /nacos
group: DEFAULT_GROUP
file-extension: yaml
aj:
captcha:
jigsaw: classpath:images/yyjigsaw
cache-type: redis
water-mark: PrimiHub
slip-offset: 5
aes-status: true
interference-options: 2
req-frequency-limit-enable: true
req-get-lock-limit: 30
req-get-lock-seconds: 30
req-get-minute-limit: 10
req-check-minute-limit: 10
req-verify-minute-limit: 10
type: BLOCKPUZZLE
logging:
level:
root: info
mybatis.mapper: debug
com.primihub.biz.repository: debug
mybatis:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
server:
port: 8092
spring:
application:
name: platform
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: dev2
nacos:
config:
server-addr: localhost:8848
namespace: dev2
max-retry: 3
config-long-poll-timeout: 1000
config-retry-time: 1000
auto-refresh: true
username: nacos
password: nacos
context-path: /nacos
group: DEFAULT_GROUP
file-extension: yaml
aj:
captcha:
jigsaw: classpath:images/yyjigsaw
cache-type: redis
water-mark: PrimiHub
slip-offset: 5
aes-status: true
interference-options: 2
req-frequency-limit-enable: true
req-get-lock-limit: 30
req-get-lock-seconds: 30
req-get-minute-limit: 10
req-check-minute-limit: 10
req-verify-minute-limit: 10
type: BLOCKPUZZLE
logging:
level:
root: info
mybatis.mapper: debug
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,37 @@ public static DataResourceVo dataResourcePoConvertVo(DataResource po){
vo.setResourceFusionId(po.getResourceFusionId());
return vo;
}

public static DataResourceVo dataResourcePoConvertVo(DataResource po, String organFusionId, String organName){
DataResourceVo vo = new DataResourceVo();
vo.setCreateDate(po.getCreateDate());
vo.setDbId(po.getDbId());
vo.setFileId(po.getFileId());
vo.setFileSize(po.getFileSize());
vo.setFileSuffix(po.getFileSuffix());
vo.setFileColumns(po.getFileColumns());
vo.setFileRows(po.getFileRows());
vo.setFileHandleStatus(po.getFileHandleStatus());
vo.setFileContainsY(po.getFileContainsY());
vo.setFileYRows(po.getFileYRows());
vo.setFileYRatio(po.getFileYRatio());
vo.setOrganId(po.getOrganId());
vo.setOrganName(organName);
vo.setOrganFusionId(organFusionId);
vo.setResourceAuthType(po.getResourceAuthType());
vo.setResourceSource(po.getResourceSource());
vo.setResourceDesc(po.getResourceDesc());
vo.setResourceName(po.getResourceName());
vo.setResourceId(po.getResourceId());
vo.setResourceNum(po.getResourceNum());
vo.setUserId(po.getUserId());
vo.setUrl(po.getUrl());
vo.setFileHandleField(StringUtils.isBlank(po.getFileHandleField())?new String[]{}:po.getFileHandleField().split(","));
vo.setResourceState(po.getResourceState());
vo.setResourceHashCode(po.getResourceHashCode());
vo.setResourceFusionId(po.getResourceFusionId());
return vo;
}
public static DataFileFieldVo DataFileFieldPoConvertVo(DataFileField fileField){
DataFileFieldVo dataFileFieldVo = new DataFileFieldVo();
dataFileFieldVo.setFieldId(fileField.getFieldId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ public class DataResourceVo {
* 中心节点资源id
*/
private String resourceFusionId;
/**
* organFusionId
*/
private String organFusionId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.primihub.biz.entity.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RemoteDataResourceEvent {
private Long resourceId;
private Integer resourceState;
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void handleFusionCopyTask(DataFusionCopyTask task){
errorMsg = "机构信息查询null";
}else {
BaseResultEntity resultEntity = otherBusinessesService.syncGatewayApiData(copyDto, task.getServerAddress() + "/share/shareData/saveFusionResource", sysOrgan.getPublicKey());
log.info("本次查找备份结果:{}-{}, 其中与其他机构同步结果: {}", startOffset, endOffset, JSON.toJSONString(resultEntity));
if (!resultEntity.getCode().equals(BaseResultEnum.SUCCESS.getReturnCode())) {
isSuccess = false;
if (++errorCount >= 3) {
Expand All @@ -125,6 +126,7 @@ public void handleFusionCopyTask(DataFusionCopyTask task){
}
}
if(isSuccess) {
log.info("传输成功一样:执行修改任务:taskId: {}, endOffset: {}", task.getId(), endOffset);
dataCopyPrimarydbRepository.updateCopyInfo(task.getId(),endOffset,"success");
if (task.getTaskType() == 1) {
startOffset=startOffset+DataConstant.COPY_PAGE_NUM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import com.primihub.biz.entity.base.*;
import com.primihub.biz.entity.data.base.ResourceFileData;
import com.primihub.biz.entity.data.dataenum.DataResourceAuthType;
import com.primihub.biz.entity.data.dataenum.ResourceStateEnum;
import com.primihub.biz.entity.data.dataenum.SourceEnum;
import com.primihub.biz.entity.data.dto.DataFusionCopyDto;
import com.primihub.biz.entity.data.dto.ModelDerivationDto;
import com.primihub.biz.entity.data.po.*;
import com.primihub.biz.entity.data.req.*;
import com.primihub.biz.entity.data.vo.*;
import com.primihub.biz.entity.event.RemoteDataResourceEvent;
import com.primihub.biz.entity.sys.po.SysFile;
import com.primihub.biz.entity.sys.po.SysLocalOrganInfo;
import com.primihub.biz.entity.sys.po.SysUser;
Expand All @@ -37,6 +39,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -76,6 +79,8 @@ public class DataResourceService {
private FusionResourceService fusionResourceService;
@Autowired
private TaskHelper taskHelper;
@Autowired
private ApplicationContext applicationContext;

public BaseResultEntity getDataResourceList(DataResourceReq req, Long userId){
Map<String,Object> paramMap = new HashMap<>();
Expand Down Expand Up @@ -180,6 +185,11 @@ public BaseResultEntity saveDataResource(DataResourceReq req,Long userId){
map.put("resourceFusionId",dataResource.getResourceFusionId());
map.put("resourceName",dataResource.getResourceName());
map.put("resourceDesc",dataResource.getResourceDesc());

// 传送任务
if (dataResource.getResourceAuthType().equals(DataResourceAuthType.PUBLIC.getAuthType())) {
applicationContext.publishEvent(new RemoteDataResourceEvent(dataResource.getResourceId(), null));
}
}catch (Exception e){
log.info("save DataResource Exception:{}",e.getMessage());
e.printStackTrace();
Expand Down Expand Up @@ -229,13 +239,31 @@ public BaseResultEntity editDataResource(DataResourceReq req, Long userId) {
map.put("resourceId",dataResource.getResourceId());
map.put("resourceName",dataResource.getResourceName());
map.put("resourceDesc",dataResource.getResourceDesc());

if (Objects.equals(dataResource.getResourceAuthType(), DataResourceAuthType.PUBLIC.getAuthType()) || Objects.equals(req.getResourceAuthType(), DataResourceAuthType.PRIVATE.getAuthType())) {
if (Objects.equals(req.getResourceAuthType(), DataResourceAuthType.PUBLIC.getAuthType())) {
RemoteDataResourceEvent remoteDataResourceEvent = new RemoteDataResourceEvent(dataResource.getResourceId(), ResourceStateEnum.AVAILABLE.getStateType());
applicationContext.publishEvent(remoteDataResourceEvent);
log.info("spring event publish : {}", JSONObject.toJSONString(remoteDataResourceEvent));
} else {
RemoteDataResourceEvent remoteDataResourceEvent = new RemoteDataResourceEvent(dataResource.getResourceId(), ResourceStateEnum.NOT_AVAILABLE.getStateType());
applicationContext.publishEvent(new RemoteDataResourceEvent(dataResource.getResourceId(), ResourceStateEnum.NOT_AVAILABLE.getStateType()));
log.info("spring event publish : {}", JSONObject.toJSONString(remoteDataResourceEvent));
}
}
return BaseResultEntity.success(map);
}

public BaseResultEntity getDataResource(String resourceId) {
DataResource dataResource = dataResourceRepository.queryDataResourceByResourceFusionId(resourceId);
if (dataResource == null){
dataResource = dataResourceRepository.queryDataResourceById(Long.parseLong(resourceId));
try {
long l = Long.parseLong(resourceId);
dataResource = dataResourceRepository.queryDataResourceById(Long.parseLong(resourceId));
} catch (Exception e) {
log.info("找不到资源:{}", resourceId);
return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL);
}
}
if (dataResource == null) {
return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL);
Expand Down Expand Up @@ -456,8 +484,11 @@ public BaseResultEntity getResourceTags() {
}

public Object findFusionCopyResourceList(Long startOffset, Long endOffset){
log.info("本方机构:{}, 查找fusionCopy的, startOffset :{}, endOffset:{}", organConfiguration.getSysLocalOrganName(), startOffset, endOffset);
List<DataResource> resourceList=dataResourceRepository.findCopyResourceList(startOffset,endOffset);
log.info("本地查找出的资源: {}", resourceList.size());
Set<String> ids = resourceList.stream().map(DataResource::getResourceFusionId).collect(Collectors.toSet());
log.info("远程查找备份的资源ids: {}", ids);
BaseResultEntity result = fusionResourceService.getCopyResource(ids);
log.info(JSONObject.toJSONString(result));
return result.getResult();
Expand Down Expand Up @@ -685,6 +716,8 @@ public BaseResultEntity resourceStatusChange(Long resourceId, Integer resourceSt
dataResourcePrRepository.editResource(dataResource);
fusionResourceService.saveResource(organConfiguration.getSysLocalOrganId(),findCopyResourceList(dataResource.getResourceId(), dataResource.getResourceId()));
singleTaskChannel.input().send(MessageBuilder.withPayload(JSON.toJSONString(new BaseFunctionHandleEntity(BaseFunctionHandleEnum.SINGLE_DATA_FUSION_RESOURCE_TASK.getHandleType(),dataResource))).build());

applicationContext.publishEvent(new RemoteDataResourceEvent(resourceId, null));
}
return BaseResultEntity.success();
}
Expand Down Expand Up @@ -882,5 +915,57 @@ public BaseResultEntity noticeResource(String resourceId) {
singleTaskChannel.input().send(MessageBuilder.withPayload(JSON.toJSONString(new BaseFunctionHandleEntity(BaseFunctionHandleEnum.SINGLE_DATA_FUSION_RESOURCE_TASK.getHandleType(),dataResource))).build());
return BaseResultEntity.success();
}

/**
* 组装传输实体
* @param resourceId
* @return
*/
public Map getDataResourceToTransfer(Long resourceId, Integer resourceState) {
DataResource dataResource = dataResourceRepository.queryDataResourceById(resourceId);
if (dataResource == null) {
return null;
}
if (resourceState != null) {
dataResource.setResourceState(resourceState);
}
List<DataResourceTag> dataResourceTags = dataResourceRepository.queryTagsByResourceId(resourceId);
DataResourceVo dataResourceVo = DataResourceConvert.dataResourcePoConvertVo(dataResource, organConfiguration.getSysLocalOrganId() ,organConfiguration.getSysLocalOrganName());
dataResourceVo.setTags(dataResourceTags.stream().map(DataResourceConvert::dataResourceTagPoConvertListVo).collect(Collectors.toList()));
List<DataFileFieldVo> dataFileFieldList = dataResourceRepository.queryDataFileFieldByFileId(dataResource.getResourceId())
.stream().map(DataResourceConvert::DataFileFieldPoConvertVo)
.collect(Collectors.toList());
Map<String, Object> map = new HashMap<>();
try {
// 数据库格式或者是csv格式
if (dataResource.getResourceSource() == 2) {
DataSource dataSource = dataResourceRepository.queryDataSourceById(dataResource.getDbId());
log.info("{}-{}", dataResource.getDbId(), JSONObject.toJSONString(dataSource));
if (dataSource != null) {
BaseResultEntity baseResultEntity = dataSourceService.dataSourceTableDetails(dataSource);
if (baseResultEntity.getCode() == 0) {
map.putAll((Map<String, Object>) baseResultEntity.getResult());
}
}
} else {
List<LinkedHashMap<String, Object>> csvData = FileUtil.getCsvData(dataResource.getUrl(), DataConstant.READ_DATA_ROW);
map.put("dataList", csvData);
}
} catch (Exception e) {
log.info("资源id:{}-文件读取失败:{}", dataResource.getResourceId(), e.getMessage());
map.put("dataList", new ArrayList());
}
map.put("resource", dataResourceVo);
// 不需要获取授权信息
map.put("fieldList", dataFileFieldList);
Map<String, String> nodeInfoMap = new HashMap<String, String>() {
{
put("gateway", organConfiguration.getSysLocalOrganInfo().getGatewayAddress());
put("publicKey", organConfiguration.getSysLocalOrganInfo().getPublicKey());
}
};
map.put("nodeInfo", JSON.toJSONString(nodeInfoMap));
return map;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public BaseResultEntity recallNotFinishedTask(){
Date tenMinuteAgo= DateUtil.changeDate(date, Calendar.MINUTE,-3);
List<DataFusionCopyTask> notFinishedTask=dataCopyService.selectNotFinishedTask(threeDayAgo,tenMinuteAgo);
for(DataFusionCopyTask task:notFinishedTask){
log.info("本次recall的任务id: {}", task.getId());
dataCopyService.handleFusionCopyTask(task);
}
result.put("fusionMsg","本节点处理");
Expand Down
Loading

0 comments on commit c7efe56

Please sign in to comment.