From 1d19a9f0e73c69c0df819e3629b210802220bdc0 Mon Sep 17 00:00:00 2001 From: wayblink Date: Wed, 11 Oct 2023 20:38:02 +0800 Subject: [PATCH] Support concurrent backup and restore (#219) Signed-off-by: wayblink --- cmd/create.go | 7 +- cmd/delete.go | 2 +- cmd/restore.go | 6 +- configs/backup.yaml | 14 +- core/backup_context.go | 52 ++-- core/backup_impl_create_backup.go | 445 ++++++++++++++++++++++++++--- core/backup_impl_restore_backup.go | 266 +++++++++-------- core/etcd_test.go | 75 ----- core/milvus_sdk_wrapper.go | 171 +++++++++++ core/milvus_source.go | 21 -- core/paramtable/params.go | 22 ++ internal/common/workerpool.go | 73 ++++- internal/common/workerpool_test.go | 28 ++ ut_test.go | 1 - 14 files changed, 897 insertions(+), 286 deletions(-) delete mode 100644 core/etcd_test.go create mode 100644 core/milvus_sdk_wrapper.go delete mode 100644 core/milvus_source.go delete mode 100644 ut_test.go diff --git a/cmd/create.go b/cmd/create.go index 7b8b81e3..313c92c4 100644 --- a/cmd/create.go +++ b/cmd/create.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" jsoniter "github.com/json-iterator/go" "github.com/spf13/cobra" @@ -34,6 +35,7 @@ var createBackupCmd = &cobra.Command{ context := context.Background() backupContext := core.CreateBackupContext(context, params) + start := time.Now().Unix() var collectionNameArr []string if collectionNames == "" { collectionNameArr = []string{} @@ -60,7 +62,10 @@ var createBackupCmd = &cobra.Command{ DbCollections: utils.WrapDBCollections(dbCollections), Force: force, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + + fmt.Println(resp.GetMsg()) + duration := time.Now().Unix() - start + fmt.Println(fmt.Sprintf("duration:%d s", duration)) }, } diff --git a/cmd/delete.go b/cmd/delete.go index e38701ca..a0736876 100644 --- a/cmd/delete.go +++ b/cmd/delete.go @@ -30,7 +30,7 @@ var deleteBackupCmd = &cobra.Command{ BackupName: deleteBackName, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + fmt.Println(resp.GetMsg()) }, } diff --git a/cmd/restore.go b/cmd/restore.go index ccc061e2..ae0cadcb 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" jsoniter "github.com/json-iterator/go" "github.com/spf13/cobra" @@ -39,6 +40,7 @@ var restoreBackupCmd = &cobra.Command{ context := context.Background() backupContext := core.CreateBackupContext(context, params) log.Info("restore cmd input args", zap.Strings("args", args)) + start := time.Now().Unix() var collectionNameArr []string if restoreCollectionNames == "" { collectionNameArr = []string{} @@ -84,7 +86,9 @@ var restoreBackupCmd = &cobra.Command{ RestoreIndex: restoreIndex, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + fmt.Println(resp.GetMsg()) + duration := time.Now().Unix() - start + fmt.Println(fmt.Sprintf("duration:%d s", duration)) }, } diff --git a/configs/backup.yaml b/configs/backup.yaml index 3c52cefd..8c8767b6 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -38,4 +38,16 @@ minio: backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath backup: - maxSegmentGroupSize: 2G \ No newline at end of file + maxSegmentGroupSize: 2G + parallelism: 2 # collection level parallelism to backup + copydata: + # thread pool to copy data for each collection backup, default 100. + # which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time. 
+ # reduce it if blocks your storage's network bandwidth + parallelism: 100 + +restore: + # Collection level parallelism to restore + # Only change it > 1 when you have more than one datanode. + # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number. + parallelism: 2 \ No newline at end of file diff --git a/core/backup_context.go b/core/backup_context.go index 41d839e3..5404274f 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -14,6 +14,7 @@ import ( "github.com/zilliztech/milvus-backup/core/proto/backuppb" "github.com/zilliztech/milvus-backup/core/storage" "github.com/zilliztech/milvus-backup/core/utils" + "github.com/zilliztech/milvus-backup/internal/common" "github.com/zilliztech/milvus-backup/internal/log" ) @@ -37,7 +38,7 @@ type BackupContext struct { params paramtable.BackupParams // milvus client - milvusClient *gomilvus.Client + milvusClient *MilvusClient // data storage client storageClient *storage.ChunkManager @@ -51,7 +52,7 @@ type BackupContext struct { restoreTasks map[string]*backuppb.RestoreBackupTask - //copyWorkerPool *common.WorkerPool + bulkinsertWorkerPool *common.WorkerPool } func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) { @@ -89,35 +90,9 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s } func (b *BackupContext) Start() error { - // start milvus go SDK client - //milvusClient, err := CreateMilvusClient(b.ctx, b.params) - //if err != nil { - // log.Error("failed to initial milvus client", zap.Error(err)) - // return err - //} - //b.milvusClient = milvusClient - - // start milvus storage client - //minioClient, err := CreateStorageClient(b.ctx, b.params) - //if err != nil { - // log.Error("failed to initial storage client", zap.Error(err)) - // return err - //} - //b.storageClient = minioClient - b.backupTasks = sync.Map{} b.backupNameIdDict = sync.Map{} b.restoreTasks = make(map[string]*backuppb.RestoreBackupTask) - - // init worker pool - //wp, err := common.NewWorkerPool(b.ctx, WORKER_NUM, RPS) - //if err != nil { - // log.Error("failed to initial copy data woker pool", zap.Error(err)) - // return err - //} - //b.copyWorkerPool = wp - //b.copyWorkerPool.Start() - b.started = true return nil } @@ -142,16 +117,18 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B } } -func (b *BackupContext) getMilvusClient() gomilvus.Client { +func (b *BackupContext) getMilvusClient() *MilvusClient { if b.milvusClient == nil { milvusClient, err := CreateMilvusClient(b.ctx, b.params) if err != nil { log.Error("failed to initial milvus client", zap.Error(err)) panic(err) } - b.milvusClient = &milvusClient + b.milvusClient = &MilvusClient{ + client: milvusClient, + } } - return *b.milvusClient + return b.milvusClient } func (b *BackupContext) getStorageClient() storage.ChunkManager { @@ -166,6 +143,19 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager { return *b.storageClient } +func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool { + if b.bulkinsertWorkerPool == nil { + wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS) + if err != nil { + log.Error("failed to initial copy data woker pool", zap.Error(err)) + panic(err) + } + b.bulkinsertWorkerPool = wp + b.bulkinsertWorkerPool.Start() + } + return b.bulkinsertWorkerPool +} + func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse { if 
request.GetRequestId() == "" { request.RequestId = utils.UUID() diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go index 4e076359..82ed1249 100644 --- a/core/backup_impl_create_backup.go +++ b/core/backup_impl_create_backup.go @@ -92,7 +92,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea b.backupNameIdDict.Store(name, request.GetRequestId()) if request.Async { - go b.executeCreateBackup(ctx, request, backup) + go b.executeCreateBackupV2(ctx, request, backup) asyncResp := &backuppb.BackupInfoResponse{ RequestId: request.GetRequestId(), Code: backuppb.ResponseCode_Success, @@ -101,7 +101,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea } return asyncResp } else { - task, err := b.executeCreateBackup(ctx, request, backup) + task, err := b.executeCreateBackupV2(ctx, request, backup) resp.Data = task if err != nil { resp.Code = backuppb.ResponseCode_Fail @@ -125,7 +125,12 @@ func (b *BackupContext) refreshBackupMeta(id string, backupInfo *backuppb.Backup return backup, nil } -type collection struct { +func (b *BackupContext) refreshBackupCache(backupInfo *backuppb.BackupInfo) { + log.Debug("refreshBackupCache", zap.String("id", backupInfo.GetId())) + b.backupTasks.Store(backupInfo.GetId(), backupInfo) +} + +type collectionStruct struct { db string collectionName string } @@ -134,12 +139,12 @@ type collection struct { // For backward compatibility: // 1,parse dbCollections first, // 2,if dbCollections not set, use collectionNames -func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collection, error) { +func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collectionStruct, error) { log.Debug("Request collection names", zap.Strings("request_collection_names", request.GetCollectionNames()), zap.String("request_db_collections", utils.GetCreateDBCollections(request)), zap.Int("length", len(request.GetCollectionNames()))) - var toBackupCollections []collection + var toBackupCollections []collectionStruct dbCollectionsStr := utils.GetCreateDBCollections(request) // first priority: dbCollections @@ -152,23 +157,18 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq } for db, collections := range dbCollections { if len(collections) == 0 { - err := b.getMilvusClient().UsingDatabase(b.ctx, db) - if err != nil { - log.Error("fail to call SDK use database", zap.Error(err)) - return nil, err - } - collections, err := b.getMilvusClient().ListCollections(b.ctx) + collections, err := b.getMilvusClient().ListCollections(b.ctx, db) if err != nil { log.Error("fail in ListCollections", zap.Error(err)) return nil, err } for _, coll := range collections { log.Debug("Add collection to toBackupCollections", zap.String("db", db), zap.String("collection", coll.Name)) - toBackupCollections = append(toBackupCollections, collection{db, coll.Name}) + toBackupCollections = append(toBackupCollections, collectionStruct{db, coll.Name}) } } else { for _, coll := range collections { - toBackupCollections = append(toBackupCollections, collection{db, coll}) + toBackupCollections = append(toBackupCollections, collectionStruct{db, coll}) } } } @@ -183,14 +183,13 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq return nil, err } for _, db := range dbs { - b.getMilvusClient().UsingDatabase(b.ctx, db.Name) - collections, err := b.getMilvusClient().ListCollections(b.ctx) + collections, err := 
b.getMilvusClient().ListCollections(b.ctx, db.Name) if err != nil { log.Error("fail in ListCollections", zap.Error(err)) return nil, err } for _, coll := range collections { - toBackupCollections = append(toBackupCollections, collection{db.Name, coll.Name}) + toBackupCollections = append(toBackupCollections, collectionStruct{db.Name, coll.Name}) } } log.Debug(fmt.Sprintf("List %v collections", len(toBackupCollections))) @@ -202,9 +201,8 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq dbName = splits[0] collectionName = splits[1] } - b.getMilvusClient().UsingDatabase(b.ctx, dbName) - exist, err := b.getMilvusClient().HasCollection(b.ctx, collectionName) + exist, err := b.getMilvusClient().HasCollection(b.ctx, dbName, collectionName) if err != nil { log.Error("fail in HasCollection", zap.Error(err)) return nil, err @@ -214,13 +212,398 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq log.Error(errMsg) return nil, errors.New(errMsg) } - toBackupCollections = append(toBackupCollections, collection{dbName, collectionName}) + toBackupCollections = append(toBackupCollections, collectionStruct{dbName, collectionName}) } } return toBackupCollections, nil } +func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backuppb.BackupInfo, collection collectionStruct, force bool) error { + log.Info("start backup collection", zap.String("db", collection.db), zap.String("collection", collection.collectionName)) + // list collection result is not complete + completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.db, collection.collectionName) + if err != nil { + log.Error("fail in DescribeCollection", zap.Error(err)) + return err + } + fields := make([]*backuppb.FieldSchema, 0) + for _, field := range completeCollection.Schema.Fields { + fields = append(fields, &backuppb.FieldSchema{ + FieldID: field.ID, + Name: field.Name, + IsPrimaryKey: field.PrimaryKey, + Description: field.Description, + AutoID: field.AutoID, + DataType: backuppb.DataType(field.DataType), + TypeParams: utils.MapToKVPair(field.TypeParams), + IndexParams: utils.MapToKVPair(field.IndexParams), + IsDynamic: field.IsDynamic, + IsPartitionKey: field.IsPartitionKey, + }) + } + schema := &backuppb.CollectionSchema{ + Name: completeCollection.Schema.CollectionName, + Description: completeCollection.Schema.Description, + AutoID: completeCollection.Schema.AutoID, + Fields: fields, + EnableDynamicField: completeCollection.Schema.EnableDynamicField, + } + + indexInfos := make([]*backuppb.IndexInfo, 0) + indexDict := make(map[string]*backuppb.IndexInfo, 0) + log.Info("try to get index", + zap.String("collection_name", completeCollection.Name)) + for _, field := range completeCollection.Schema.Fields { + //if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector { + // continue + //} + fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, collection.db, completeCollection.Name, field.Name) + if err != nil { + if strings.Contains(err.Error(), "index not found") || + strings.HasPrefix(err.Error(), "index doesn't exist") { + // todo + log.Info("field has no index", + zap.String("collection_name", completeCollection.Name), + zap.String("field_name", field.Name)) + continue + } else { + log.Error("fail in DescribeIndex", zap.Error(err)) + return err + } + } + log.Info("field index", + zap.String("collection_name", completeCollection.Name), + zap.String("field_name", field.Name), + zap.Any("index 
info", fieldIndex)) + for _, index := range fieldIndex { + if _, ok := indexDict[index.Name()]; ok { + continue + } else { + indexInfo := &backuppb.IndexInfo{ + FieldName: index.FieldName(), + IndexName: index.Name(), + IndexType: string(index.IndexType()), + Params: index.Params(), + } + indexInfos = append(indexInfos, indexInfo) + indexDict[index.Name()] = indexInfo + } + } + } + + collectionBackup := &backuppb.CollectionBackupInfo{ + Id: utils.UUID(), + StateCode: backuppb.BackupTaskStateCode_BACKUP_INITIAL, + StartTime: time.Now().Unix(), + CollectionId: completeCollection.ID, + DbName: collection.db, // todo currently db_name is not used in many places + CollectionName: completeCollection.Name, + Schema: schema, + ShardsNum: completeCollection.ShardNum, + ConsistencyLevel: backuppb.ConsistencyLevel(completeCollection.ConsistencyLevel), + HasIndex: len(indexInfos) > 0, + IndexInfos: indexInfos, + } + backupInfo.CollectionBackups = append(backupInfo.CollectionBackups, collectionBackup) + + b.refreshBackupCache(backupInfo) + partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) + partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + log.Error("fail to ShowPartitions", zap.Error(err)) + return err + } + + // use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515 + collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), []string{}) + if err != nil { + log.Error("fail to GetLoadingProgress of collection", zap.Error(err)) + return err + } + + var collectionLoadState string + partitionLoadStates := make(map[string]string, 0) + if collectionLoadProgress == 0 { + collectionLoadState = LoadState_NotLoad + for _, partition := range partitions { + partitionLoadStates[partition.Name] = LoadState_NotLoad + } + } else if collectionLoadProgress == 100 { + collectionLoadState = LoadState_Loaded + for _, partition := range partitions { + partitionLoadStates[partition.Name] = LoadState_Loaded + } + } else { + collectionLoadState = LoadState_Loading + for _, partition := range partitions { + loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), []string{partition.Name}) + if err != nil { + log.Error("fail to GetLoadingProgress of partition", zap.Error(err)) + return err + } + if loadProgress == 0 { + partitionLoadStates[partition.Name] = LoadState_NotLoad + } else if loadProgress == 100 { + partitionLoadStates[partition.Name] = LoadState_Loaded + } else { + partitionLoadStates[partition.Name] = LoadState_Loading + } + } + } + + // fill segments + filledSegments := make([]*entity.Segment, 0) + if !force { + // Flush + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo before flush from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) + newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false) + if err != nil { + log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName())) + return err + } + 
log.Info("flush segments", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs), + zap.Int64s("flushedSegmentIDs", flushedSegmentIDs), + zap.Int64("timeOfSeal", timeOfSeal)) + collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0) + collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal) + + flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...) + segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo after flush from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)), + zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush))) + segmentDict := utils.ArrayToMap(flushSegmentIDs) + for _, seg := range segmentEntitiesAfterFlush { + sid := seg.ID + if _, ok := segmentDict[sid]; ok { + delete(segmentDict, sid) + filledSegments = append(filledSegments, seg) + } else { + log.Debug("this may be new segments after flush, skip it", zap.Int64("id", sid)) + } + } + for _, seg := range segmentEntitiesBeforeFlush { + sid := seg.ID + if _, ok := segmentDict[sid]; ok { + delete(segmentDict, sid) + filledSegments = append(filledSegments, seg) + } else { + log.Debug("this may be old segments before flush, skip it", zap.Int64("id", sid)) + } + } + if len(segmentDict) > 0 { + // very rare situation, segments return in flush doesn't exist in either segmentEntitiesBeforeFlush and segmentEntitiesAfterFlush + errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict)) + log.Warn(errorMsg) + } + } else { + // Flush + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(segmentEntitiesBeforeFlush))) + for _, seg := range segmentEntitiesBeforeFlush { + filledSegments = append(filledSegments, seg) + } + } + + if err != nil { + collectionBackup.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL + collectionBackup.ErrorMessage = err.Error() + return err + } + log.Info("Finished fill segment", + zap.String("collectionName", collectionBackup.GetCollectionName())) + + segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) + partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo) + + segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) + + for _, segment := range filledSegments { + segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows) + if err != nil { + return err + } + if len(segmentInfo.Binlogs) == 0 { + log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID)) + } + partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo) + segmentBackupInfos = append(segmentBackupInfos, segmentInfo) + segmentLevelBackupInfos = append(segmentLevelBackupInfos, segmentInfo) + } + log.Info("readSegmentInfo from storage", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(filledSegments))) + + for _, partition := range 
partitions { + partitionSegments := partSegInfoMap[partition.ID] + var size int64 = 0 + for _, seg := range partitionSegments { + size += seg.GetSize() + } + partitionBackupInfo := &backuppb.PartitionBackupInfo{ + PartitionId: partition.ID, + PartitionName: partition.Name, + CollectionId: collectionBackup.GetCollectionId(), + SegmentBackups: partSegInfoMap[partition.ID], + Size: size, + LoadState: partitionLoadStates[partition.Name], + } + partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo) + //partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo) + } + + //leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{ + // Infos: partitionLevelBackupInfos, + //} + collectionBackup.PartitionBackups = partitionBackupInfos + collectionBackup.LoadState = collectionLoadState + b.refreshBackupCache(backupInfo) + log.Info("finish build partition info", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("partitionNum", len(partitionBackupInfos))) + + log.Info("Begin copy data", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(segmentBackupInfos))) + + var collectionBackupSize int64 = 0 + for _, part := range partitionBackupInfos { + collectionBackupSize += part.GetSize() + if part.GetSize() > b.params.BackupCfg.MaxSegmentGroupSize { + log.Info("partition size is larger than MaxSegmentGroupSize, will separate segments into groups in backup files", + zap.Int64("collectionId", part.GetCollectionId()), + zap.Int64("partitionId", part.GetPartitionId()), + zap.Int64("partitionSize", part.GetSize()), + zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) + segments := partSegInfoMap[part.GetPartitionId()] + var bufferSize int64 = 0 + // 0 is illegal value, start from 1 + var segGroupID int64 = 1 + for _, seg := range segments { + if seg.Size > b.params.BackupCfg.MaxSegmentGroupSize && bufferSize == 0 { + seg.GroupId = segGroupID + segGroupID = segGroupID + 1 + } else if bufferSize+seg.Size > b.params.BackupCfg.MaxSegmentGroupSize { + segGroupID = segGroupID + 1 + seg.GroupId = segGroupID + bufferSize = 0 + bufferSize = bufferSize + seg.Size + } else { + seg.GroupId = segGroupID + bufferSize = bufferSize + seg.Size + } + } + } else { + log.Info("partition size is smaller than MaxSegmentGroupSize, won't separate segments into groups in backup files", + zap.Int64("collectionId", part.GetCollectionId()), + zap.Int64("partitionId", part.GetPartitionId()), + zap.Int64("partitionSize", part.GetSize()), + zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) + } + } + + err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())) + b.refreshBackupCache(backupInfo) + + collectionBackup.Size = collectionBackupSize + collectionBackup.EndTime = time.Now().Unix() + return nil +} + +func (b *BackupContext) executeCreateBackupV2(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { + b.mu.Lock() + defer b.mu.Unlock() + + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupParallelism, RPS) + if err != nil { + return backupInfo, err + } + wp.Start() + log.Info("Start collection level backup pool", zap.Int("parallelism", b.params.BackupCfg.BackupParallelism)) + + backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond)) + backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING + + 
defer b.refreshBackupCache(backupInfo) + + // 1, get collection level meta + toBackupCollections, err := b.parseBackupCollections(request) + if err != nil { + log.Error("parse backup collections from request failed", zap.Error(err)) + return backupInfo, err + } + collectionNames := make([]string, len(toBackupCollections)) + for i, coll := range toBackupCollections { + collectionNames[i] = coll.collectionName + } + log.Info("collections to backup", zap.Strings("collections", collectionNames)) + + for _, collection := range toBackupCollections { + collectionClone := collection + job := func(ctx context.Context) error { + err := b.backupCollection(ctx, backupInfo, collectionClone, request.GetForce()) + return err + } + wp.Submit(job) + } + wp.Done() + if err := wp.Wait(); err != nil { + return backupInfo, err + } + + var backupSize int64 = 0 + leveledBackupInfo, err := treeToLevel(backupInfo) + if err != nil { + return backupInfo, err + } + for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() { + backupSize += coll.GetSize() + } + backupInfo.Size = backupSize + backupInfo.EndTime = time.Now().UnixNano() / int64(time.Millisecond) + backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS + b.refreshBackupCache(backupInfo) + + // 7, write meta data + output, _ := serialize(backupInfo) + log.Debug("backup meta", zap.String("value", string(output.BackupMetaBytes))) + log.Debug("collection meta", zap.String("value", string(output.CollectionMetaBytes))) + log.Debug("partition meta", zap.String("value", string(output.PartitionMetaBytes))) + log.Debug("segment meta", zap.String("value", string(output.SegmentMetaBytes))) + + b.getStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, backupInfo.GetName()), output.BackupMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, backupInfo.GetName()), output.CollectionMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, backupInfo.GetName()), output.PartitionMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, backupInfo.GetName()), output.SegmentMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, backupInfo.GetName()), output.FullMetaBytes) + + log.Info("finish executeCreateBackup", + zap.String("requestId", request.GetRequestId()), + zap.String("backupName", request.GetBackupName()), + zap.Strings("collections", request.GetCollectionNames()), + zap.Bool("async", request.GetAsync()), + zap.String("backup meta", string(output.BackupMetaBytes))) + return backupInfo, nil +} + func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { b.mu.Lock() defer b.mu.Unlock() @@ -249,8 +632,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup partitionLevelBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) for _, collection := range toBackupCollections { // list collection result is not complete - b.getMilvusClient().UsingDatabase(b.ctx, collection.db) - completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.collectionName) + completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.db, collection.collectionName) if err != nil { log.Error("fail in DescribeCollection", zap.Error(err)) return backupInfo, err @@ -286,7 +668,7 @@ func (b 
*BackupContext) executeCreateBackup(ctx context.Context, request *backup //if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector { // continue //} - fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, completeCollection.Name, field.Name) + fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, collection.db, completeCollection.Name, field.Name) if err != nil { if strings.Contains(err.Error(), "index not found") || strings.HasPrefix(err.Error(), "index doesn't exist") { @@ -343,16 +725,15 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) // backup collection for _, collection := range collectionBackupInfos { - b.getMilvusClient().UsingDatabase(b.ctx, collection.GetDbName()) partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) - partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collection.GetCollectionName()) + partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { log.Error("fail to ShowPartitions", zap.Error(err)) return backupInfo, err } // use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515 - collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetCollectionName(), []string{}) + collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{}) if err != nil { log.Error("fail to GetLoadingProgress of collection", zap.Error(err)) return backupInfo, err @@ -373,7 +754,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } else { collectionLoadState = LoadState_Loading for _, partition := range partitions { - loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetCollectionName(), []string{partition.Name}) + loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{partition.Name}) if err != nil { log.Error("fail to GetLoadingProgress of partition", zap.Error(err)) return backupInfo, err @@ -392,16 +773,16 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup filledSegments := make([]*entity.Segment, 0) if !request.GetForce() { // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } log.Info("GetPersistentSegmentInfo before flush from milvus", zap.String("collectionName", collection.GetCollectionName()), zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) - newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetCollectionName(), false) + newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetDbName(), collection.GetCollectionName(), false) if err != nil { - log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName())) + log.Error(fmt.Sprintf("fail to flush the collection: %s.%s", collection.GetDbName(), collection.GetCollectionName())) return backupInfo, err } log.Info("flush segments", @@ -413,7 +794,7 @@ func (b *BackupContext) 
executeCreateBackup(ctx context.Context, request *backup collection.BackupPhysicalTimestamp = uint64(timeOfSeal) flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...) - segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } @@ -447,7 +828,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } } else { // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } @@ -596,7 +977,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, dstPath string) error { - wp, err := common.NewWorkerPool(ctx, WORKER_NUM, RPS) + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS) if err != nil { return err } diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go index 4dbfdbff..40e809db 100644 --- a/core/backup_impl_restore_backup.go +++ b/core/backup_impl_restore_backup.go @@ -14,6 +14,7 @@ import ( "github.com/zilliztech/milvus-backup/core/proto/backuppb" "github.com/zilliztech/milvus-backup/core/utils" + "github.com/zilliztech/milvus-backup/internal/common" "github.com/zilliztech/milvus-backup/internal/log" "github.com/zilliztech/milvus-backup/internal/util/retry" ) @@ -236,17 +237,9 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res } log.Info("create database", zap.String("database", targetDBName)) } - err = b.getMilvusClient().UsingDatabase(ctx, targetDBName) - if err != nil { - errorMsg := fmt.Sprintf("fail to switch database %s, err: %s", targetDBName, err) - log.Error(errorMsg) - resp.Code = backuppb.ResponseCode_Fail - resp.Msg = errorMsg - return resp - } // check if the collection exist, if exist, will not restore - exist, err := b.getMilvusClient().HasCollection(ctx, targetCollectionName) + exist, err := b.getMilvusClient().HasCollection(ctx, targetDBName, targetCollectionName) if err != nil { errorMsg := fmt.Sprintf("fail to check whether the collection is exist, collection_name: %s, err: %s", targetDBCollectionName, err) log.Error(errorMsg) @@ -315,6 +308,13 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck b.mu.Lock() defer b.mu.Unlock() + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.RestoreParallelism, RPS) + if err != nil { + return task, err + } + wp.Start() + log.Info("Start collection level restore pool", zap.Int("parallelism", b.params.BackupCfg.RestoreParallelism)) + id := task.GetId() b.restoreTasks[id] = task task.StateCode = backuppb.RestoreTaskStateCode_EXECUTING @@ -332,25 +332,34 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck // 3, execute restoreCollectionTasks for _, restoreCollectionTask := range restoreCollectionTasks { - endTask, err := b.executeRestoreCollectionTask(ctx, backupBucketName, backupPath, restoreCollectionTask, id) - if err != nil { - log.Error("executeRestoreCollectionTask failed", - zap.String("TargetDBName", 
restoreCollectionTask.GetTargetDbName()), - zap.String("TargetCollectionName", restoreCollectionTask.GetTargetCollectionName()), - zap.Error(err)) - return task, err - } - log.Info("finish restore collection", - zap.String("db_name", restoreCollectionTask.GetTargetDbName()), - zap.String("collection_name", restoreCollectionTask.GetTargetCollectionName())) - restoreCollectionTask.StateCode = backuppb.RestoreTaskStateCode_SUCCESS - task.RestoredSize += endTask.RestoredSize - if task.GetToRestoreSize() == 0 { - task.Progress = 100 - } else { - task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize()) + restoreCollectionTaskClone := restoreCollectionTask + job := func(ctx context.Context) error { + endTask, err := b.executeRestoreCollectionTask(ctx, backupBucketName, backupPath, restoreCollectionTaskClone, id) + if err != nil { + log.Error("executeRestoreCollectionTask failed", + zap.String("TargetDBName", restoreCollectionTaskClone.GetTargetDbName()), + zap.String("TargetCollectionName", restoreCollectionTaskClone.GetTargetCollectionName()), + zap.Error(err)) + return err + } + log.Info("finish restore collection", + zap.String("db_name", restoreCollectionTaskClone.GetTargetDbName()), + zap.String("collection_name", restoreCollectionTaskClone.GetTargetCollectionName())) + restoreCollectionTaskClone.StateCode = backuppb.RestoreTaskStateCode_SUCCESS + task.RestoredSize += endTask.RestoredSize + if task.GetToRestoreSize() == 0 { + task.Progress = 100 + } else { + task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize()) + } + updateRestoreTaskFunc(id, task) + return nil } - updateRestoreTaskFunc(id, task) + wp.Submit(job) + } + wp.Done() + if err := wp.Wait(); err != nil { + return task, err } task.StateCode = backuppb.RestoreTaskStateCode_SUCCESS @@ -398,13 +407,12 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup EnableDynamicField: task.GetCollBackup().GetSchema().GetEnableDynamicField(), } - b.getMilvusClient().UsingDatabase(ctx, targetDBName) - err := retry.Do(ctx, func() error { if hasPartitionKey { partitionNum := len(task.GetCollBackup().GetPartitionBackups()) return b.getMilvusClient().CreateCollection( ctx, + targetDBName, collectionSchema, task.GetCollBackup().GetShardsNum(), gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())), @@ -412,6 +420,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup } return b.getMilvusClient().CreateCollection( ctx, + targetDBName, collectionSchema, task.GetCollBackup().GetShardsNum(), gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel()))) @@ -432,7 +441,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup indexes := task.GetCollBackup().GetIndexInfos() for _, index := range indexes { idx := entity.NewGenericIndex(index.GetIndexName(), entity.IndexType(index.GetIndexType()), index.GetFieldName(), index.GetParams()) - err := b.getMilvusClient().CreateIndex(ctx, targetCollectionName, index.GetFieldName(), idx, true) + err := b.getMilvusClient().CreateIndex(ctx, targetDBName, targetCollectionName, index.GetFieldName(), idx, true) if err != nil { log.Warn("Fail to restore index", zap.Error(err)) return task, err @@ -453,67 +462,119 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup } }() + jobIds := make([]int64, 0) for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() { - 
exist, err := b.getMilvusClient().HasPartition(ctx, targetCollectionName, partitionBackup.GetPartitionName()) - if err != nil { - log.Error("fail to check has partition", zap.Error(err)) - return task, err - } - if !exist { - err = retry.Do(ctx, func() error { - return b.getMilvusClient().CreatePartition(ctx, targetCollectionName, partitionBackup.GetPartitionName()) - }, retry.Attempts(10), retry.Sleep(1*time.Second)) + partitionBackup2 := partitionBackup + job := func(ctx context.Context) error { + log.Info("start restore partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup2.GetPartitionName())) + _, err := b.restorePartition(ctx, targetDBName, targetCollectionName, partitionBackup2, task, isSameBucket, backupBucketName, backupPath, tempDir) if err != nil { - log.Error("fail to create partition", zap.Error(err)) - return task, err + log.Error("fail to restore partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup2.GetPartitionName()), + zap.Error(err)) + return err } + return err } - log.Info("create partition", - zap.String("collectionName", targetCollectionName), - zap.String("partitionName", partitionBackup.GetPartitionName())) - - // bulk insert - copyAndBulkInsert := func(files []string) error { - realFiles := make([]string, len(files)) - // if milvus bucket and backup bucket are not the same, should copy the data first - if !isSameBucket { - log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files)) - for i, file := range files { - // empty delta file, no need to copy - if file == "" { - realFiles[i] = file - } else { - err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file) - if err != nil { - log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err)) - return err - } - realFiles[i] = tempDir + file + jobId := b.getRestoreWorkerPool().SubmitWithId(job) + jobIds = append(jobIds, jobId) + } + + err = b.getRestoreWorkerPool().WaitJobs(jobIds) + return task, err +} + +func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string, + partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) { + exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) + if err != nil { + log.Error("fail to check has partition", zap.Error(err)) + return task, err + } + if !exist { + err = retry.Do(ctx, func() error { + return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) + }, retry.Attempts(10), retry.Sleep(1*time.Second)) + if err != nil { + log.Error("fail to create partition", zap.Error(err)) + return task, err + } + } + log.Info("create partition", + zap.String("collectionName", targetCollectionName), + zap.String("partitionName", partitionBackup.GetPartitionName())) + + // bulk insert + copyAndBulkInsert := func(files []string) error { + realFiles := make([]string, len(files)) + // if 
milvus bucket and backup bucket are not the same, should copy the data first + if !isSameBucket { + log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files)) + for i, file := range files { + // empty delta file, no need to copy + if file == "" { + realFiles[i] = file + } else { + err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file) + if err != nil { + log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err)) + return err } + realFiles[i] = tempDir + file } - } else { - realFiles = files } + } else { + realFiles = files + } - err = b.executeBulkInsert(ctx, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) + err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) + if err != nil { + log.Error("fail to bulk insert to partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName()), + zap.Error(err)) + return err + } + return nil + } + + if task.GetMetaOnly() { + task.Progress = 100 + } else { + groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups()) + if len(groupIds) == 1 && groupIds[0] == 0 { + // backward compatible old backup without group id + files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) if err != nil { - log.Error("fail to bulk insert to partition", + log.Error("fail to get partition backup binlog files", + zap.Error(err), zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName()), - zap.Error(err)) - return err + zap.String("partition", partitionBackup.GetPartitionName())) + return task, err + } + err = copyAndBulkInsert(files) + if err != nil { + log.Error("fail to (copy and) bulkinsert data", + zap.Error(err), + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName())) + return task, err } - return nil - } - - if task.GetMetaOnly() { - task.Progress = 100 } else { - groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups()) - if len(groupIds) == 1 && groupIds[0] == 0 { - // backward compatible old backup without group id - files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) + // bulk insert by segment groups + for _, groupId := range groupIds { + files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) if err != nil { log.Error("fail to get partition backup binlog files", zap.Error(err), @@ -531,39 +592,16 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup zap.String("partition", partitionBackup.GetPartitionName())) return task, err } - } else { - // bulk insert by segment groups - for _, groupId := range groupIds { - files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) - if err != nil { - log.Error("fail to get partition backup binlog files", - 
zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - err = copyAndBulkInsert(files) - if err != nil { - log.Error("fail to (copy and) bulkinsert data", - zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - } - } - task.RestoredSize = task.RestoredSize + partitionBackup.GetSize() - if task.ToRestoreSize == 0 { - task.Progress = 100 - } else { - task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize) } } + task.RestoredSize = task.RestoredSize + partitionBackup.GetSize() + if task.ToRestoreSize == 0 { + task.Progress = 100 + } else { + task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize) + } } - - return task, err + return task, nil } func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 { @@ -578,8 +616,9 @@ func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 return res } -func (b *BackupContext) executeBulkInsert(ctx context.Context, coll string, partition string, files []string, endTime int64) error { +func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64) error { log.Debug("execute bulk insert", + zap.String("db", db), zap.String("collection", coll), zap.String("partition", partition), zap.Strings("files", files), @@ -587,16 +626,17 @@ func (b *BackupContext) executeBulkInsert(ctx context.Context, coll string, part var taskId int64 var err error if endTime == 0 { - taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup()) + taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup()) } else { - taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime)) + taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime)) } if err != nil { log.Error("fail to bulk insert", - zap.Error(err), + zap.String("db", db), zap.String("collectionName", coll), zap.String("partitionName", partition), - zap.Strings("files", files)) + zap.Strings("files", files), + zap.Error(err)) return err } err = b.watchBulkInsertState(ctx, taskId, BULKINSERT_TIMEOUT, BULKINSERT_SLEEP_INTERVAL) diff --git a/core/etcd_test.go b/core/etcd_test.go deleted file mode 100644 index 9a4c0d10..00000000 --- a/core/etcd_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package core - -import ( - "context" - "fmt" - "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-backup/core/proto/backuppb" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" - "log" - "testing" - "time" -) - -func TestETCDList(t *testing.T) { - // 1. etcd client - // Initialize etcd client - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"10.102.8.127:2379"}, - //Endpoints: []string{"10.102.10.120:2379"}, - //Endpoints: []string{"10.102.10.139:2379"}, - //Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, - DialTimeout: 5 * time.Second, - }) - if err != nil { - fmt.Printf("Initialize etcd client failed. 
err: %v\n", err) - } - //kv := clientv3.NewKV(cli) - - ctx := context.TODO() - - opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable()} - getResp, err := cli.Get(ctx, "", opts...) - - for _, kvs := range getResp.Kvs { - log.Println(zap.Any("key", string(kvs.Key)), zap.Any("value", string(kvs.Value))) - } - - //log.Println("getresp", zap.Any("resp", getResp), zap.Any("values", getResp.Kvs)) - cli.Close() -} - -func TestETCDGet(t *testing.T) { - // 1. etcd client - // Initialize etcd client - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"10.102.8.127:2379"}, - //Endpoints: []string{"10.102.10.120:2379"}, - //Endpoints: []string{"10.102.10.162:2379"}, - //Endpoints: []string{"10.102.10.139:2379"}, - //Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, - DialTimeout: 5 * time.Second, - }) - if err != nil { - fmt.Printf("Initialize etcd client failed. err: %v\n", err) - } - //kv := clientv3.NewKV(cli) - - ctx := context.TODO() - - //opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable()} - //getResp, err := cli.Get(ctx, "by-dev/meta/datacoord-meta/binlog/437433135932575088/437433135932575089/437433135932575098/102") - getResp, err := cli.Get(ctx, "by-dev/meta/datacoord-meta/binlog/437454123484053509/437454123484053510/437454123484253594/0") - - for _, kvs := range getResp.Kvs { - log.Println(zap.Any("key", string(kvs.Key)), zap.Any("value", string(kvs.Value))) - m := &backuppb.FieldBinlog{} - proto.Unmarshal(kvs.Value, m) - log.Println(len(m.Binlogs)) - log.Println(m) - } - - //log.Println("getresp", zap.Any("resp", getResp), zap.Any("values", getResp.Kvs)) - cli.Close() -} diff --git a/core/milvus_sdk_wrapper.go b/core/milvus_sdk_wrapper.go new file mode 100644 index 00000000..84ad672f --- /dev/null +++ b/core/milvus_sdk_wrapper.go @@ -0,0 +1,171 @@ +package core + +import ( + "context" + gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client" + "github.com/milvus-io/milvus-sdk-go/v2/entity" + "github.com/zilliztech/milvus-backup/internal/util/retry" + "sync" + "time" +) + +// MilvusClient wrap db into milvus API to make it thread safe +type MilvusClient struct { + mu sync.Mutex + client gomilvus.Client +} + +func (m *MilvusClient) Close() error { + return m.client.Close() +} + +func (m *MilvusClient) GetVersion(ctx context.Context) (string, error) { + return m.client.GetVersion(ctx) +} + +func (m *MilvusClient) CreateDatabase(ctx context.Context, dbName string) error { + return m.client.CreateDatabase(ctx, dbName) +} + +func (m *MilvusClient) ListDatabases(ctx context.Context) ([]entity.Database, error) { + return m.client.ListDatabases(ctx) +} + +func (m *MilvusClient) DescribeCollection(ctx context.Context, db, collName string) (*entity.Collection, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.DescribeCollection(ctx, collName) +} + +func (m *MilvusClient) DescribeIndex(ctx context.Context, db, collName, fieldName string) ([]entity.Index, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.DescribeIndex(ctx, collName, fieldName) +} + +func (m *MilvusClient) ShowPartitions(ctx context.Context, db, collName string) ([]*entity.Partition, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.ShowPartitions(ctx, collName) +} + 
+func (m *MilvusClient) GetLoadingProgress(ctx context.Context, db, collName string, partitionNames []string) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return 0, err + } + return m.client.GetLoadingProgress(ctx, collName, partitionNames) +} + +func (m *MilvusClient) GetPersistentSegmentInfo(ctx context.Context, db, collName string) ([]*entity.Segment, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.GetPersistentSegmentInfo(ctx, collName) +} + +func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, nil, 0, err + } + return m.client.FlushV2(ctx, collName, async) +} + +func (m *MilvusClient) ListCollections(ctx context.Context, db string) ([]*entity.Collection, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.ListCollections(ctx) +} + +func (m *MilvusClient) HasCollection(ctx context.Context, db, collName string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return false, err + } + return m.client.HasCollection(ctx, collName) +} + +func (m *MilvusClient) BulkInsert(ctx context.Context, db, collName string, partitionName string, files []string, opts ...gomilvus.BulkInsertOption) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return 0, err + } + return m.client.BulkInsert(ctx, collName, partitionName, files, opts...) +} + +func (m *MilvusClient) GetBulkInsertState(ctx context.Context, taskID int64) (*entity.BulkInsertTaskState, error) { + return m.client.GetBulkInsertState(ctx, taskID) +} + +func (m *MilvusClient) CreateCollection(ctx context.Context, db string, schema *entity.Schema, shardsNum int32, opts ...gomilvus.CreateCollectionOption) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + // add retry to make sure won't be block by rate control + return retry.Do(ctx, func() error { + return m.client.CreateCollection(ctx, schema, shardsNum, opts...) + }, retry.Sleep(2*time.Second), retry.Attempts(10)) +} + +func (m *MilvusClient) CreatePartition(ctx context.Context, db, collName string, partitionName string) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + return retry.Do(ctx, func() error { + return m.client.CreatePartition(ctx, collName, partitionName) + }, retry.Sleep(2*time.Second), retry.Attempts(10)) +} + +func (m *MilvusClient) HasPartition(ctx context.Context, db, collName string, partitionName string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return false, err + } + return m.client.HasPartition(ctx, collName, partitionName) +} + +func (m *MilvusClient) CreateIndex(ctx context.Context, db, collName string, fieldName string, idx entity.Index, async bool, opts ...gomilvus.IndexOption) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + return m.client.CreateIndex(ctx, collName, fieldName, idx, async, opts...) 
+} diff --git a/core/milvus_source.go b/core/milvus_source.go deleted file mode 100644 index 0e1bd370..00000000 --- a/core/milvus_source.go +++ /dev/null @@ -1,21 +0,0 @@ -package core - -import "github.com/zilliztech/milvus-backup/core/paramtable" - -type MilvusSource struct { - params paramtable.BackupParams - proxyAddr string - //datacoordAddr string -} - -func (m *MilvusSource) GetProxyAddr() string { - return m.proxyAddr -} - -//func (m *MilvusSource) GetDatacoordAddr() string { -// return m.datacoordAddr -//} - -func (m *MilvusSource) GetParams() paramtable.BackupParams { - return m.params -} diff --git a/core/paramtable/params.go b/core/paramtable/params.go index 65865c8f..4befad13 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -33,12 +33,19 @@ type BackupConfig struct { Base *BaseTable MaxSegmentGroupSize int64 + + BackupParallelism int + RestoreParallelism int + BackupCopyDataParallelism int } func (p *BackupConfig) init(base *BaseTable) { p.Base = base p.initMaxSegmentGroupSize() + p.initBackupParallelism() + p.initRestoreParallelism() + p.initBackupCopyDataParallelism() } func (p *BackupConfig) initMaxSegmentGroupSize() { @@ -49,6 +56,21 @@ func (p *BackupConfig) initMaxSegmentGroupSize() { p.MaxSegmentGroupSize = size } +func (p *BackupConfig) initBackupParallelism() { + size := p.Base.ParseIntWithDefault("backup.parallelism", 1) + p.BackupParallelism = size +} + +func (p *BackupConfig) initRestoreParallelism() { + size := p.Base.ParseIntWithDefault("restore.parallelism", 1) + p.RestoreParallelism = size +} + +func (p *BackupConfig) initBackupCopyDataParallelism() { + size := p.Base.ParseIntWithDefault("backup.copydata.parallelism", 10) + p.BackupCopyDataParallelism = size +} + type MilvusConfig struct { Base *BaseTable diff --git a/internal/common/workerpool.go b/internal/common/workerpool.go index 73286a1b..fd3319cb 100644 --- a/internal/common/workerpool.go +++ b/internal/common/workerpool.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "go.uber.org/atomic" + "sync" "time" "golang.org/x/sync/errgroup" @@ -12,16 +14,25 @@ import ( // WorkerPool a pool that can control the total amount and rate of concurrency type WorkerPool struct { - job chan Job + job chan JobWithId g *errgroup.Group subCtx context.Context workerNum int lim *rate.Limiter + + nextId atomic.Int64 + jobsStatus sync.Map + jobsError sync.Map } type Job func(ctx context.Context) error +type JobWithId struct { + job Job + id int64 +} + // NewWorkerPool build a worker pool, rps 0 is unlimited func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, error) { if workerNum <= 0 { @@ -36,13 +47,19 @@ func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1) } - return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}, nil + return &WorkerPool{job: make(chan JobWithId), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}, nil +} + +func (p *WorkerPool) Start() { + //p.jobsStatus = make(map[*Job]string) + //p.jobsError = make(map[*Job]error) + p.g.Go(p.work) + p.nextId = atomic.Int64{} } -func (p *WorkerPool) Start() { p.g.Go(p.work) } func (p *WorkerPool) work() error { for j := range p.job { - job := j + jobWithId := j p.g.Go(func() error { if p.lim != nil { if err := p.lim.Wait(p.subCtx); err != nil { @@ -50,16 +67,54 @@ func (p *WorkerPool) work() error { } } - if err := job(p.subCtx); err != nil { + if err := 
jobWithId.job(p.subCtx); err != nil { + p.jobsError.Store(jobWithId.id, err) + p.jobsStatus.Store(jobWithId.id, "done") return fmt.Errorf("workerpool: execute job %w", err) } - + p.jobsStatus.Store(jobWithId.id, "done") return nil }) } return nil } -func (p *WorkerPool) Submit(job Job) { p.job <- job } -func (p *WorkerPool) Done() { close(p.job) } -func (p *WorkerPool) Wait() error { return p.g.Wait() } +func (p *WorkerPool) Submit(job Job) { + jobId := p.nextId.Inc() + p.job <- JobWithId{job: job, id: jobId} + //p.jobsStatus.Store(jobId, "started") +} +func (p *WorkerPool) Done() { close(p.job) } +func (p *WorkerPool) Wait() error { return p.g.Wait() } + +func (p *WorkerPool) SubmitWithId(job Job) int64 { + jobId := p.nextId.Inc() + p.job <- JobWithId{job: job, id: jobId} + return jobId +} + +func (p *WorkerPool) WaitJobs(jobIds []int64) error { + for { + var done = true + var err error = nil + for _, jobId := range jobIds { + if value, ok := p.jobsStatus.Load(jobId); ok && value == "done" { + done = done + } else { + done = false + break + } + + if jobError, exist := p.jobsError.Load(jobId); exist { + err = jobError.(error) + break + } + } + if err != nil { + return err + } + if done { + return nil + } + } +} diff --git a/internal/common/workerpool_test.go b/internal/common/workerpool_test.go index 99bb5537..1c2112da 100644 --- a/internal/common/workerpool_test.go +++ b/internal/common/workerpool_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/atomic" @@ -48,3 +49,30 @@ func TestRunTaskReturnErr(t *testing.T) { wp.Done() assert.NotNil(t, wp.Wait()) } + +func TestWaitJobs(t *testing.T) { + wp, err := NewWorkerPool(context.Background(), 3, 10) + assert.Nil(t, err) + + wp.Start() + start := time.Now().Unix() + jobs := make([]int64, 0) + for i := 0; i < 10; i++ { + job := func(ctx context.Context) error { + //return errors.New("some err") + time.Sleep(2 * time.Second) + //return errors.New("some err") + return nil + } + id := wp.SubmitWithId(job) + jobs = append(jobs, id) + } + + //time.Sleep(15 * time.Second) + err = wp.WaitJobs(jobs) + + assert.NoError(t, err) + duration := time.Now().Unix() - start + assert.True(t, duration >= 8) + //wp.Done() +} diff --git a/ut_test.go b/ut_test.go deleted file mode 100644 index 06ab7d0f..00000000 --- a/ut_test.go +++ /dev/null @@ -1 +0,0 @@ -package main
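
Usage note: a minimal sketch of how the SubmitWithId/WaitJobs worker-pool API added in internal/common/workerpool.go can be driven, mirroring TestWaitJobs and the restore path (which submits partition-restore jobs and waits on their ids instead of calling Done/Wait). This is an illustrative sketch, not code from the patch: it assumes the file lives inside the milvus-backup module (internal packages are only importable there), and the job body is a placeholder for real work such as restorePartition or backupCollection.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zilliztech/milvus-backup/internal/common"
)

func main() {
	// Collection-level pool: 2 workers, 10 requests per second (rps 0 would mean unlimited).
	wp, err := common.NewWorkerPool(context.Background(), 2, 10)
	if err != nil {
		panic(err)
	}
	wp.Start()

	jobIDs := make([]int64, 0)
	for i := 0; i < 4; i++ {
		i := i // capture loop variable, as the patch does with collectionClone/partitionBackup2
		job := func(ctx context.Context) error {
			// Placeholder job body; real jobs back up or restore one collection/partition.
			time.Sleep(100 * time.Millisecond)
			fmt.Println("job done", i)
			return nil
		}
		// SubmitWithId returns a job id so callers can wait on a subset of jobs
		// without closing the pool via Done()/Wait().
		jobIDs = append(jobIDs, wp.SubmitWithId(job))
	}

	// WaitJobs blocks until every listed job is done, returning the first job error if any.
	if err := wp.WaitJobs(jobIDs); err != nil {
		panic(err)
	}
}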