diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go index 24003abd..7a258104 100644 --- a/core/reader/replicate_channel_manager.go +++ b/core/reader/replicate_channel_manager.go @@ -170,6 +170,11 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info r.addCollectionLock.Unlock() }() + if r.isDroppedCollection(info.ID) { + log.Info("the collection is dropped", zap.Int64("collection_id", info.ID)) + return nil + } + sourceDBInfo := r.metaOp.GetDatabaseInfoForCollection(ctx, info.ID) var err error @@ -362,7 +367,11 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn sourceDBInfo := r.metaOp.GetDatabaseInfoForCollection(ctx, collectionID) if sourceDBInfo.Dropped { - partitionLog.Warn("the database has been dropped when add partition") + partitionLog.Info("the database has been dropped when add partition") + return nil + } + if r.isDroppedPartition(partitionInfo.PartitionID) { + partitionLog.Info("the partition has been dropped when add partition") return nil } @@ -412,7 +421,7 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn if partitionInfo.State == pb.PartitionState_PartitionDropping || partitionInfo.State == pb.PartitionState_PartitionDropped { r.droppedPartitions.Store(partitionInfo.PartitionID, struct{}{}) - partitionLog.Warn("the partition is dropped in the source and target") + partitionLog.Info("the partition is dropped in the source and target") return nil } select { @@ -456,6 +465,11 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, collectionIn if _, ok := r.replicatePartitions[collectionID]; !ok { r.replicatePartitions[collectionID] = make(map[int64]chan struct{}) } + if _, ok := r.replicatePartitions[collectionID][partitionInfo.PartitionID]; ok { + partitionLog.Info("the partition is already replicated", zap.Int64("partition_id", partitionInfo.PartitionID)) + r.partitionLock.Unlock() + return nil + } r.replicatePartitions[collectionID][partitionInfo.PartitionID] = barrier.CloseChan r.partitionLock.Unlock() for _, handler := range handlers {