Skip to content

Commit

Permalink
support concurrent bulkinsert
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Dec 5, 2023
1 parent 1bda6a6 commit 7f05163
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 31 deletions.
5 changes: 3 additions & 2 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ backup:
parallelism: 128
keepTempFiles: false

restore:
parallelism:
# Collection-level parallelism for restore.
# Only set this to a value > 1 when you have more than one datanode,
# because the maximum parallelism of Milvus bulkinsert equals the number of datanodes.
parallelism: 2
restore: 2
bulkinsert: 10
18 changes: 16 additions & 2 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

restoreWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

Expand Down Expand Up @@ -147,10 +148,23 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
}

func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
if b.restoreWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
log.Error("failed to initial restore collection worker pool", zap.Error(err))
panic(err)
}
b.restoreWorkerPool = wp
b.restoreWorkerPool.Start()
}
return b.restoreWorkerPool
}

func (b *BackupContext) getBulkinsertWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BulkinsertParallelism, RPS)
if err != nil {
log.Error("failed to initial bulkinsert worker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
Expand Down
64 changes: 38 additions & 26 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,41 +554,22 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
return nil
}

jobIds := make([]int64, 0)
if task.GetMetaOnly() {
task.Progress = 100
} else {
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
job := func(ctx context.Context) error {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
return err
}
err = copyAndBulkInsert(files)
if err != nil {
Expand All @@ -597,8 +578,38 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
return err
}
return nil
}
jobId := b.getBulkinsertWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
job := func(ctx context.Context) error {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return err
}
return nil
}
jobId := b.getBulkinsertWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
Expand All @@ -608,7 +619,8 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}
return task, nil
err = b.getBulkinsertWorkerPool().WaitJobs(jobIds)
return task, err
}

func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
Expand Down
9 changes: 8 additions & 1 deletion core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BackupConfig struct {

BackupParallelism int
RestoreParallelism int
BulkinsertParallelism int
BackupCopyDataParallelism int
KeepTempFiles bool
}
Expand All @@ -46,6 +47,7 @@ func (p *BackupConfig) init(base *BaseTable) {
p.initMaxSegmentGroupSize()
p.initBackupParallelism()
p.initRestoreParallelism()
p.initBulkInsertParallelism()
p.initBackupCopyDataParallelism()
p.initKeepTempFiles()
}
Expand All @@ -64,7 +66,12 @@ func (p *BackupConfig) initBackupParallelism() {
}

// initRestoreParallelism reads the collection-level restore parallelism from
// the "parallelism.restore" config key (default 2) into RestoreParallelism.
// The pasted diff kept both the removed old-side line (key
// "restore.parallelism", default 1) and the new one, redeclaring `size`;
// only the new-side line is valid here.
func (p *BackupConfig) initRestoreParallelism() {
	size := p.Base.ParseIntWithDefault("parallelism.restore", 2)
	p.RestoreParallelism = size
}

// initBulkInsertParallelism reads the bulkinsert parallelism from the
// "parallelism.bulkinsert" config key (default 10) into BulkinsertParallelism.
func (p *BackupConfig) initBulkInsertParallelism() {
	size := p.Base.ParseIntWithDefault("parallelism.bulkinsert", 10)
	// BUG fix: the original assigned p.RestoreParallelism here (copy-paste
	// from initRestoreParallelism), which clobbered the restore setting and
	// left BulkinsertParallelism at its zero value, so the bulkinsert worker
	// pool (see getBulkinsertWorkerPool) would be created with size 0.
	p.BulkinsertParallelism = size
}

Expand Down

0 comments on commit 7f05163

Please sign in to comment.