Skip to content

Commit

Permalink
Add a reference count (refCnt) to the replicate entity
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Aug 9, 2024
1 parent 3ca50d2 commit a0e76f9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 24 deletions.
64 changes: 45 additions & 19 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/google/uuid"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"

"github.com/zilliztech/milvus-cdc/core/api"
"github.com/zilliztech/milvus-cdc/core/config"
Expand All @@ -60,11 +62,12 @@ type ReplicateEntity struct {
channelManager api.ChannelManager
targetClient api.TargetAPI
metaOp api.MetaOp
readerObj api.Reader // TODO the reader's counter may be more than one
writerObj api.Writer
mqDispatcher msgdispatcher.Client
mqTTDispatcher msgdispatcher.Client
quitFunc func()
entityQuitFunc func()
taskQuitFuncs *typeutil.ConcurrentMap[string, func()]
refCnt atomic.Int32
}

type MetaCDC struct {
Expand Down Expand Up @@ -331,6 +334,11 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
e.cdcTasks.Unlock()
err = e.startInternal(info, false)
if err != nil {
deleteErr := e.delete(info.TaskID)
if deleteErr != nil {
log.Warn("fail to delete the task", zap.String("task_id", info.TaskID), zap.Error(deleteErr))
return nil, servererror.NewServerError(deleteErr)
}
return nil, err
}

Expand Down Expand Up @@ -513,17 +521,12 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err
return err
}
readCtx, cancelReadFunc := context.WithCancel(log.WithTraceID(context.Background(), info.TaskID))
e.replicateEntityMap.Lock()
originQuitFunc := replicateEntity.quitFunc
replicateEntity.quitFunc = func() {
replicateEntity.taskQuitFuncs.Insert(info.TaskID, func() {
collectionReader.QuitRead(readCtx)
channelReader.QuitRead(readCtx)
cancelReadFunc()
if originQuitFunc != nil {
originQuitFunc()
}
}
e.replicateEntityMap.Unlock()
})
replicateEntity.refCnt.Inc()

if !ignoreUpdateState {
err = store.UpdateTaskState(e.metaStoreFactory.GetTaskInfoMetaStore(ctx), info.TaskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial, meta.TaskStatePaused}, "")
Expand Down Expand Up @@ -619,7 +622,6 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err
MessageBufferSize: bufferSize,
Retry: e.config.Retry,
}, metaOp.GetAllDroppedObj())

e.replicateEntityMap.Lock()
defer e.replicateEntityMap.Unlock()
entity, ok := e.replicateEntityMap.data[milvusAddress]
Expand All @@ -631,9 +633,10 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err
channelManager: channelManager,
metaOp: metaOp,
writerObj: writerObj,
quitFunc: cancelReplicateFunc,
entityQuitFunc: cancelReplicateFunc,
mqDispatcher: msgDispatcherClient,
mqTTDispatcher: msgTTDispatcherClient,
taskQuitFuncs: typeutil.NewConcurrentMap[string, func()](),
}
e.replicateEntityMap.data[milvusAddress] = entity
e.startReplicateAPIEvent(replicateCtx, info, entity)
Expand Down Expand Up @@ -900,7 +903,9 @@ func (e *MetaCDC) pauseTaskWithReason(taskID, reason string, currentStates []met
milvusAddress := GetMilvusAddress(cdcTask.MilvusConnectParam)
e.replicateEntityMap.Lock()
if replicateEntity, ok := e.replicateEntityMap.data[milvusAddress]; ok {
replicateEntity.quitFunc()
if quitFunc, ok := replicateEntity.taskQuitFuncs.GetAndRemove(taskID); ok {
quitFunc()
}
}
delete(e.replicateEntityMap.data, milvusAddress)
e.replicateEntityMap.Unlock()
Expand All @@ -915,12 +920,27 @@ func (e *MetaCDC) Delete(req *request.DeleteRequest) (*request.DeleteResponse, e
return nil, servererror.NewClientError("not found the task, task_id: " + req.TaskID)
}

var err error
err := e.delete(req.TaskID)
if err != nil {
return nil, servererror.NewServerError(err)
}
return &request.DeleteResponse{}, nil
}

