Skip to content

Commit

Permalink
skip to create collection/partition if it has been deleted
Browse files Browse the repository at this point in the history
skip to create collection/partition in the etcd event if it has been checked deleted when getting all objects in the etcd

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Jul 22, 2024
1 parent a73b590 commit 6c4fd6f
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info
r.addCollectionLock.Unlock()
}()

if r.isDroppedCollection(info.ID) {
log.Info("the collection is dropped", zap.Int64("collection_id", info.ID))
return nil
}

sourceDBInfo := r.metaOp.GetDatabaseInfoForCollection(ctx, info.ID)

var err error
Expand Down Expand Up @@ -362,7 +367,11 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn
sourceDBInfo := r.metaOp.GetDatabaseInfoForCollection(ctx, collectionID)

if sourceDBInfo.Dropped {
partitionLog.Warn("the database has been dropped when add partition")
partitionLog.Info("the database has been dropped when add partition")
return nil
}
if r.isDroppedPartition(partitionInfo.PartitionID) {
partitionLog.Info("the partition has been dropped when add partition")
return nil
}

Expand Down Expand Up @@ -412,7 +421,7 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn
if partitionInfo.State == pb.PartitionState_PartitionDropping ||
partitionInfo.State == pb.PartitionState_PartitionDropped {
r.droppedPartitions.Store(partitionInfo.PartitionID, struct{}{})
partitionLog.Warn("the partition is dropped in the source and target")
partitionLog.Info("the partition is dropped in the source and target")
return nil
}
select {
Expand Down Expand Up @@ -456,6 +465,11 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn
if _, ok := r.replicatePartitions[collectionID]; !ok {
r.replicatePartitions[collectionID] = make(map[int64]chan struct{})
}
if _, ok := r.replicatePartitions[collectionID][partitionInfo.PartitionID]; ok {
partitionLog.Info("the partition is already replicated", zap.Int64("partition_id", partitionInfo.PartitionID))
r.partitionLock.Unlock()
return nil
}
r.replicatePartitions[collectionID][partitionInfo.PartitionID] = barrier.CloseChan
r.partitionLock.Unlock()
for _, handler := range handlers {
Expand Down

0 comments on commit 6c4fd6f

Please sign in to comment.