support concurrent bulkinsert #250

Closed
wants to merge 1 commit
5 changes: 3 additions & 2 deletions configs/backup.yaml
@@ -51,8 +51,9 @@ backup:
   parallelism: 128
   keepTempFiles: false
 
-restore:
+parallelism:
   # Collection level parallelism to restore
   # Only change it > 1 when you have more than one datanode.
   # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
-  parallelism: 2
+  restore: 2
+  bulkinsert: 10
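Read as plain YAML, the change introduces a `parallelism` section with two knobs. A sketch of the resulting block in configs/backup.yaml, assuming the keys sit at the top level as the `parallelism.restore` / `parallelism.bulkinsert` lookups in core/paramtable/params.go below suggest:

```yaml
parallelism:
  # Collection level parallelism to restore
  # Only change it > 1 when you have more than one datanode.
  # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
  restore: 2
  bulkinsert: 10
```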
18 changes: 16 additions & 2 deletions core/backup_context.go
@@ -53,6 +53,7 @@ type BackupContext struct {
 
     restoreTasks map[string]*backuppb.RestoreBackupTask
 
+    restoreWorkerPool    *common.WorkerPool
     bulkinsertWorkerPool *common.WorkerPool
 }
 
@@ -147,10 +148,23 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
 }
 
 func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
-    if b.bulkinsertWorkerPool == nil {
+    if b.restoreWorkerPool == nil {
         wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
         if err != nil {
-            log.Error("failed to initial copy data woker pool", zap.Error(err))
+            log.Error("failed to initial restore collection worker pool", zap.Error(err))
             panic(err)
         }
+        b.restoreWorkerPool = wp
+        b.restoreWorkerPool.Start()
+    }
+    return b.restoreWorkerPool
+}
+
+func (b *BackupContext) getBulkinsertWorkerPool() *common.WorkerPool {
+    if b.bulkinsertWorkerPool == nil {
+        wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BulkinsertParallelism, RPS)
+        if err != nil {
+            log.Error("failed to initial bulkinsert worker pool", zap.Error(err))
+            panic(err)
+        }
         b.bulkinsertWorkerPool = wp
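The two getters follow the same lazy-init pattern but draw their sizes from different config knobs. Below is a minimal, self-contained sketch of how a small restore pool and a larger bulkinsert pool cooperate; the toy `pool` type and its `newPool`/`submit`/`wait` methods are hypothetical stand-ins, not the repo's `common.WorkerPool` API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pool runs submitted jobs on a fixed number of workers.
type pool struct {
	jobs chan func(context.Context) error
	wg   sync.WaitGroup
}

func newPool(ctx context.Context, workers int) *pool {
	p := &pool{jobs: make(chan func(context.Context) error)}
	for i := 0; i < workers; i++ {
		go func() {
			for job := range p.jobs {
				_ = job(ctx) // a real pool would record the error per job id
				p.wg.Done()
			}
		}()
	}
	return p
}

func (p *pool) submit(job func(context.Context) error) {
	p.wg.Add(1)
	p.jobs <- job
}

func (p *pool) wait() { p.wg.Wait() }

func main() {
	ctx := context.Background()
	restorePool := newPool(ctx, 2)     // parallelism.restore: 2
	bulkinsertPool := newPool(ctx, 10) // parallelism.bulkinsert: 10

	for c := 0; c < 3; c++ {
		collection := c
		restorePool.submit(func(ctx context.Context) error {
			// Each collection-level task fans its partitions out to the
			// bulkinsert pool, so imports from different collections overlap.
			for part := 0; part < 4; part++ {
				partition := part
				bulkinsertPool.submit(func(ctx context.Context) error {
					fmt.Printf("bulkinsert collection=%d partition=%d\n", collection, partition)
					return nil
				})
			}
			return nil
		})
	}
	restorePool.wait()
	bulkinsertPool.wait()
}
```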
64 changes: 38 additions & 26 deletions core/backup_impl_restore_backup.go
@@ -554,41 +554,22 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
         return nil
     }
 
+    jobIds := make([]int64, 0)
     if task.GetMetaOnly() {
         task.Progress = 100
     } else {
         groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
         if len(groupIds) == 1 && groupIds[0] == 0 {
-            // backward compatible old backup without group id
-            files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
-            if err != nil {
-                log.Error("fail to get partition backup binlog files",
-                    zap.Error(err),
-                    zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
-                    zap.String("targetCollectionName", targetCollectionName),
-                    zap.String("partition", partitionBackup.GetPartitionName()))
-                return task, err
-            }
-            err = copyAndBulkInsert(files)
-            if err != nil {
-                log.Error("fail to (copy and) bulkinsert data",
-                    zap.Error(err),
-                    zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
-                    zap.String("targetCollectionName", targetCollectionName),
-                    zap.String("partition", partitionBackup.GetPartitionName()))
-                return task, err
-            }
-        } else {
-            // bulk insert by segment groups
-            for _, groupId := range groupIds {
-                files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
-                if err != nil {
-                    log.Error("fail to get partition backup binlog files",
-                        zap.Error(err),
-                        zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
-                        zap.String("targetCollectionName", targetCollectionName),
-                        zap.String("partition", partitionBackup.GetPartitionName()))
-                    return task, err
-                }
-                err = copyAndBulkInsert(files)
-                if err != nil {
@@ -597,8 +578,38 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
-                    log.Error("fail to (copy and) bulkinsert data",
-                        zap.Error(err),
-                        zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
-                        zap.String("targetCollectionName", targetCollectionName),
-                        zap.String("partition", partitionBackup.GetPartitionName()))
-                    return task, err
-                }
-            }
+            job := func(ctx context.Context) error {
+                // backward compatible old backup without group id
+                files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
+                if err != nil {
+                    log.Error("fail to get partition backup binlog files",
+                        zap.Error(err),
+                        zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
+                        zap.String("targetCollectionName", targetCollectionName),
+                        zap.String("partition", partitionBackup.GetPartitionName()))
+                    return err
+                }
+                err = copyAndBulkInsert(files)
+                if err != nil {
+                    log.Error("fail to (copy and) bulkinsert data",
+                        zap.Error(err),
+                        zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
+                        zap.String("targetCollectionName", targetCollectionName),
+                        zap.String("partition", partitionBackup.GetPartitionName()))
+                    return err
+                }
+                return nil
+            }
+            jobId := b.getBulkinsertWorkerPool().SubmitWithId(job)
+            jobIds = append(jobIds, jobId)
+        } else {
+            // bulk insert by segment groups
+            for _, groupId := range groupIds {
+                job := func(ctx context.Context) error {
+                    files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
+                    if err != nil {
+                        log.Error("fail to get partition backup binlog files",
+                            zap.Error(err),
+                            zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
+                            zap.String("targetCollectionName", targetCollectionName),
+                            zap.String("partition", partitionBackup.GetPartitionName()))
+                        return err
+                    }
+                    err = copyAndBulkInsert(files)
+                    if err != nil {
+                        log.Error("fail to (copy and) bulkinsert data",
+                            zap.Error(err),
+                            zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
+                            zap.String("targetCollectionName", targetCollectionName),
+                            zap.String("partition", partitionBackup.GetPartitionName()))
+                        return err
+                    }
+                    return nil
+                }
+                jobId := b.getBulkinsertWorkerPool().SubmitWithId(job)
+                jobIds = append(jobIds, jobId)
+            }
+        }
         task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
@@ -608,7 +619,8 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
             task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
         }
     }
-    return task, nil
+    err = b.getBulkinsertWorkerPool().WaitJobs(jobIds)
+    return task, err
 }
 
 func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
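One subtlety in the else branch above: each job closure captures the range variable groupId and runs later on the pool. Before Go 1.22 the range variable is reused across iterations, so an asynchronously executed closure can observe its final value unless a per-iteration copy is made. A minimal illustration of the hazard and the fix, with plain goroutines standing in for SubmitWithId (hypothetical names, not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	groupIds := []int64{1, 2, 3}
	var wg sync.WaitGroup

	for _, groupId := range groupIds {
		groupId := groupId // per-iteration copy; required before Go 1.22
		wg.Add(1)
		go func() { // stands in for a pool running the job later
			defer wg.Done()
			fmt.Println("bulkinsert group", groupId)
		}()
	}
	wg.Wait()
}
```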
9 changes: 8 additions & 1 deletion core/paramtable/params.go
@@ -36,6 +36,7 @@ type BackupConfig struct {
 
     BackupParallelism         int
     RestoreParallelism        int
+    BulkinsertParallelism     int
     BackupCopyDataParallelism int
     KeepTempFiles             bool
 }
@@ -46,6 +47,7 @@ func (p *BackupConfig) init(base *BaseTable) {
     p.initMaxSegmentGroupSize()
     p.initBackupParallelism()
     p.initRestoreParallelism()
+    p.initBulkInsertParallelism()
    p.initBackupCopyDataParallelism()
     p.initKeepTempFiles()
 }
@@ -64,10 +66,15 @@ func (p *BackupConfig) initBackupParallelism() {
 }
 
 func (p *BackupConfig) initRestoreParallelism() {
-    size := p.Base.ParseIntWithDefault("restore.parallelism", 1)
+    size := p.Base.ParseIntWithDefault("parallelism.restore", 2)
     p.RestoreParallelism = size
 }
 
+func (p *BackupConfig) initBulkInsertParallelism() {
+    size := p.Base.ParseIntWithDefault("parallelism.bulkinsert", 10)
+    p.BulkinsertParallelism = size
+}
+
 func (p *BackupConfig) initBackupCopyDataParallelism() {
     size := p.Base.ParseIntWithDefault("backup.copydata.parallelism", 128)
     p.BackupCopyDataParallelism = size
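The key names passed to ParseIntWithDefault mirror the YAML nesting, flattened with dots. A sketch of the assumed lookup-with-default behavior; parseIntWithDefault and the map-backed config here are illustrative stand-ins, not the repo's BaseTable implementation:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseIntWithDefault resolves a flattened "section.key" entry,
// falling back to the default when the key is absent or malformed.
func parseIntWithDefault(cfg map[string]string, key string, def int) int {
	raw, ok := cfg[key]
	if !ok {
		return def
	}
	v, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return v
}

func main() {
	// parallelism:            flattened to "parallelism.restore"
	//   restore: 2            and "parallelism.bulkinsert"
	//   bulkinsert: 10
	cfg := map[string]string{
		"parallelism.restore":    "2",
		"parallelism.bulkinsert": "10",
	}
	fmt.Println(parseIntWithDefault(cfg, "parallelism.restore", 2))     // 2
	fmt.Println(parseIntWithDefault(cfg, "parallelism.bulkinsert", 10)) // 10
	fmt.Println(parseIntWithDefault(cfg, "backup.parallelism", 1))      // 1 (default)
}
```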