Skip to content

Commit

Permalink
fixed the issue that the same collection in different db is skipped w…
Browse files Browse the repository at this point in the history
…hen replicating
  • Loading branch information
SimFG committed Dec 3, 2024
1 parent d7d0689 commit f0f4597
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,24 +192,31 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
return
}

recordCreateCollectionTime := make(map[string]*pb.CollectionInfo)
recordCreateCollectionTime := make(map[int64]map[string]*pb.CollectionInfo)
repeatedCollectionID := make(map[int64]struct{})
repeatedCollectionName := make(map[string]struct{})
repeatedCollectionName := make(map[int64][]string)
for _, info := range existedCollectionInfos {
// TODO should consider the same collection name in different db
collectionName := info.Schema.GetName()
createTime := info.CreateTime
lastCollectionInfo, recordOK := recordCreateCollectionTime[collectionName]
dbCollections := recordCreateCollectionTime[info.GetDbId()]
if dbCollections == nil {
dbCollections = map[string]*pb.CollectionInfo{
collectionName: info,
}
recordCreateCollectionTime[info.GetDbId()] = dbCollections
continue
}
lastCollectionInfo, recordOK := dbCollections[collectionName]
if recordOK {
if createTime > lastCollectionInfo.CreateTime {
repeatedCollectionID[lastCollectionInfo.ID] = struct{}{}
recordCreateCollectionTime[collectionName] = info
dbCollections[collectionName] = info
} else {
repeatedCollectionID[info.ID] = struct{}{}
}
repeatedCollectionName[collectionName] = struct{}{}
repeatedCollectionName[info.GetDbId()] = append(repeatedCollectionName[info.GetDbId()], collectionName)
} else {
recordCreateCollectionTime[collectionName] = info
dbCollections[collectionName] = info
}
}

Expand All @@ -236,7 +243,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
seekPositions := make([]*msgpb.MsgPosition, 0)
if collectionSeekPositionMap != nil {
seekPositions = lo.Values(collectionSeekPositionMap)
} else if _, ok := repeatedCollectionName[info.Schema.Name]; ok {
} else if dbCollections, ok := repeatedCollectionName[info.DbId]; ok && lo.Contains(dbCollections, info.Schema.Name) {
log.Warn("server warn: find the repeated collection, the latest collection will use the collection start position.", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
for _, v := range info.StartPositions {
seekPositions = append(seekPositions, &msgstream.MsgPosition{
Expand Down

0 comments on commit f0f4597

Please sign in to comment.