diff --git a/configs/backup.yaml b/configs/backup.yaml
index d167f52..ec69395 100644
--- a/configs/backup.yaml
+++ b/configs/backup.yaml
@@ -43,16 +43,14 @@ minio:
 
 backup:
   maxSegmentGroupSize: 2G
-  parallelism: 2 # collection level parallelism to backup
-  copydata:
-    # thread pool to copy data for each collection backup, default 100.
-    # which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time.
-    # reduce it if blocks your storage's network bandwidth
-    parallelism: 128
-  keepTempFiles: false
-restore:
-  # Collection level parallelism to restore
-  # Only change it > 1 when you have more than one datanode.
-  # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
-  parallelism: 2
\ No newline at end of file
+  parallelism:
+    # collection level parallelism to backup
+    backupCollection: 4
+    # thread pool size used to copy data. reduce it if it saturates your storage's network bandwidth
+    copydata: 128
+    # collection level parallelism to restore. only set it > 1 when you have more than one datanode,
+    # because the max parallelism of Milvus bulkinsert equals the number of datanodes
+    restoreCollection: 2
+
+  # keep temporary files during restore, only used for debugging
+  keepTempFiles: false
\ No newline at end of file
diff --git a/core/backup_context.go b/core/backup_context.go
index 4d4d8ee..f734c2e 100644
--- a/core/backup_context.go
+++ b/core/backup_context.go
@@ -53,7 +53,9 @@ type BackupContext struct {
 
     restoreTasks map[string]*backuppb.RestoreBackupTask
 
-    bulkinsertWorkerPool *common.WorkerPool
+    backupCollectionWorkerPool *common.WorkerPool
+    backupCopyDataWorkerPool   *common.WorkerPool
+    bulkinsertWorkerPool       *common.WorkerPool
 }
 
 func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
@@ -146,11 +148,37 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
     return *b.storageClient
 }
 
+func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool {
+    if b.backupCollectionWorkerPool == nil {
+        wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCollectionParallelism, RPS)
+        if err != nil {
+            log.Error("failed to initialize collection backup worker pool", zap.Error(err))
+            panic(err)
+        }
+        b.backupCollectionWorkerPool = wp
+        b.backupCollectionWorkerPool.Start()
+    }
+    return b.backupCollectionWorkerPool
+}
+
+func (b *BackupContext) getCopyDataWorkerPool() *common.WorkerPool {
+    if b.backupCopyDataWorkerPool == nil {
+        wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
+        if err != nil {
+            log.Error("failed to initialize copy data worker pool", zap.Error(err))
+            panic(err)
+        }
+        b.backupCopyDataWorkerPool = wp
+        b.backupCopyDataWorkerPool.Start()
+    }
+    return b.backupCopyDataWorkerPool
+}
+
 func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
     if b.bulkinsertWorkerPool == nil {
         wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
         if err != nil {
-            log.Error("failed to initial copy data woker pool", zap.Error(err))
+            log.Error("failed to initialize restore worker pool", zap.Error(err))
             panic(err)
         }
         b.bulkinsertWorkerPool = wp
diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go
index e0375cd..f5278cb 100644
--- a/core/backup_impl_create_backup.go
+++ b/core/backup_impl_create_backup.go
@@ -7,7 +7,6 @@ import (
     "sort"
     "strconv"
     "strings"
-    "sync"
     "time"
 
     jsoniter "github.com/json-iterator/go"
@@ -16,7 +15,6 @@ import (
 
     "github.com/zilliztech/milvus-backup/core/proto/backuppb"
     "github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common" "github.com/zilliztech/milvus-backup/internal/log" ) @@ -434,42 +432,36 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup log.Info("Finished fill segment", zap.String("collectionName", collectionBackup.GetCollectionName())) - log.Info("reading SegmentInfos from storage, this may cost several minutes if data is large", - zap.String("collectionName", collectionBackup.GetCollectionName())) segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo) - mu := sync.Mutex{} - wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS) - if err != nil { - return err - } - wp.Start() for _, v := range filledSegments { segment := v - job := func(ctx context.Context) error { - segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows) - if err != nil { - return err - } - if len(segmentInfo.Binlogs) == 0 { - log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID)) - } - mu.Lock() - partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo) - segmentBackupInfos = append(segmentBackupInfos, segmentInfo) - mu.Unlock() - return nil + segmentInfo := &backuppb.SegmentBackupInfo{ + SegmentId: segment.ID, + CollectionId: segment.CollectionID, + PartitionId: segment.ParititionID, + NumOfRows: segment.NumRows, } - wp.Submit(job) - } - wp.Done() - if err := wp.Wait(); err != nil { - return err + segmentBackupInfos = append(segmentBackupInfos, segmentInfo) + partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo) } - log.Info("readSegmentInfo from storage", + + log.Info("Begin copy data", zap.String("collectionName", collectionBackup.GetCollectionName()), zap.Int("segmentNum", len(filledSegments))) + // sort by segment rows, so that smaller segment will be copied earlier. + // smaller segments are likely to be compacted. 
+    // so copying them first reduces the chance that a segment is deleted by compaction before it is copied.
+    sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
+        return segmentBackupInfos[i].GetNumOfRows() < segmentBackupInfos[j].GetNumOfRows()
+    })
+    err = b.copySegments(ctx, segmentBackupInfos, backupInfo)
+    if err != nil {
+        return err
+    }
+    b.refreshBackupCache(backupInfo)
+
     for _, partition := range partitions {
         partitionSegments := partSegInfoMap[partition.ID]
         var size int64 = 0
@@ -485,12 +477,8 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
             LoadState:  partitionLoadStates[partition.Name],
         }
         partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo)
-        //partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo)
     }
 
-    //leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{
-    //    Infos: partitionLevelBackupInfos,
-    //}
     collectionBackup.PartitionBackups = partitionBackupInfos
     collectionBackup.LoadState = collectionLoadState
     b.refreshBackupCache(backupInfo)
@@ -498,57 +486,14 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
         zap.String("collectionName", collectionBackup.GetCollectionName()),
         zap.Int("partitionNum", len(partitionBackupInfos)))
 
-    log.Info("Begin copy data",
-        zap.String("collectionName", collectionBackup.GetCollectionName()),
-        zap.Int("segmentNum", len(segmentBackupInfos)))
-
     var collectionBackupSize int64 = 0
     for _, part := range partitionBackupInfos {
         collectionBackupSize += part.GetSize()
-        if part.GetSize() > b.params.BackupCfg.MaxSegmentGroupSize {
-            log.Info("partition size is larger than MaxSegmentGroupSize, will separate segments into groups in backup files",
-                zap.Int64("collectionId", part.GetCollectionId()),
-                zap.Int64("partitionId", part.GetPartitionId()),
-                zap.Int64("partitionSize", part.GetSize()),
-                zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize))
-            segments := partSegInfoMap[part.GetPartitionId()]
-            var bufferSize int64 = 0
-            // 0 is illegal value, start from 1
-            var segGroupID int64 = 1
-            for _, seg := range segments {
-                if seg.Size > b.params.BackupCfg.MaxSegmentGroupSize && bufferSize == 0 {
-                    seg.GroupId = segGroupID
-                    segGroupID = segGroupID + 1
-                } else if bufferSize+seg.Size > b.params.BackupCfg.MaxSegmentGroupSize {
-                    segGroupID = segGroupID + 1
-                    seg.GroupId = segGroupID
-                    bufferSize = 0
-                    bufferSize = bufferSize + seg.Size
-                } else {
-                    seg.GroupId = segGroupID
-                    bufferSize = bufferSize + seg.Size
-                }
-            }
-        } else {
-            log.Info("partition size is smaller than MaxSegmentGroupSize, won't separate segments into groups in backup files",
-                zap.Int64("collectionId", part.GetCollectionId()),
-                zap.Int64("partitionId", part.GetPartitionId()),
-                zap.Int64("partitionSize", part.GetSize()),
-                zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize))
-        }
-    }
-
-    sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
-        return segmentBackupInfos[i].Size < segmentBackupInfos[j].Size
-    })
-    err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName()))
-    if err != nil {
-        return err
     }
-    b.refreshBackupCache(backupInfo)
 
     collectionBackup.Size = collectionBackupSize
     collectionBackup.EndTime = time.Now().Unix()
+    b.refreshBackupCache(backupInfo)
     return nil
 }
 
@@ -556,15 +501,6 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
     b.mu.Lock()
     defer b.mu.Unlock()
 
-    wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupParallelism, RPS)
-    if err != nil {
-        backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
-        backupInfo.ErrorMessage = err.Error()
-        return backupInfo, err
-    }
-    wp.Start()
-    log.Info("Start collection level backup pool", zap.Int("parallelism", b.params.BackupCfg.BackupParallelism))
-
     backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond))
     backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING
@@ -584,16 +520,19 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
     }
     log.Info("collections to backup", zap.Strings("collections", collectionNames))
 
+    jobIds := make([]int64, 0)
     for _, collection := range toBackupCollections {
         collectionClone := collection
         job := func(ctx context.Context) error {
             err := b.backupCollection(ctx, backupInfo, collectionClone, request.GetForce())
             return err
         }
-        wp.Submit(job)
+        jobId := b.getBackupCollectionWorkerPool().SubmitWithId(job)
+        jobIds = append(jobIds, jobId)
     }
-    wp.Done()
-    if err := wp.Wait(); err != nil {
+
+    err = b.getBackupCollectionWorkerPool().WaitJobs(jobIds)
+    if err != nil {
         backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
         backupInfo.ErrorMessage = err.Error()
         return backupInfo, err
@@ -636,12 +575,8 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
     return backupInfo, nil
 }
 
-func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, dstPath string) error {
-    wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
-    if err != nil {
-        return err
-    }
-    wp.Start()
+func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, backupInfo *backuppb.BackupInfo) error {
+    dstPath := BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())
 
     // generate target path
     // milvus_rootpath/insert_log/collection_id/partition_id/segment_id/ =>
@@ -654,18 +589,24 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
         }
     }
 
+    jobIds := make([]int64, 0)
     for _, segment := range segments {
-        start := time.Now().Unix()
-        log.Debug("copy segment",
-            zap.Int64("collection_id", segment.GetCollectionId()),
+        log := log.With(zap.Int64("collection_id", segment.GetCollectionId()),
             zap.Int64("partition_id", segment.GetPartitionId()),
             zap.Int64("segment_id", segment.GetSegmentId()),
-            zap.Int64("group_id", segment.GetGroupId()),
-            zap.Int64("size", segment.GetSize()))
+            zap.Int64("group_id", segment.GetGroupId()))
+        log.Debug("copy segment")
+        _, err := b.fillSegmentBackupInfo(ctx, segment)
+        if err != nil {
+            log.Info("Failed to fill segment backup info", zap.Error(err))
+            return err
+        }
         // insert log
         for _, binlogs := range segment.GetBinlogs() {
             for _, binlog := range binlogs.GetBinlogs() {
                 targetPath := backupPathFunc(binlog.GetLogPath(), b.milvusRootPath, dstPath)
+                // use segmentID as group id
+                segment.GroupId = segment.SegmentId
                 if segment.GetGroupId() != 0 {
                     targetPath = strings.Replace(targetPath,
                         strconv.FormatInt(segment.GetPartitionId(), 10),
@@ -707,7 +648,8 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
                     return nil
                 }
-                jobId := b.getCopyDataWorkerPool()
-                wp.Submit(job)
+                jobId := b.getCopyDataWorkerPool().SubmitWithId(job)
+                jobIds = append(jobIds, jobId)
             }
         }
         // delta log
@@ -753,30 +695,17 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
                     }
                     return err
                 }
-                wp.Submit(job)
+                jobId := b.getCopyDataWorkerPool().SubmitWithId(job)
+                jobIds = append(jobIds, jobId)
            }
        }
-        duration := time.Now().Unix() - start
-        log.Debug("copy segment finished",
-            zap.Int64("collection_id", segment.GetCollectionId()),
-            zap.Int64("partition_id", segment.GetPartitionId()),
-            zap.Int64("segment_id", segment.GetSegmentId()),
-            zap.Int64("cost_time", duration))
-    }
-    wp.Done()
-    if err := wp.Wait(); err != nil {
-        return err
     }
-    return nil
+
+    err := b.getCopyDataWorkerPool().WaitJobs(jobIds)
+    return err
 }
 
-func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
-    segmentBackupInfo := backuppb.SegmentBackupInfo{
-        SegmentId:    segmentID,
-        CollectionId: collectionID,
-        PartitionId:  partitionID,
-        NumOfRows:    numOfRows,
-    }
+func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackupInfo *backuppb.SegmentBackupInfo) (*backuppb.SegmentBackupInfo, error) {
     var size int64 = 0
     var rootPath string
 
@@ -786,12 +715,12 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
         rootPath = ""
     }
 
-    insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID)
+    insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
     log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
     fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
     if err != nil {
         log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
-        return &segmentBackupInfo, err
+        return segmentBackupInfo, err
     }
     log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
     insertLogs := make([]*backuppb.FieldBinlog, 0)
@@ -813,7 +742,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
         })
     }
 
-    deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID)
+    deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
     deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
     deltaLogs := make([]*backuppb.FieldBinlog, 0)
     for _, deltaFieldLogDir := range deltaFieldsLogDir {
@@ -862,7 +791,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
     segmentBackupInfo.Binlogs = insertLogs
     segmentBackupInfo.Deltalogs = deltaLogs
     //segmentBackupInfo.Statslogs = statsLogs
-
     segmentBackupInfo.Size = size
-    return &segmentBackupInfo, nil
+    return segmentBackupInfo, nil
 }
diff --git a/core/paramtable/params.go b/core/paramtable/params.go
index f84f186..e8c932e 100644
--- a/core/paramtable/params.go
+++ b/core/paramtable/params.go
@@ -34,17 +34,18 @@ type BackupConfig struct {
 
     MaxSegmentGroupSize int64
 
-    BackupParallelism         int
-    RestoreParallelism        int
-    BackupCopyDataParallelism int
-    KeepTempFiles             bool
+    BackupCollectionParallelism int
+    BackupCopyDataParallelism   int
+    RestoreParallelism          int
+
+    KeepTempFiles bool
 }
 
 func (p *BackupConfig) init(base *BaseTable) {
     p.Base = base
     p.initMaxSegmentGroupSize()
-    p.initBackupParallelism()
+    p.initBackupCollectionParallelism()
     p.initRestoreParallelism()
     p.initBackupCopyDataParallelism()
     p.initKeepTempFiles()
 }
@@ -58,18 +59,18 @@ func (p *BackupConfig) initMaxSegmentGroupSize() {
     p.MaxSegmentGroupSize = size
 }
 
-func (p *BackupConfig) initBackupParallelism() {
-    size := p.Base.ParseIntWithDefault("backup.parallelism", 1)
-    p.BackupParallelism = size
+func (p *BackupConfig) initBackupCollectionParallelism() {
+    size := p.Base.ParseIntWithDefault("backup.parallelism.backupCollection", 1)
+    p.BackupCollectionParallelism = size
 }
 
 func (p *BackupConfig) initRestoreParallelism() {
-    size := p.Base.ParseIntWithDefault("restore.parallelism", 1)
+    size := p.Base.ParseIntWithDefault("backup.parallelism.restoreCollection", 1)
     p.RestoreParallelism = size
 }
 
 func (p *BackupConfig) initBackupCopyDataParallelism() {
-    size := p.Base.ParseIntWithDefault("backup.copydata.parallelism", 128)
+    size := p.Base.ParseIntWithDefault("backup.parallelism.copydata", 128)
     p.BackupCopyDataParallelism = size
 }
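
Note: the changes above rely on internal/common.WorkerPool exposing SubmitWithId and WaitJobs in addition to Submit/Done/Wait, so the collection, copy-data, and bulkinsert pools can be created lazily, started once, and shared across requests while each caller waits only on the job ids it submitted. The pool implementation itself is not part of this diff; the sketch below is purely illustrative of that pattern (the Job type, the rps parameter type, and all internals are assumptions, not the project's actual code).

package common

import (
    "context"
    "sync"
)

// Job is the unit of work the backup code submits to a pool.
type Job func(ctx context.Context) error

type submission struct {
    job  Job
    done chan error // buffered, receives the job's result exactly once
}

// WorkerPool is a long-lived pool whose jobs can be waited on individually by id.
type WorkerPool struct {
    ctx     context.Context
    subs    chan submission
    workers int

    mu     sync.Mutex
    nextID int64
    byID   map[int64]chan error
}

// NewWorkerPool creates a pool with `parallelism` workers. The rps argument of the
// real pool (request rate limiting) is ignored in this sketch.
func NewWorkerPool(ctx context.Context, parallelism int, rps int32) (*WorkerPool, error) {
    if parallelism <= 0 {
        parallelism = 1
    }
    return &WorkerPool{
        ctx:     ctx,
        subs:    make(chan submission),
        workers: parallelism,
        byID:    make(map[int64]chan error),
    }, nil
}

// Start launches the workers; they live until ctx is cancelled, so the pool can be
// reused by many backup or restore requests.
func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        go func() {
            for {
                select {
                case <-p.ctx.Done():
                    return
                case s := <-p.subs:
                    s.done <- s.job(p.ctx)
                }
            }
        }()
    }
}

// SubmitWithId enqueues a job and returns an id that can later be passed to WaitJobs.
func (p *WorkerPool) SubmitWithId(job Job) int64 {
    done := make(chan error, 1)
    p.mu.Lock()
    p.nextID++
    id := p.nextID
    p.byID[id] = done
    p.mu.Unlock()

    p.subs <- submission{job: job, done: done}
    return id
}

// WaitJobs blocks until every listed job has finished and returns the first error seen,
// so a caller only waits on the jobs it submitted, not on the whole shared pool.
func (p *WorkerPool) WaitJobs(ids []int64) error {
    var firstErr error
    for _, id := range ids {
        p.mu.Lock()
        done := p.byID[id]
        delete(p.byID, id)
        p.mu.Unlock()
        if done == nil {
            continue
        }
        if err := <-done; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}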