Skip to content

Commit

Permalink
Refine backup concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Dec 7, 2023
1 parent 1bda6a6 commit 0f2220d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 153 deletions.
22 changes: 10 additions & 12 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ minio:

backup:
maxSegmentGroupSize: 2G
parallelism: 2 # collection level parallelism to backup
copydata:
# thread pool to copy data for each collection backup, default 100.
# which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time.
# reduce it if blocks your storage's network bandwidth
parallelism: 128
keepTempFiles: false

restore:
# Collection level parallelism to restore
# Only change it > 1 when you have more than one datanode.
# Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
parallelism: 2
parallelism:
# collection level parallelism to backup
backupCollection: 4
# thread pool to copy data. reduce it if blocks your storage's network bandwidth
copydata: 128
# Collection level parallelism to restore
restoreCollection: 2

# keep temporary files during restore, only use to debug
keepTempFiles: false
32 changes: 30 additions & 2 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

bulkinsertWorkerPool *common.WorkerPool
backupCollectionWorkerPool *common.WorkerPool
backupCopyDataWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
Expand Down Expand Up @@ -146,11 +148,37 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool {
if b.backupCollectionWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCollectionParallelism, RPS)
if err != nil {
log.Error("failed to initial collection backup worker pool", zap.Error(err))
panic(err)
}
b.backupCollectionWorkerPool = wp
b.backupCollectionWorkerPool.Start()
}
return b.backupCollectionWorkerPool
}

func (b *BackupContext) getCopyDataWorkerPool() *common.WorkerPool {
if b.backupCopyDataWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data worker pool", zap.Error(err))
panic(err)
}
b.backupCopyDataWorkerPool = wp
b.backupCopyDataWorkerPool.Start()
}
return b.backupCopyDataWorkerPool
}

func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
log.Error("failed to initial copy data worker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
Expand Down
Loading

0 comments on commit 0f2220d

Please sign in to comment.