Skip to content

Commit

Permalink
concurrent backup
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Oct 9, 2023
1 parent 0691b27 commit d38652e
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 85 deletions.
3 changes: 2 additions & 1 deletion configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ minio:
backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath

backup:
maxSegmentGroupSize: 2G
maxSegmentGroupSize: 2G
parallelism: 2
51 changes: 20 additions & 31 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type BackupContext struct {
params paramtable.BackupParams

// milvus client
milvusClient *gomilvus.Client
milvusClient *MilvusClient

// data storage client
storageClient *storage.ChunkManager
Expand All @@ -50,7 +50,7 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

//copyWorkerPool *common.WorkerPool
//backupWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
Expand Down Expand Up @@ -88,35 +88,9 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
}

func (b *BackupContext) Start() error {
// start milvus go SDK client
//milvusClient, err := CreateMilvusClient(b.ctx, b.params)
//if err != nil {
// log.Error("failed to initial milvus client", zap.Error(err))
// return err
//}
//b.milvusClient = milvusClient

// start milvus storage client
//minioClient, err := CreateStorageClient(b.ctx, b.params)
//if err != nil {
// log.Error("failed to initial storage client", zap.Error(err))
// return err
//}
//b.storageClient = minioClient

b.backupTasks = sync.Map{}
b.backupNameIdDict = sync.Map{}
b.restoreTasks = make(map[string]*backuppb.RestoreBackupTask)

// init worker pool
//wp, err := common.NewWorkerPool(b.ctx, WORKER_NUM, RPS)
//if err != nil {
// log.Error("failed to initial copy data woker pool", zap.Error(err))
// return err
//}
//b.copyWorkerPool = wp
//b.copyWorkerPool.Start()

b.started = true
return nil
}
Expand All @@ -141,16 +115,18 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B
}
}

func (b *BackupContext) getMilvusClient() gomilvus.Client {
func (b *BackupContext) getMilvusClient() *MilvusClient {
if b.milvusClient == nil {
milvusClient, err := CreateMilvusClient(b.ctx, b.params)
if err != nil {
log.Error("failed to initial milvus client", zap.Error(err))
panic(err)
}
b.milvusClient = &milvusClient
b.milvusClient = &MilvusClient{
client: milvusClient,
}
}
return *b.milvusClient
return b.milvusClient
}

func (b *BackupContext) getStorageClient() storage.ChunkManager {
Expand All @@ -165,6 +141,19 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

//func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool {
// if b.backupWorkerPool == nil {
// wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS)
// if err != nil {
// log.Error("failed to initial copy data woker pool", zap.Error(err))
// panic(err)
// }
// b.backupWorkerPool = wp
// b.backupWorkerPool.Start()
// }
// return b.backupWorkerPool
//}

func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse {
if request.GetRequestId() == "" {
request.RequestId = utils.UUID()
Expand Down
Loading

0 comments on commit d38652e

Please sign in to comment.