diff --git a/configs/backup.yaml b/configs/backup.yaml index d167f52..4145371 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -51,8 +51,9 @@ backup: parallelism: 128 keepTempFiles: false -restore: +parallelism: # Collection level parallelism to restore # Only change it > 1 when you have more than one datanode. # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number. - parallelism: 2 \ No newline at end of file + restore: 2 + bulkinsert: 10 \ No newline at end of file diff --git a/core/backup_context.go b/core/backup_context.go index 4d4d8ee..89247f3 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -53,6 +53,7 @@ type BackupContext struct { restoreTasks map[string]*backuppb.RestoreBackupTask + restoreWorkerPool *common.WorkerPool bulkinsertWorkerPool *common.WorkerPool } @@ -147,10 +148,23 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager { } func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool { - if b.bulkinsertWorkerPool == nil { + if b.restoreWorkerPool == nil { wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS) if err != nil { - log.Error("failed to initial copy data woker pool", zap.Error(err)) + log.Error("failed to initial restore collection worker pool", zap.Error(err)) + panic(err) + } + b.restoreWorkerPool = wp + b.restoreWorkerPool.Start() + } + return b.restoreWorkerPool +} + +func (b *BackupContext) getBulkinsertWorkerPool() *common.WorkerPool { + if b.bulkinsertWorkerPool == nil { + wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BulkinsertParallelism, RPS) + if err != nil { + log.Error("failed to initial bulkinsert worker pool", zap.Error(err)) panic(err) } b.bulkinsertWorkerPool = wp diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go index 3cda8f9..8bb4396 100644 --- a/core/backup_impl_restore_backup.go +++ b/core/backup_impl_restore_backup.go @@ -554,41 +554,22 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ return nil } + jobIds := make([]int64, 0) if task.GetMetaOnly() { task.Progress = 100 } else { groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups()) if len(groupIds) == 1 && groupIds[0] == 0 { - // backward compatible old backup without group id - files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) - if err != nil { - log.Error("fail to get partition backup binlog files", - zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - err = copyAndBulkInsert(files) - if err != nil { - log.Error("fail to (copy and) bulkinsert data", - zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - } else { - // bulk insert by segment groups - for _, groupId := range groupIds { - files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) + job := func(ctx context.Context) error { + // backward compatible old backup without group id + files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) if err != nil { log.Error("fail to get partition backup binlog files", zap.Error(err), zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), zap.String("targetCollectionName", targetCollectionName), zap.String("partition", partitionBackup.GetPartitionName())) - return task, err + return err } err = copyAndBulkInsert(files) if err != nil { @@ -597,8 +578,38 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), zap.String("targetCollectionName", targetCollectionName), zap.String("partition", partitionBackup.GetPartitionName())) - return task, err + return err } + return nil + } + jobId := b.getBulkinsertWorkerPool().SubmitWithId(job) + jobIds = append(jobIds, jobId) + } else { + // bulk insert by segment groups + for _, groupId := range groupIds { + job := func(ctx context.Context) error { + files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) + if err != nil { + log.Error("fail to get partition backup binlog files", + zap.Error(err), + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName())) + return err + } + err = copyAndBulkInsert(files) + if err != nil { + log.Error("fail to (copy and) bulkinsert data", + zap.Error(err), + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName())) + return err + } + return nil + } + jobId := b.getBulkinsertWorkerPool().SubmitWithId(job) + jobIds = append(jobIds, jobId) } } task.RestoredSize = task.RestoredSize + partitionBackup.GetSize() @@ -608,7 +619,8 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize) } } - return task, nil + err = b.getBulkinsertWorkerPool().WaitJobs(jobIds) + return task, err } func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 { diff --git a/core/paramtable/params.go b/core/paramtable/params.go index f84f186..3241274 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -36,6 +36,7 @@ type BackupConfig struct { BackupParallelism int RestoreParallelism int + BulkinsertParallelism int BackupCopyDataParallelism int KeepTempFiles bool } @@ -46,6 +47,7 @@ func (p *BackupConfig) init(base *BaseTable) { p.initMaxSegmentGroupSize() p.initBackupParallelism() p.initRestoreParallelism() + p.initBulkInsertParallelism() p.initBackupCopyDataParallelism() p.initKeepTempFiles() } @@ -64,7 +66,12 @@ func (p *BackupConfig) initBackupParallelism() { } func (p *BackupConfig) initRestoreParallelism() { - size := p.Base.ParseIntWithDefault("restore.parallelism", 1) + size := p.Base.ParseIntWithDefault("parallelism.restore", 1) + p.RestoreParallelism = size +} + +func (p *BackupConfig) initBulkInsertParallelism() { + size := p.Base.ParseIntWithDefault("parallelism.bulkinsert", 1) p.RestoreParallelism = size }