Refine backup concurrency
Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink committed Dec 7, 2023
1 parent 1bda6a6 commit 1024245
Showing 4 changed files with 104 additions and 149 deletions.
22 changes: 10 additions & 12 deletions configs/backup.yaml
@@ -43,16 +43,14 @@ minio:

backup:
maxSegmentGroupSize: 2G
parallelism: 2 # collection-level parallelism for backup
copydata:
# thread pool to copy data for each collection backup, default 100.
# This means that with backup.parallelism = 2 and backup.copydata.parallelism = 100, up to 200 copies run at the same time.
# Reduce it if it saturates your storage's network bandwidth.
parallelism: 128
keepTempFiles: false

restore:
# Collection-level parallelism for restore.
# Only set it > 1 when you have more than one datanode,
# because the max parallelism of Milvus bulkinsert equals the number of datanodes.
parallelism: 2
parallelism:
# collection-level parallelism for backup
backupCollection: 4
# thread pool to copy data; reduce it if it saturates your storage's network bandwidth
copydata: 128
# Collection-level parallelism for restore
restoreCollection: 2

# keep temporary files during restore; only used for debugging
keepTempFiles: false
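
The practical change in this hunk is that copy parallelism stops being a per-collection multiplier (the old comment's "2 × 100 copies" case) and becomes one globally shared limit, with all three knobs grouped under a single parallelism block. Below is a minimal sketch, assuming a hand-rolled config struct and gopkg.in/yaml.v3, of how the new nested keys map onto Go fields; this is not the repository's actual paramtable code.

// Illustration only — not part of this commit. The struct below and the use
// of gopkg.in/yaml.v3 are assumptions made for this example.
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type backupConfig struct {
	Backup struct {
		MaxSegmentGroupSize string `yaml:"maxSegmentGroupSize"`
		Parallelism         struct {
			BackupCollection  int `yaml:"backupCollection"`  // collection-level backup parallelism
			CopyData          int `yaml:"copydata"`          // shared copy-data thread pool size
			RestoreCollection int `yaml:"restoreCollection"` // collection-level restore parallelism
		} `yaml:"parallelism"`
		KeepTempFiles bool `yaml:"keepTempFiles"`
	} `yaml:"backup"`
}

func main() {
	raw := []byte(`
backup:
  maxSegmentGroupSize: 2G
  parallelism:
    backupCollection: 4
    copydata: 128
    restoreCollection: 2
  keepTempFiles: false
`)
	var cfg backupConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// With these defaults: 4 collections backed up concurrently, sharing 128 copy threads in total.
	fmt.Printf("backupCollection=%d copydata=%d restoreCollection=%d\n",
		cfg.Backup.Parallelism.BackupCollection,
		cfg.Backup.Parallelism.CopyData,
		cfg.Backup.Parallelism.RestoreCollection)
}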
32 changes: 30 additions & 2 deletions core/backup_context.go
@@ -53,7 +53,9 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

bulkinsertWorkerPool *common.WorkerPool
backupCollectionWorkerPool *common.WorkerPool
backupCopyDataWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
@@ -146,11 +148,37 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool {
if b.backupCollectionWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCollectionParallelism, RPS)
if err != nil {
log.Error("failed to initial collection backup worker pool", zap.Error(err))
panic(err)
}
b.backupCollectionWorkerPool = wp
b.backupCollectionWorkerPool.Start()
}
return b.backupCollectionWorkerPool
}

func (b *BackupContext) getCopyDataWorkerPool() *common.WorkerPool {
if b.backupCopyDataWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data worker pool", zap.Error(err))
panic(err)
}
b.backupCopyDataWorkerPool = wp
b.backupCopyDataWorkerPool.Start()
}
return b.backupCopyDataWorkerPool
}

func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
log.Error("failed to initial copy data worker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
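
The two new getters mirror getRestoreWorkerPool: each pool is created lazily on first use, sized from BackupCfg, started once, and then shared by every backup task in the process. The rest of the commit relies on the pool offering SubmitWithId and WaitJobs, so a caller can submit work to a shared pool yet wait only on its own job ids. A rough, self-contained sketch of those semantics follows; it is an illustration only, not the repository's common.WorkerPool, and it omits shutdown, rate limiting, and context cancellation.

// Illustration only — not part of this commit. A minimal pool where
// SubmitWithId returns a job id and WaitJobs blocks on a chosen subset of ids.
package main

import (
	"context"
	"fmt"
	"sync"
)

type job func(ctx context.Context) error

type workerPool struct {
	ctx     context.Context
	jobs    chan func()
	mu      sync.Mutex
	nextID  int64
	results map[int64]chan error
}

func newWorkerPool(ctx context.Context, workers int) *workerPool {
	p := &workerPool{ctx: ctx, jobs: make(chan func()), results: map[int64]chan error{}}
	for i := 0; i < workers; i++ {
		go func() {
			for f := range p.jobs {
				f() // workers run until the process exits; shutdown is omitted in this sketch
			}
		}()
	}
	return p
}

// SubmitWithId queues a job and returns an id that WaitJobs can later block on.
func (p *workerPool) SubmitWithId(j job) int64 {
	p.mu.Lock()
	p.nextID++
	id := p.nextID
	done := make(chan error, 1)
	p.results[id] = done
	p.mu.Unlock()

	p.jobs <- func() { done <- j(p.ctx) }
	return id
}

// WaitJobs blocks until every listed job finishes and returns the first error seen.
func (p *workerPool) WaitJobs(ids []int64) error {
	var firstErr error
	for _, id := range ids {
		p.mu.Lock()
		done := p.results[id]
		p.mu.Unlock()
		if err := <-done; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	pool := newWorkerPool(context.Background(), 4)
	jobIds := make([]int64, 0)
	for i := 0; i < 8; i++ {
		i := i
		jobIds = append(jobIds, pool.SubmitWithId(func(ctx context.Context) error {
			fmt.Println("copied segment", i)
			return nil
		}))
	}
	fmt.Println("WaitJobs:", pool.WaitJobs(jobIds))
}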
178 changes: 53 additions & 125 deletions core/backup_impl_create_backup.go
@@ -7,7 +7,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

jsoniter "github.com/json-iterator/go"
@@ -16,7 +15,6 @@ import (

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

@@ -434,42 +432,36 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
log.Info("Finished fill segment",
zap.String("collectionName", collectionBackup.GetCollectionName()))

log.Info("reading SegmentInfos from storage, this may cost several minutes if data is large",
zap.String("collectionName", collectionBackup.GetCollectionName()))
segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo)
mu := sync.Mutex{}
wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
if err != nil {
return err
}
wp.Start()
for _, v := range filledSegments {
segment := v
job := func(ctx context.Context) error {
segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows)
if err != nil {
return err
}
if len(segmentInfo.Binlogs) == 0 {
log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID))
}
mu.Lock()
partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo)
segmentBackupInfos = append(segmentBackupInfos, segmentInfo)
mu.Unlock()
return nil
segmentInfo := &backuppb.SegmentBackupInfo{
SegmentId: segment.ID,
CollectionId: segment.CollectionID,
PartitionId: segment.ParititionID,
NumOfRows: segment.NumRows,
}
wp.Submit(job)
}
wp.Done()
if err := wp.Wait(); err != nil {
return err
segmentBackupInfos = append(segmentBackupInfos, segmentInfo)
partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo)
}
log.Info("readSegmentInfo from storage",

log.Info("Begin copy data",
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("segmentNum", len(filledSegments)))

// sort by segment rows so that smaller segments are copied earlier.
// smaller segments are more likely to be compacted,
// so this reduces the chance of a segment being deleted by compaction before it is copied.
sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
return segmentBackupInfos[i].GetNumOfRows() < segmentBackupInfos[j].GetNumOfRows()
})
err = b.copySegments(ctx, segmentBackupInfos, backupInfo)
if err != nil {
return err
}
b.refreshBackupCache(backupInfo)

for _, partition := range partitions {
partitionSegments := partSegInfoMap[partition.ID]
var size int64 = 0
@@ -485,86 +477,30 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
LoadState: partitionLoadStates[partition.Name],
}
partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo)
//partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo)
}

//leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{
// Infos: partitionLevelBackupInfos,
//}
collectionBackup.PartitionBackups = partitionBackupInfos
collectionBackup.LoadState = collectionLoadState
b.refreshBackupCache(backupInfo)
log.Info("finish build partition info",
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("partitionNum", len(partitionBackupInfos)))

log.Info("Begin copy data",
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("segmentNum", len(segmentBackupInfos)))

var collectionBackupSize int64 = 0
for _, part := range partitionBackupInfos {
collectionBackupSize += part.GetSize()
if part.GetSize() > b.params.BackupCfg.MaxSegmentGroupSize {
log.Info("partition size is larger than MaxSegmentGroupSize, will separate segments into groups in backup files",
zap.Int64("collectionId", part.GetCollectionId()),
zap.Int64("partitionId", part.GetPartitionId()),
zap.Int64("partitionSize", part.GetSize()),
zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize))
segments := partSegInfoMap[part.GetPartitionId()]
var bufferSize int64 = 0
// 0 is illegal value, start from 1
var segGroupID int64 = 1
for _, seg := range segments {
if seg.Size > b.params.BackupCfg.MaxSegmentGroupSize && bufferSize == 0 {
seg.GroupId = segGroupID
segGroupID = segGroupID + 1
} else if bufferSize+seg.Size > b.params.BackupCfg.MaxSegmentGroupSize {
segGroupID = segGroupID + 1
seg.GroupId = segGroupID
bufferSize = 0
bufferSize = bufferSize + seg.Size
} else {
seg.GroupId = segGroupID
bufferSize = bufferSize + seg.Size
}
}
} else {
log.Info("partition size is smaller than MaxSegmentGroupSize, won't separate segments into groups in backup files",
zap.Int64("collectionId", part.GetCollectionId()),
zap.Int64("partitionId", part.GetPartitionId()),
zap.Int64("partitionSize", part.GetSize()),
zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize))
}
}

sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
return segmentBackupInfos[i].Size < segmentBackupInfos[j].Size
})
err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName()))
if err != nil {
return err
}
b.refreshBackupCache(backupInfo)

collectionBackup.Size = collectionBackupSize
collectionBackup.EndTime = time.Now().Unix()
b.refreshBackupCache(backupInfo)
return nil
}

func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) {
b.mu.Lock()
defer b.mu.Unlock()

wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupParallelism, RPS)
if err != nil {
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
backupInfo.ErrorMessage = err.Error()
return backupInfo, err
}
wp.Start()
log.Info("Start collection level backup pool", zap.Int("parallelism", b.params.BackupCfg.BackupParallelism))

backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond))
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING

@@ -584,16 +520,19 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
}
log.Info("collections to backup", zap.Strings("collections", collectionNames))

jobIds := make([]int64, 0)
for _, collection := range toBackupCollections {
collectionClone := collection
job := func(ctx context.Context) error {
err := b.backupCollection(ctx, backupInfo, collectionClone, request.GetForce())
return err
}
wp.Submit(job)
jobId := b.getBackupCollectionWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}
wp.Done()
if err := wp.Wait(); err != nil {

err = b.getBackupCollectionWorkerPool().WaitJobs(jobIds)
if err != nil {
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
backupInfo.ErrorMessage = err.Error()
return backupInfo, err
@@ -636,12 +575,8 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
return backupInfo, nil
}

func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, dstPath string) error {
wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS)
if err != nil {
return err
}
wp.Start()
func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, backupInfo *backuppb.BackupInfo) error {
dstPath := BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())

// generate target path
// milvus_rootpath/insert_log/collection_id/partition_id/segment_id/ =>
@@ -654,18 +589,24 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
}
}

jobIds := make([]int64, 0)
for _, segment := range segments {
start := time.Now().Unix()
log.Debug("copy segment",
zap.Int64("collection_id", segment.GetCollectionId()),
log := log.With(zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("group_id", segment.GetGroupId()),
zap.Int64("size", segment.GetSize()))
zap.Int64("group_id", segment.GetGroupId()))
log.Debug("copy segment")
_, err := b.fillSegmentBackupInfo(ctx, segment)
if err != nil {
log.Info("Fail to fill segment backup info", zap.Error(err))
return err
}
// insert log
for _, binlogs := range segment.GetBinlogs() {
for _, binlog := range binlogs.GetBinlogs() {
targetPath := backupPathFunc(binlog.GetLogPath(), b.milvusRootPath, dstPath)
// use segmentID as group id
segment.GroupId = segment.SegmentId
if segment.GetGroupId() != 0 {
targetPath = strings.Replace(targetPath,
strconv.FormatInt(segment.GetPartitionId(), 10),
@@ -707,7 +648,8 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S

return nil
}
wp.Submit(job)
jobId := b.getCopyDataWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}
}
// delta log
@@ -753,30 +695,17 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
}
return err
}
wp.Submit(job)
jobId := b.getCopyDataWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}
}
duration := time.Now().Unix() - start
log.Debug("copy segment finished",
zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("cost_time", duration))
}
wp.Done()
if err := wp.Wait(); err != nil {
return err
}
return nil

err := b.getCopyDataWorkerPool().WaitJobs(jobIds)
return err
}

func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
segmentBackupInfo := backuppb.SegmentBackupInfo{
SegmentId: segmentID,
CollectionId: collectionID,
PartitionId: partitionID,
NumOfRows: numOfRows,
}
func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackupInfo *backuppb.SegmentBackupInfo) (*backuppb.SegmentBackupInfo, error) {
var size int64 = 0
var rootPath string

@@ -786,12 +715,12 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
rootPath = ""
}

insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID)
insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
if err != nil {
log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
return &segmentBackupInfo, err
return segmentBackupInfo, err
}
log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
insertLogs := make([]*backuppb.FieldBinlog, 0)
@@ -813,7 +742,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
})
}

deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID)
deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
deltaLogs := make([]*backuppb.FieldBinlog, 0)
for _, deltaFieldLogDir := range deltaFieldsLogDir {
@@ -862,7 +791,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
segmentBackupInfo.Binlogs = insertLogs
segmentBackupInfo.Deltalogs = deltaLogs
//segmentBackupInfo.Statslogs = statsLogs

segmentBackupInfo.Size = size
return &segmentBackupInfo, nil
return segmentBackupInfo, nil
}
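
Net effect of this file: the per-call worker pools are gone. executeCreateBackup submits one job per collection to the shared collection pool, each collection job fills segment info lazily via fillSegmentBackupInfo and submits its binlog and deltalog copies to the shared copy-data pool, every caller waits only on the job ids it submitted, and segments are sorted so the smallest are copied first. A scaled-down sketch of the resulting concurrency shape follows, using errgroup and a weighted semaphore instead of the repository's WorkerPool; the loop bounds and limits are invented for the demo.

// Illustration only — not part of this commit. Collections run in parallel up
// to a collection-level limit, while all of their copy jobs compete for one
// global copy-data limit, so copy parallelism is no longer multiplied by the
// number of collections being backed up.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()

	const backupCollectionParallelism = 2 // cf. backup.parallelism.backupCollection
	const copyDataParallelism = 4         // cf. backup.parallelism.copydata (shared by all collections)

	copySlots := semaphore.NewWeighted(copyDataParallelism)

	var collections errgroup.Group
	collections.SetLimit(backupCollectionParallelism)

	for c := 1; c <= 3; c++ {
		c := c
		collections.Go(func() error {
			// Each collection fans out its own copy jobs, but every copy
			// competes for the same global copySlots.
			var segments errgroup.Group
			for s := 1; s <= 5; s++ {
				s := s
				segments.Go(func() error {
					if err := copySlots.Acquire(ctx, 1); err != nil {
						return err
					}
					defer copySlots.Release(1)
					fmt.Printf("collection %d: copying segment %d\n", c, s)
					return nil
				})
			}
			return segments.Wait()
		})
	}
	if err := collections.Wait(); err != nil {
		fmt.Println("backup failed:", err)
	}
}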