Skip to content

Commit

Permalink
support create bulk insert job via restful api (#503)
Browse files Browse the repository at this point in the history
Signed-off-by: huanghaoyuanhhy <haoyuan.huang@zilliz.com>
  • Loading branch information
huanghaoyuanhhy authored Jan 16, 2025
1 parent 93fe94e commit 312e7e9
Show file tree
Hide file tree
Showing 12 changed files with 458 additions and 242 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ all: gen build

# Build the binary
build:
@echo "Building binary..."
GO111MODULE=on CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o $(BINARY_NAME)
@echo "Building Backup binary..."
@echo "Version: $(VERSION)"
@echo "Commit: $(COMMIT)"
@echo "Date: $(DATE)"
@GO111MODULE=on CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o $(BINARY_NAME)

gen:
./scripts/gen_swag.sh
Expand Down
24 changes: 19 additions & 5 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type BackupContext struct {
bulkinsertWorkerPools sync.Map
}

func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (client.Grpc, error) {
func paramsToCfg(params *paramtable.BackupParams) (*client.Cfg, error) {
ep := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Milvus client", zap.String("endpoint", ep))

Expand All @@ -76,6 +76,7 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
enableTLS = true
default:
log.Error("milvus.TLSMode is illegal, support value 0, 1, 2")
return nil, fmt.Errorf("milvus.TLSMode is illegal, support value 0, 1, 2")
}

cfg := &client.Cfg{
Expand All @@ -84,6 +85,17 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
Username: params.MilvusCfg.User,
Password: params.MilvusCfg.Password,
}

return cfg, nil
}

func CreateGrpcClient(params *paramtable.BackupParams) (client.Grpc, error) {
cfg, err := paramsToCfg(params)
if err != nil {
log.Error("failed to create milvus client", zap.Error(err))
return nil, fmt.Errorf("failed to create milvus client: %w", err)
}

cli, err := client.NewGrpc(cfg)
if err != nil {
log.Error("failed to create milvus client", zap.Error(err))
Expand All @@ -93,10 +105,12 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
}

func CreateRestfulClient(params *paramtable.BackupParams) (client.Restful, error) {
ep := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Restful client", zap.String("endpoint", ep))
cfg, err := paramsToCfg(params)
if err != nil {
log.Error("failed to create restful client", zap.Error(err))
return nil, fmt.Errorf("failed to create restful client: %w", err)
}

cfg := &client.Cfg{Address: ep, Username: params.MilvusCfg.User, Password: params.MilvusCfg.Password}
cli, err := client.NewRestful(cfg)
if err != nil {
log.Error("failed to create restful client", zap.Error(err))
Expand Down Expand Up @@ -136,7 +150,7 @@ func CreateBackupContext(ctx context.Context, params *paramtable.BackupParams) *

func (b *BackupContext) getMilvusClient() client.Grpc {
if b.grpcClient == nil {
milvusClient, err := CreateMilvusClient(b.ctx, b.params)
milvusClient, err := CreateGrpcClient(b.params)
if err != nil {
log.Error("failed to initial milvus client", zap.Error(err))
panic(err)
Expand Down
4 changes: 4 additions & 0 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
zap.String("path", request.GetPath()),
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)),
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()),
zap.Any("skipParams", request.GetSkipParams()),
zap.Bool("useV2Restore", request.GetUseV2Restore()),
zap.Int32("maxShardNum", request.GetMaxShardNum()))

resp := &backuppb.RestoreBackupResponse{
Expand Down Expand Up @@ -311,6 +313,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
SkipCreateCollection: request.GetSkipCreateCollection(),
SkipDiskQuotaCheck: request.GetSkipImportDiskQuotaCheck(),
MaxShardNum: request.GetMaxShardNum(),
SkipParams: request.GetSkipParams(),
UseV2Restore: request.GetUseV2Restore(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down
6 changes: 3 additions & 3 deletions core/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Grpc interface {
Flush(ctx context.Context, db, collName string) (*milvuspb.FlushResponse, error)
ListCollections(ctx context.Context, db string) (*milvuspb.ShowCollectionsResponse, error)
HasCollection(ctx context.Context, db, collName string) (bool, error)
BulkInsert(ctx context.Context, input BulkInsertInput) (int64, error)
BulkInsert(ctx context.Context, input GrpcBulkInsertInput) (int64, error)
GetBulkInsertState(ctx context.Context, taskID int64) (*milvuspb.GetImportStateResponse, error)
CreateCollection(ctx context.Context, input CreateCollectionInput) error
CreatePartition(ctx context.Context, db, collName, partitionName string) error
Expand Down Expand Up @@ -355,7 +355,7 @@ func (m *GrpcClient) HasCollection(ctx context.Context, db, collName string) (bo
return resp.GetValue(), nil
}

type BulkInsertInput struct {
type GrpcBulkInsertInput struct {
DB string
CollectionName string
PartitionName string
Expand All @@ -366,7 +366,7 @@ type BulkInsertInput struct {
SkipDiskQuotaCheck bool
}

func (m *GrpcClient) BulkInsert(ctx context.Context, input BulkInsertInput) (int64, error) {
func (m *GrpcClient) BulkInsert(ctx context.Context, input GrpcBulkInsertInput) (int64, error) {
ctx = m.newCtxWithDB(ctx, input.DB)
var opts []*commonpb.KeyValuePair
if input.EndTime > 0 {
Expand Down
46 changes: 33 additions & 13 deletions core/client/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,28 @@ import (
"github.com/zilliztech/milvus-backup/internal/log"
)

// ImportState is a string-typed name for the state of an import job.
// NOTE(review): presumably these values mirror the state strings reported by
// the RESTful import API (see GetBulkInsertState) — confirm against the
// response type before comparing.
type ImportState string

// Known ImportState values.
const (
ImportStatePending ImportState = "Pending"
ImportStateImporting ImportState = "Importing"
ImportStateCompleted ImportState = "Completed"
ImportStateFailed ImportState = "Failed"
)

// RestfulBulkInsertInput groups the arguments of Restful.BulkInsert, which
// creates an import job through the RESTful API.
type RestfulBulkInsertInput struct {
// DB is sent as the request's database name.
DB string
// CollectionName is sent as the request's collection name.
CollectionName string
// PartitionName is sent as the request's partition name.
PartitionName string
// offset 0 is path to insertLog file, offset 1 is path to deleteLog file
// Paths is passed through unchanged as the request's file list.
Paths [][]string
// EndTime is forwarded as the "end_time" option only when it is greater than zero.
EndTime int64
// IsL0 selects the "l0_import" option when true; otherwise the "backup" option is set.
IsL0 bool
// SkipDiskQuotaCheck is always forwarded as the "skip_disk_quota_check" option.
SkipDiskQuotaCheck bool
}

type Restful interface {
BulkInsert(ctx context.Context, db, collName, partitionName string, files []string, endTime int64, isL0 bool, skipDiskQuotaCheck bool) (string, error)
BulkInsert(ctx context.Context, input RestfulBulkInsertInput) (string, error)
GetBulkInsertState(ctx context.Context, db, jobID string) (*GetProcessResp, error)
}

Expand Down Expand Up @@ -73,27 +93,27 @@ type RestfulClient struct {
cli *req.Client
}

func (r *RestfulClient) BulkInsert(ctx context.Context, db, collName, partitionName string, files []string, endTime int64, isL0 bool, skipDiskQuotaCheck bool) (string, error) {
func (r *RestfulClient) BulkInsert(ctx context.Context, input RestfulBulkInsertInput) (string, error) {
opts := make(map[string]string)
if endTime > 0 {
opts["end_time"] = strconv.FormatInt(endTime, 10)
if input.EndTime > 0 {
opts["end_time"] = strconv.FormatInt(input.EndTime, 10)
}
if isL0 {
if input.IsL0 {
opts["l0_import"] = "true"
} else {
opts["backup"] = "true"
}
opts["skip_disk_quota_check"] = strconv.FormatBool(skipDiskQuotaCheck)
opts["skip_disk_quota_check"] = strconv.FormatBool(input.SkipDiskQuotaCheck)

createReq := createImportReq{
DbName: db,
CollectionName: collName,
PartitionName: partitionName,
Files: [][]string{files},
DbName: input.DB,
CollectionName: input.CollectionName,
PartitionName: input.PartitionName,
Files: input.Paths,
Options: opts,
}
var createResp createImportResp
log.Info("create import job via restful", zap.Any("createReq", createReq))
log.Debug("create import job via restful", zap.Any("createReq", createReq))
resp, err := r.cli.R().
SetContext(ctx).
SetBody(createReq).
Expand All @@ -102,7 +122,7 @@ func (r *RestfulClient) BulkInsert(ctx context.Context, db, collName, partitionN
if err != nil {
return "", fmt.Errorf("client: failed to create import job via restful: %w", err)
}
log.Info("create import job via restful", zap.Any("createResp", resp))
log.Debug("create import job via restful", zap.Any("createResp", resp))
if resp.IsErrorState() {
return "", fmt.Errorf("client: failed to create import job via restful: %v", resp)
}
Expand All @@ -125,7 +145,7 @@ func (r *RestfulClient) GetBulkInsertState(ctx context.Context, dbName, jobID st
if err != nil {
return nil, fmt.Errorf("client: failed to get import job state via restful: %w", err)
}
log.Info("get import job state via restful", zap.Any("getResp", resp))
log.Debug("get import job state via restful", zap.Any("getResp", resp))
if resp.IsErrorState() {
return nil, fmt.Errorf("client: failed to get import job state via restful: %v", resp)
}
Expand Down
14 changes: 8 additions & 6 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ enum RestoreTaskStateCode {
}

message SkipParams {
repeated string collection_properties = 1;
repeated string collectionProperties = 1;

repeated string filed_index_params = 2;
repeated string filed_type_params = 3;
repeated string fieldIndexParams = 2;
repeated string fieldTypeParams = 3;

repeated string index_params = 4;
repeated string indexParams = 4;
}

message RestoreBackupRequest {
Expand Down Expand Up @@ -295,7 +295,8 @@ message RestoreBackupRequest {
// target max shard number
int32 maxShardNum = 19;
// if key is set, will skip the params in restore process
SkipParams skip_params = 20;
SkipParams skipParams = 20;
bool useV2Restore = 21;
}

message RestorePartitionTask {
Expand Down Expand Up @@ -336,7 +337,8 @@ message RestoreCollectionTask {
bool skipDiskQuotaCheck = 19;
// target max shard number
int32 maxShardNum = 20;
SkipParams skip_params = 21;
SkipParams skipParams = 21;
bool useV2Restore = 22;
}

message RestoreBackupTask {
Expand Down
Loading

0 comments on commit 312e7e9

Please sign in to comment.