diff --git a/pkg/admin/config_service.go b/pkg/admin/config_service.go index bcd8d1fa..5f433916 100644 --- a/pkg/admin/config_service.go +++ b/pkg/admin/config_service.go @@ -341,7 +341,7 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st return nil } -func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, vtables []*TableDTO, idx int) map[string]interface{} { +func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} { jobJson := make(map[string]interface{}) jobBody := make(map[string]interface{}) jobJson["Job"] = jobBody @@ -382,16 +382,16 @@ func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, sr jobReplicate = append(jobReplicate, jobDatabase) jobConfig["ReplicateDoDb"] = jobReplicate jobSrcConfig := make(map[string]interface{}) - jobSrcConfig["Host"] = "arana80" - jobSrcConfig["Port"] = 3306 - jobSrcConfig["User"] = "root" - jobSrcConfig["Password"] = "123456" + jobSrcConfig["Host"] = srcNode.Host + jobSrcConfig["Port"] = srcNode.Port + jobSrcConfig["User"] = srcNode.Username + jobSrcConfig["Password"] = srcNode.Password jobConfig["SrcConnectionConfig"] = jobSrcConfig jobDstConfig := make(map[string]interface{}) - jobDstConfig["Host"] = "arana80" - jobDstConfig["Port"] = 3306 - jobDstConfig["User"] = "root" - jobDstConfig["Password"] = "123456" + jobDstConfig["Host"] = dstNode.Host + jobDstConfig["Port"] = dstNode.Port + jobDstConfig["User"] = dstNode.Username + jobDstConfig["Password"] = dstNode.Password jobConfig["DestConnectionConfig"] = jobDstConfig jobTask["Config"] = jobConfig jobSrc["Tasks"] = append(jobTasks, jobTask) @@ -420,22 +420,37 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st if len(groups) != len(body.Groups) { return perrors.Errorf("new groups is not equle to old groups") } + vtables, err := cs.ListTables(ctx, tenant, cluster) + if err != nil { + return err + } + allNodes, err := cs.ListNodes(ctx, tenant) + if err != nil { + return err + } //2、创建复制group(物理数据库)的dts任务 //groups[0] --> body.Groups[0] //groups[1] --> body.Groups[1] //... - vtables, err := cs.ListTables(ctx, tenant, cluster) - if err != nil { - return err - } httpClient := &http.Client{} dtsJobList := make([]map[string]interface{}, 0, len(groups)) dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string) for i := range groups { srcGroup := groups[i].Name + var srcNode, dstNode *NodeDTO + for n := range allNodes { + if allNodes[n].Database == srcGroup { + srcNode = allNodes[n] + } + } dstGroup := body.Groups[i] - dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, vtables, i) + for n := range allNodes { + if allNodes[n].Database == dstGroup { + dstNode = allNodes[n] + } + } + dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i) if dtsJob == nil { return perrors.Errorf("failed to build DTS json parameter") } @@ -454,9 +469,34 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st } //3、检查是否复制完毕 + for { + time.Sleep(5 * time.Second) + finished := false + + for i := range dtsJobList { + dtsJob := dtsJobList[i]["Job"].(map[string]interface{}) + httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string) + httpReq, err := http.NewRequest("GET", httpURL, nil) + if err != nil { + return perrors.Errorf("failed to create GET http requst") + } + + httpResp, err := httpClient.Do(httpReq) + if err != nil { + return perrors.Errorf("failed to check replica source group") + } + + //TODO: check Status + finished = true + httpResp.Body.Close() + } + + if finished { + break + } + } //4、断开并拒绝所有客户端连接 - time.Sleep(5 * time.Second) //5、再次检查是否复制完毕 @@ -477,10 +517,6 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st } //7、更新groups节点 - allNodes, err := cs.ListNodes(ctx, tenant) - if err != nil { - return err - } var groupBody GroupDTO var groupNode string for i := range body.Groups {