diff --git a/configs/backup.yaml b/configs/backup.yaml
index dad734d..4daa3ce 100644
--- a/configs/backup.yaml
+++ b/configs/backup.yaml
@@ -48,7 +48,7 @@ backup:
     # thread pool to copy data for each collection backup, default 100.
     # which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time.
     # reduce it if blocks your storage's network bandwidth
-    parallelism: 100
+    parallelism: 128
 
 restore:
   # Collection level parallelism to restore
diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go
index 10aa7c6..7212419 100644
--- a/core/backup_impl_create_backup.go
+++ b/core/backup_impl_create_backup.go
@@ -4,8 +4,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	jsoniter "github.com/json-iterator/go"
@@ -92,7 +94,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea
 	b.backupNameIdDict.Store(name, request.GetRequestId())
 
 	if request.Async {
-		go b.executeCreateBackupV2(ctx, request, backup)
+		go b.executeCreateBackup(ctx, request, backup)
 		asyncResp := &backuppb.BackupInfoResponse{
 			RequestId: request.GetRequestId(),
 			Code:      backuppb.ResponseCode_Success,
@@ -101,7 +103,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea
 		}
 		return asyncResp
 	} else {
-		task, err := b.executeCreateBackupV2(ctx, request, backup)
+		task, err := b.executeCreateBackup(ctx, request, backup)
 		resp.Data = task
 		if err != nil {
 			resp.Code = backuppb.ResponseCode_Fail
@@ -433,20 +435,33 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
 
 	segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
 	partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo)
-
-	segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
-
-	for _, segment := range filledSegments {
-		segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows)
-		if err != nil {
-			return err
-		}
-		if len(segmentInfo.Binlogs) == 0 {
-			log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID))
+	mu := sync.Mutex{}
+	wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
+	if err != nil {
+		return err
+	}
+	wp.Start()
+	for _, v := range filledSegments {
+		segment := v
+		job := func(ctx context.Context) error {
+			segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows)
+			if err != nil {
+				return err
+			}
+			if len(segmentInfo.Binlogs) == 0 {
+				log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID))
+			}
+			mu.Lock()
+			partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo)
+			segmentBackupInfos = append(segmentBackupInfos, segmentInfo)
+			mu.Unlock()
+			return nil
 		}
-		partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo)
-		segmentBackupInfos = append(segmentBackupInfos, segmentInfo)
-		segmentLevelBackupInfos = append(segmentLevelBackupInfos, segmentInfo)
+		wp.Submit(job)
+	}
+	wp.Done()
+	if err := wp.Wait(); err != nil {
+		return err
 	}
 	log.Info("readSegmentInfo from storage",
 		zap.String("collectionName", collectionBackup.GetCollectionName()),
@@ -520,6 +535,9 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
 		}
 	}
 
+	sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
+		return segmentBackupInfos[i].Size < segmentBackupInfos[j].Size
+	})
 	err =
b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())) if err != nil { return err @@ -531,7 +549,7 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup return nil } -func (b *BackupContext) executeCreateBackupV2(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { +func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { b.mu.Lock() defer b.mu.Unlock() @@ -607,378 +625,6 @@ func (b *BackupContext) executeCreateBackupV2(ctx context.Context, request *back return backupInfo, nil } -func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { - b.mu.Lock() - defer b.mu.Unlock() - - id := backupInfo.GetId() - backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond)) - backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING - leveledBackupInfo := &LeveledBackupInfo{ - backupLevel: backupInfo, - } - defer b.refreshBackupMeta(id, backupInfo, leveledBackupInfo) - - // 1, get collection level meta - toBackupCollections, err := b.parseBackupCollections(request) - if err != nil { - log.Error("parse backup collections from request failed", zap.Error(err)) - return backupInfo, err - } - collectionNames := make([]string, len(toBackupCollections)) - for i, coll := range toBackupCollections { - collectionNames[i] = coll.collectionName - } - log.Info("collections to backup", zap.Strings("collections", collectionNames)) - - collectionBackupInfos := make([]*backuppb.CollectionBackupInfo, 0) - partitionLevelBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) - for _, collection := range toBackupCollections { - // list collection result is not complete - completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.db, collection.collectionName) - if err != nil { - log.Error("fail in DescribeCollection", zap.Error(err)) - return backupInfo, err - } - fields := make([]*backuppb.FieldSchema, 0) - for _, field := range completeCollection.Schema.Fields { - fields = append(fields, &backuppb.FieldSchema{ - FieldID: field.ID, - Name: field.Name, - IsPrimaryKey: field.PrimaryKey, - Description: field.Description, - AutoID: field.AutoID, - DataType: backuppb.DataType(field.DataType), - TypeParams: utils.MapToKVPair(field.TypeParams), - IndexParams: utils.MapToKVPair(field.IndexParams), - IsDynamic: field.IsDynamic, - IsPartitionKey: field.IsPartitionKey, - }) - } - schema := &backuppb.CollectionSchema{ - Name: completeCollection.Schema.CollectionName, - Description: completeCollection.Schema.Description, - AutoID: completeCollection.Schema.AutoID, - Fields: fields, - EnableDynamicField: completeCollection.Schema.EnableDynamicField, - } - - indexInfos := make([]*backuppb.IndexInfo, 0) - indexDict := make(map[string]*backuppb.IndexInfo, 0) - log.Info("try to get index", - zap.String("collection_name", completeCollection.Name)) - for _, field := range completeCollection.Schema.Fields { - //if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector { - // continue - //} - fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, collection.db, completeCollection.Name, field.Name) - if err != nil { - if strings.Contains(err.Error(), "index not found") || - 
strings.HasPrefix(err.Error(), "index doesn't exist") { - // todo - log.Info("field has no index", - zap.String("collection_name", completeCollection.Name), - zap.String("field_name", field.Name)) - continue - } else { - log.Error("fail in DescribeIndex", zap.Error(err)) - return backupInfo, err - } - } - log.Info("field index", - zap.String("collection_name", completeCollection.Name), - zap.String("field_name", field.Name), - zap.Any("index info", fieldIndex)) - for _, index := range fieldIndex { - if _, ok := indexDict[index.Name()]; ok { - continue - } else { - indexInfo := &backuppb.IndexInfo{ - FieldName: index.FieldName(), - IndexName: index.Name(), - IndexType: string(index.IndexType()), - Params: index.Params(), - } - indexInfos = append(indexInfos, indexInfo) - indexDict[index.Name()] = indexInfo - } - } - } - - collectionBackup := &backuppb.CollectionBackupInfo{ - Id: utils.UUID(), - StateCode: backuppb.BackupTaskStateCode_BACKUP_INITIAL, - StartTime: time.Now().Unix(), - CollectionId: completeCollection.ID, - DbName: collection.db, // todo currently db_name is not used in many places - CollectionName: completeCollection.Name, - Schema: schema, - ShardsNum: completeCollection.ShardNum, - ConsistencyLevel: backuppb.ConsistencyLevel(completeCollection.ConsistencyLevel), - HasIndex: len(indexInfos) > 0, - IndexInfos: indexInfos, - } - collectionBackupInfos = append(collectionBackupInfos, collectionBackup) - } - leveledBackupInfo.collectionLevel = &backuppb.CollectionLevelBackupInfo{ - Infos: collectionBackupInfos, - } - b.refreshBackupMeta(id, backupInfo, leveledBackupInfo) - - segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) - // backup collection - for _, collection := range collectionBackupInfos { - partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) - partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collection.GetDbName(), collection.GetCollectionName()) - if err != nil { - log.Error("fail to ShowPartitions", zap.Error(err)) - return backupInfo, err - } - - // use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515 - collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{}) - if err != nil { - log.Error("fail to GetLoadingProgress of collection", zap.Error(err)) - return backupInfo, err - } - - var collectionLoadState string - partitionLoadStates := make(map[string]string, 0) - if collectionLoadProgress == 0 { - collectionLoadState = LoadState_NotLoad - for _, partition := range partitions { - partitionLoadStates[partition.Name] = LoadState_NotLoad - } - } else if collectionLoadProgress == 100 { - collectionLoadState = LoadState_Loaded - for _, partition := range partitions { - partitionLoadStates[partition.Name] = LoadState_Loaded - } - } else { - collectionLoadState = LoadState_Loading - for _, partition := range partitions { - loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{partition.Name}) - if err != nil { - log.Error("fail to GetLoadingProgress of partition", zap.Error(err)) - return backupInfo, err - } - if loadProgress == 0 { - partitionLoadStates[partition.Name] = LoadState_NotLoad - } else if loadProgress == 100 { - partitionLoadStates[partition.Name] = LoadState_Loaded - } else { - partitionLoadStates[partition.Name] = LoadState_Loading - } - } - } - - // fill segments - filledSegments := make([]*entity.Segment, 0) - if 
!request.GetForce() { - // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) - if err != nil { - return backupInfo, err - } - log.Info("GetPersistentSegmentInfo before flush from milvus", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) - newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetDbName(), collection.GetCollectionName(), false) - if err != nil { - log.Error(fmt.Sprintf("fail to flush the collection: %s.%s", collection.GetDbName(), collection.GetCollectionName())) - return backupInfo, err - } - log.Info("flush segments", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs), - zap.Int64s("flushedSegmentIDs", flushedSegmentIDs), - zap.Int64("timeOfSeal", timeOfSeal)) - collection.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0) - collection.BackupPhysicalTimestamp = uint64(timeOfSeal) - - flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...) - segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) - if err != nil { - return backupInfo, err - } - log.Info("GetPersistentSegmentInfo after flush from milvus", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)), - zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush))) - segmentDict := utils.ArrayToMap(flushSegmentIDs) - for _, seg := range segmentEntitiesAfterFlush { - sid := seg.ID - if _, ok := segmentDict[sid]; ok { - delete(segmentDict, sid) - filledSegments = append(filledSegments, seg) - } else { - log.Debug("this may be new segments after flush, skip it", zap.Int64("id", sid)) - } - } - for _, seg := range segmentEntitiesBeforeFlush { - sid := seg.ID - if _, ok := segmentDict[sid]; ok { - delete(segmentDict, sid) - filledSegments = append(filledSegments, seg) - } else { - log.Debug("this may be old segments before flush, skip it", zap.Int64("id", sid)) - } - } - if len(segmentDict) > 0 { - // very rare situation, segments return in flush doesn't exist in either segmentEntitiesBeforeFlush and segmentEntitiesAfterFlush - errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. 
segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict)) - log.Warn(errorMsg) - } - } else { - // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) - if err != nil { - return backupInfo, err - } - log.Info("GetPersistentSegmentInfo from milvus", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("segmentNum", len(segmentEntitiesBeforeFlush))) - for _, seg := range segmentEntitiesBeforeFlush { - filledSegments = append(filledSegments, seg) - } - } - - if err != nil { - collection.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL - collection.ErrorMessage = err.Error() - return backupInfo, err - } - log.Info("Finished fill segment", - zap.String("collectionName", collection.GetCollectionName())) - - segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) - partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo) - for _, segment := range filledSegments { - segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows) - if err != nil { - return backupInfo, err - } - if len(segmentInfo.Binlogs) == 0 { - log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID)) - } - partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo) - segmentBackupInfos = append(segmentBackupInfos, segmentInfo) - segmentLevelBackupInfos = append(segmentLevelBackupInfos, segmentInfo) - } - log.Info("readSegmentInfo from storage", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("segmentNum", len(filledSegments))) - - leveledBackupInfo.segmentLevel = &backuppb.SegmentLevelBackupInfo{ - Infos: segmentLevelBackupInfos, - } - - for _, partition := range partitions { - partitionSegments := partSegInfoMap[partition.ID] - var size int64 = 0 - for _, seg := range partitionSegments { - size += seg.GetSize() - } - partitionBackupInfo := &backuppb.PartitionBackupInfo{ - PartitionId: partition.ID, - PartitionName: partition.Name, - CollectionId: collection.GetCollectionId(), - SegmentBackups: partSegInfoMap[partition.ID], - Size: size, - LoadState: partitionLoadStates[partition.Name], - } - partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo) - partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo) - } - - leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{ - Infos: partitionLevelBackupInfos, - } - collection.PartitionBackups = partitionBackupInfos - collection.LoadState = collectionLoadState - b.refreshBackupMeta(id, backupInfo, leveledBackupInfo) - log.Info("finish build partition info", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("partitionNum", len(partitionBackupInfos))) - - log.Info("Begin copy data", - zap.String("collectionName", collection.GetCollectionName()), - zap.Int("segmentNum", len(segmentBackupInfos))) - - for _, part := range partitionBackupInfos { - if part.GetSize() > b.params.BackupCfg.MaxSegmentGroupSize { - log.Info("partition size is larger than MaxSegmentGroupSize, will separate segments into groups in backup files", - zap.Int64("collectionId", part.GetCollectionId()), - zap.Int64("partitionId", part.GetPartitionId()), - zap.Int64("partitionSize", part.GetSize()), - zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) - segments := partSegInfoMap[part.GetPartitionId()] - var bufferSize int64 = 0 - // 0 is illegal value, 
start from 1 - var segGroupID int64 = 1 - for _, seg := range segments { - if seg.Size > b.params.BackupCfg.MaxSegmentGroupSize && bufferSize == 0 { - seg.GroupId = segGroupID - segGroupID = segGroupID + 1 - } else if bufferSize+seg.Size > b.params.BackupCfg.MaxSegmentGroupSize { - segGroupID = segGroupID + 1 - seg.GroupId = segGroupID - bufferSize = 0 - bufferSize = bufferSize + seg.Size - } else { - seg.GroupId = segGroupID - bufferSize = bufferSize + seg.Size - } - } - } else { - log.Info("partition size is smaller than MaxSegmentGroupSize, won't separate segments into groups in backup files", - zap.Int64("collectionId", part.GetCollectionId()), - zap.Int64("partitionId", part.GetPartitionId()), - zap.Int64("partitionSize", part.GetSize()), - zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) - } - } - - err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())) - if err != nil { - return backupInfo, err - } - b.refreshBackupMeta(id, backupInfo, leveledBackupInfo) - } - - var backupSize int64 = 0 - for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() { - backupSize += coll.GetSize() - } - backupInfo.Size = backupSize - backupInfo.EndTime = time.Now().UnixNano() / int64(time.Millisecond) - backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS - backupInfo, err = b.refreshBackupMeta(id, backupInfo, leveledBackupInfo) - if err != nil { - return backupInfo, err - } - // 7, write meta data - output, _ := serialize(backupInfo) - log.Debug("backup meta", zap.String("value", string(output.BackupMetaBytes))) - log.Debug("collection meta", zap.String("value", string(output.CollectionMetaBytes))) - log.Debug("partition meta", zap.String("value", string(output.PartitionMetaBytes))) - log.Debug("segment meta", zap.String("value", string(output.SegmentMetaBytes))) - - b.getStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, backupInfo.GetName()), output.BackupMetaBytes) - b.getStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, backupInfo.GetName()), output.CollectionMetaBytes) - b.getStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, backupInfo.GetName()), output.PartitionMetaBytes) - b.getStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, backupInfo.GetName()), output.SegmentMetaBytes) - b.getStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, backupInfo.GetName()), output.FullMetaBytes) - - log.Info("finish executeCreateBackup", - zap.String("requestId", request.GetRequestId()), - zap.String("backupName", request.GetBackupName()), - zap.Strings("collections", request.GetCollectionNames()), - zap.Bool("async", request.GetAsync()), - zap.String("backup meta", string(output.BackupMetaBytes))) - return backupInfo, nil -} - func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, dstPath string) error { wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS) if err != nil { @@ -1003,7 +649,8 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S zap.Int64("collection_id", segment.GetCollectionId()), zap.Int64("partition_id", segment.GetPartitionId()), zap.Int64("segment_id", segment.GetSegmentId()), - zap.Int64("group_id", segment.GetGroupId())) + zap.Int64("group_id", segment.GetGroupId()), + zap.Int64("size", segment.GetSize())) // insert log for _, binlogs := 
range segment.GetBinlogs() {
 			for _, binlog := range binlogs.GetBinlogs() {
diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go
index ff8bb11..71a0317 100644
--- a/core/backup_impl_restore_backup.go
+++ b/core/backup_impl_restore_backup.go
@@ -481,6 +481,11 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
 				zap.Error(err))
 			return err
 		}
+		log.Info("finish restore partition",
+			zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
+			zap.String("targetDBName", targetDBName),
+			zap.String("targetCollectionName", targetCollectionName),
+			zap.String("partition", partitionBackup2.GetPartitionName()))
 		return err
 	}
 	jobId := b.getRestoreWorkerPool().SubmitWithId(job)
diff --git a/tests/base/client_base.py b/tests/base/client_base.py
index 278bf59..674a5ac 100644
--- a/tests/base/client_base.py
+++ b/tests/base/client_base.py
@@ -344,8 +344,8 @@ def compare_collections(self, src_name, dist_name, output_fields=None):
         collection_src, _ = self.collection_wrap.init_collection(name=src_name)
         collection_dist, _ = self.collection_wrap.init_collection(name=dist_name)
         assert collection_src.num_entities == collection_dist.num_entities, \
-            f"collection_src num_entities: {collection_src.num_entities} != " \
-            f"collection_dist num_entities: {collection_dist.num_entities}"
+            f"collection_src {src_name} num_entities: {collection_src.num_entities} != " \
+            f"collection_dist {dist_name} num_entities: {collection_dist.num_entities}"
         assert collection_src.schema == collection_dist.schema
         # get partitions
         partitions_src = collection_src.partitions
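
The change in `backupCollection` replaces the sequential `readSegmentInfo` loop with jobs submitted to the repo's `common.WorkerPool`, guarding the shared map and slice with a mutex. Below is a minimal, self-contained sketch of the same bounded-parallelism pattern using only the standard library; the semaphore channel stands in for the worker pool, and `segmentMeta`/`readSegmentMeta` are hypothetical placeholders, not the project's types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// segmentMeta is a hypothetical stand-in for *backuppb.SegmentBackupInfo.
type segmentMeta struct {
	ID          int64
	PartitionID int64
	Size        int64
}

// readSegmentMeta simulates the per-segment metadata read done by readSegmentInfo.
func readSegmentMeta(ctx context.Context, id, partitionID int64) (*segmentMeta, error) {
	return &segmentMeta{ID: id, PartitionID: partitionID, Size: id * 1024}, nil
}

func main() {
	ctx := context.Background()
	segmentIDs := []int64{1, 2, 3, 4, 5}

	parallelism := 2                        // plays the role of BackupCopyDataParallelism
	sem := make(chan struct{}, parallelism) // bounded-concurrency semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex // protects the shared collections below, like the mutex in the diff
	partSegMap := make(map[int64][]*segmentMeta)
	var all []*segmentMeta
	var firstErr error

	for _, id := range segmentIDs {
		id := id // capture the loop variable, as the diff does with `segment := v`
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			info, err := readSegmentMeta(ctx, id, id%2)
			if err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
				return
			}
			mu.Lock()
			partSegMap[info.PartitionID] = append(partSegMap[info.PartitionID], info)
			all = append(all, info)
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println("segments read:", len(all), "partitions:", len(partSegMap), "err:", firstErr)
}
```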
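
The `sort.SliceStable` call added just before `copySegments` orders segments by ascending size; the diff does not state the motivation, but a plausible reading is that the copy worker pool drains small segments first while keeping equal-sized segments in their original order. A standalone illustration of that ordering:

```go
package main

import (
	"fmt"
	"sort"
)

// segmentInfo mirrors only the field the new sort keys on.
type segmentInfo struct {
	SegmentID int64
	Size      int64
}

func main() {
	segments := []*segmentInfo{
		{SegmentID: 7, Size: 512 << 20},
		{SegmentID: 3, Size: 64 << 20},
		{SegmentID: 9, Size: 64 << 20},
		{SegmentID: 1, Size: 2 << 30},
	}

	// Ascending by size; SliceStable keeps the original order of equal-sized
	// segments (IDs 3 and 9 here), matching the call added in backupCollection.
	sort.SliceStable(segments, func(i, j int) bool {
		return segments[i].Size < segments[j].Size
	})

	for _, s := range segments {
		fmt.Printf("segment %d: %d bytes\n", s.SegmentID, s.Size)
	}
}
```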
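
The removed `executeCreateBackup` body on the left-hand side of the diff also documents how segments were bucketed into groups when a partition exceeds `MaxSegmentGroupSize`: group IDs start at 1 (0 is treated as an illegal value), an oversized segment gets a group of its own, and otherwise segments accumulate into the current group until the size limit is hit. A simplified, self-contained version of that loop follows; the type and function names are illustrative, and this mirrors the deleted code rather than whatever the surviving path does today.

```go
package main

import "fmt"

// seg carries only what the grouping loop needs; GroupId is filled in below.
type seg struct {
	ID      int64
	Size    int64
	GroupId int64
}

// assignGroups mirrors the loop from the diff: an oversized segment keeps the
// current group to itself, a segment that would overflow the buffer starts a
// new group, and everything else accumulates into the current group.
func assignGroups(segments []*seg, maxGroupSize int64) {
	var bufferSize int64
	var groupID int64 = 1 // 0 is an illegal group id, start from 1
	for _, s := range segments {
		switch {
		case s.Size > maxGroupSize && bufferSize == 0:
			s.GroupId = groupID
			groupID++
		case bufferSize+s.Size > maxGroupSize:
			groupID++
			s.GroupId = groupID
			bufferSize = s.Size
		default:
			s.GroupId = groupID
			bufferSize += s.Size
		}
	}
}

func main() {
	segments := []*seg{{ID: 1, Size: 40}, {ID: 2, Size: 70}, {ID: 3, Size: 30}, {ID: 4, Size: 30}}
	assignGroups(segments, 100)
	for _, s := range segments {
		fmt.Printf("segment %d (size %d) -> group %d\n", s.ID, s.Size, s.GroupId)
	}
}
```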
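
The deleted function also shows how the flush result was reconciled with `GetPersistentSegmentInfo`: the IDs returned by Flush are turned into a set, matched first against the post-flush segment listing and then against the pre-flush listing, and anything still unmatched is only logged as a warning. A compact sketch of that set bookkeeping, with a hypothetical `segment` type standing in for the SDK's `entity.Segment`:

```go
package main

import "fmt"

// segment is a hypothetical stand-in for the SDK's entity.Segment.
type segment struct {
	ID      int64
	NumRows int64
}

// reconcileFlush mirrors the deleted logic: build a set from the IDs Flush
// reported, take matching segments from the after-flush listing, then from the
// before-flush listing, and report whatever was never seen in either.
func reconcileFlush(flushedIDs []int64, before, after []*segment) (filled []*segment, missing []int64) {
	pending := make(map[int64]struct{}, len(flushedIDs))
	for _, id := range flushedIDs {
		pending[id] = struct{}{}
	}
	for _, list := range [][]*segment{after, before} {
		for _, seg := range list {
			if _, ok := pending[seg.ID]; ok {
				delete(pending, seg.ID)
				filled = append(filled, seg)
			}
		}
	}
	for id := range pending {
		missing = append(missing, id)
	}
	return filled, missing
}

func main() {
	before := []*segment{{ID: 1}, {ID: 2}}
	after := []*segment{{ID: 2}, {ID: 3}}
	filled, missing := reconcileFlush([]int64{1, 2, 3, 4}, before, after)
	fmt.Println("filled:", len(filled), "missing IDs:", missing) // filled: 3, missing IDs: [4]
}
```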