func (e *MetaCDC) delete(taskID string) error {
e.cdcTasks.RLock()
_, ok := e.cdcTasks.data[taskID]
e.cdcTasks.RUnlock()
if !ok {
return errors.Errorf("not found the task, task_id: " + taskID)
}

var err error
var info *meta.TaskInfo
info, err = store.DeleteTask(e.metaStoreFactory, req.TaskID)

info, err = store.DeleteTask(e.metaStoreFactory, taskID)
if err != nil {
return nil, servererror.NewServerError(errors.WithMessage(err, "fail to delete the task meta, task_id: "+req.TaskID))
return errors.WithMessage(err, "fail to delete the task meta, task_id: "+taskID)
}
milvusAddress := fmt.Sprintf("%s:%d", info.MilvusConnectParam.Host, info.MilvusConnectParam.Port)
collectionNames := info.CollectionNames()
Expand All @@ -932,17 +952,23 @@ func (e *MetaCDC) Delete(req *request.DeleteRequest) (*request.DeleteResponse, e
e.collectionNames.Unlock()

e.cdcTasks.Lock()
delete(e.cdcTasks.data, req.TaskID)
delete(e.cdcTasks.data, taskID)
e.cdcTasks.Unlock()

e.replicateEntityMap.Lock()
if replicateEntity, ok := e.replicateEntityMap.data[milvusAddress]; ok {
replicateEntity.quitFunc()
if quitFunc, ok := replicateEntity.taskQuitFuncs.GetAndRemove(taskID); ok {
quitFunc()
replicateEntity.refCnt.Dec()
}
if replicateEntity.refCnt.Load() == 0 {
replicateEntity.entityQuitFunc()
}
}
delete(e.replicateEntityMap.data, milvusAddress)
e.replicateEntityMap.Unlock()

return &request.DeleteResponse{}, err
return err
}

func (e *MetaCDC) Pause(req *request.PauseRequest) (*request.PauseResponse, error) {
Expand Down
15 changes: 10 additions & 5 deletions server/cdc_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ func TestReload(t *testing.T) {
metaCDC.replicateEntityMap.Lock()
metaCDC.replicateEntityMap.data = map[string]*ReplicateEntity{
"127.0.0.1:19530": {
quitFunc: func() {},
entityQuitFunc: func() {},
taskQuitFuncs: typeutil.NewConcurrentMap[string, func()](),
},
}
metaCDC.replicateEntityMap.Unlock()
Expand Down Expand Up @@ -1071,7 +1072,8 @@ func TestDelete(t *testing.T) {
metaCDC.cdcTasks.Unlock()
metaCDC.replicateEntityMap.Lock()
metaCDC.replicateEntityMap.data["127.0.0.1:6666"] = &ReplicateEntity{
quitFunc: func() {},
entityQuitFunc: func() {},
taskQuitFuncs: typeutil.NewConcurrentMap[string, func()](),
}
metaCDC.replicateEntityMap.Unlock()

Expand Down Expand Up @@ -1162,10 +1164,13 @@ func TestPauseTask(t *testing.T) {
var isQuit util.Value[bool]
isQuit.Store(false)
m.replicateEntityMap.Lock()
cm := typeutil.NewConcurrentMap[string, func()]()
cm.Insert("task1", func() {
isQuit.Store(true)
})
m.replicateEntityMap.data["127.0.0.1:19530"] = &ReplicateEntity{
quitFunc: func() {
isQuit.Store(true)
},
entityQuitFunc: func() {},
taskQuitFuncs: cm,
}
m.replicateEntityMap.Unlock()
err := m.pauseTaskWithReason("task1", "foo", []meta.TaskState{})
Expand Down

0 comments on commit a0e76f9

Please sign in to comment.