Skip to content

Commit

Permalink
feat: support Milvus 2.5 function feature by bypassing SDK
Browse files Browse the repository at this point in the history
Milvus 2.5 introduced the function feature, which is not well-supported by the older SDK versions. The new SDK hides many internal IDs used by the backup tool. To address this, the backup tool now interacts with Milvus directly via gRPC, bypassing the SDK.

Signed-off-by: huanghaoyuanhhy <haoyuan.huang@zilliz.com>
  • Loading branch information
huanghaoyuanhhy committed Jan 6, 2025
1 parent 6d40f12 commit 60e6483
Show file tree
Hide file tree
Showing 41 changed files with 5,147 additions and 3,986 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Build
Expand Down Expand Up @@ -180,7 +180,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Build
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Build
Expand Down Expand Up @@ -402,7 +402,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: "true"

- name: Creating kind cluster
Expand Down Expand Up @@ -550,7 +550,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Build
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Creating kind cluster
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Creating kind cluster
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/perf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18.0'
go-version: '1.22'
cache: true

- name: Creating kind cluster
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- run: git fetch --force --tags
- uses: actions/setup-go@v5
with:
go-version: 1.18
go-version: 1.22
cache: true
- uses: docker/login-action@v3
with:
Expand Down Expand Up @@ -46,7 +46,7 @@ jobs:
- run: git fetch --force --tags
- uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.22
cache: true

- name: Set up QEMU
Expand Down
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

FROM golang:1.18 AS builder
FROM golang:1.22 AS builder

ENV CGO_ENABLED=0

Expand Down
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Variables
BINARY_NAME=milvus-backup
PKG := github.com/zilliztech/milvus-backup
VERSION=$(shell git describe --tags --always)
COMMIT=$(shell git rev-parse --short HEAD)
DATE=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
Expand All @@ -10,10 +11,15 @@ all: gen build
# Build the binary
build:
@echo "Building binary..."
GO111MODULE=on CGO_ENABLED=0 go build -ldflags "-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.date=$(DATE)" -o $(BINARY_NAME)
GO111MODULE=on CGO_ENABLED=0 go build -ldflags "-X version.version=$(VERSION) -X version.commit=$(COMMIT) -X version.date=$(DATE)" -o $(BINARY_NAME)

gen:
./scripts/gen_swag.sh
./scripts/gen_proto.sh

fmt:
@echo Formatting code...
@goimports -w --local $(PKG) ./
@echo Format code done

.PHONY: all build gen
4 changes: 3 additions & 1 deletion cmd/backup_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package cmd
import (
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/zilliztech/milvus-backup/core/paramtable"
"gopkg.in/yaml.v3"

"github.com/zilliztech/milvus-backup/core/paramtable"
)

var configCmd = &cobra.Command{
Expand Down
3 changes: 2 additions & 1 deletion cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
)
Expand All @@ -20,7 +21,7 @@ var checkCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)

resp := backupContext.Check(context)
fmt.Println(resp)
Expand Down
3 changes: 2 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -35,7 +36,7 @@ var createBackupCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)

start := time.Now().Unix()
var collectionNameArr []string
Expand Down
3 changes: 2 additions & 1 deletion cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand All @@ -24,7 +25,7 @@ var deleteBackupCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)

resp := backupContext.DeleteBackup(context, &backuppb.DeleteBackupRequest{
BackupName: deleteBackName,
Expand Down
3 changes: 2 additions & 1 deletion cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand All @@ -27,7 +28,7 @@ var getBackupCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)

resp := backupContext.GetBackup(context, &backuppb.GetBackupRequest{
BackupName: getBackName,
Expand Down
3 changes: 2 additions & 1 deletion cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand All @@ -25,7 +26,7 @@ var listBackupCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)

backups := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{
CollectionName: collectionName,
Expand Down
3 changes: 2 additions & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -43,7 +44,7 @@ var restoreBackupCmd = &cobra.Command{
params.Init()

context := context.Background()
backupContext := core.CreateBackupContext(context, params)
backupContext := core.CreateBackupContext(context, &params)
log.Info("restore cmd input args", zap.Strings("args", args))
start := time.Now().Unix()
var collectionNameArr []string
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var rootCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
Error(cmd, args, errors.New("unrecognized command"))
},
PersistentPreRun: func(cmd *cobra.Command, args []string){
PersistentPreRun: func(cmd *cobra.Command, args []string) {
setEnvs(yamlOverrides)
},
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/spf13/cobra"

"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
)
Expand All @@ -31,7 +32,7 @@ var serverCmd = &cobra.Command{
params.Init()

context := context.Background()
server, err := core.NewServer(context, params, core.Port(port))
server, err := core.NewServer(context, &params, core.Port(port))
if err != nil {
fmt.Errorf("fail to create backup server, %s", err.Error())
}
Expand Down
59 changes: 30 additions & 29 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package core
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"sync"
"time"

gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/zilliztech/milvus-backup/core/client"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
Expand Down Expand Up @@ -40,10 +39,10 @@ type BackupContext struct {
// lock to make sure only one backup is creating or restoring
mu sync.Mutex
started bool
params paramtable.BackupParams
params *paramtable.BackupParams

// milvus client
milvusClient *MilvusClient
milvusClient client.Milvus

// data storage client
milvusStorageClient storage.ChunkManager
Expand All @@ -63,32 +62,36 @@ type BackupContext struct {
bulkinsertWorkerPools map[string]*common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
milvusEndpoint := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Milvus client", zap.String("endpoint", milvusEndpoint))
var c gomilvus.Client
var err error
if params.MilvusCfg.AuthorizationEnabled && params.MilvusCfg.User != "" && params.MilvusCfg.Password != "" {
if params.MilvusCfg.TLSMode == 0 {
c, err = gomilvus.NewDefaultGrpcClientWithAuth(ctx, milvusEndpoint, params.MilvusCfg.User, params.MilvusCfg.Password)
} else if params.MilvusCfg.TLSMode == 1 || params.MilvusCfg.TLSMode == 2 {
c, err = gomilvus.NewDefaultGrpcClientWithTLSAuth(ctx, milvusEndpoint, params.MilvusCfg.User, params.MilvusCfg.Password)
} else {
log.Error("milvus.TLSMode is not illegal, support value 0, 1, 2")
return nil, errors.New("milvus.TLSMode is not illegal, support value 0, 1, 2")
}
} else {
c, err = gomilvus.NewGrpcClient(ctx, milvusEndpoint)
func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (client.Milvus, error) {
ep := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Milvus client", zap.String("endpoint", ep))

var tlsAuth bool
switch params.MilvusCfg.TLSMode {
case 0:
tlsAuth = false
case 1, 2:
tlsAuth = true
default:
log.Error("milvus.TLSMode is not illegal, support value 0, 1, 2")
}

cfg := &client.MilvusConfig{
Address: ep,
EnableTLSAuth: tlsAuth,
Username: params.MilvusCfg.User,
Password: params.MilvusCfg.Password,
}
cli, err := client.NewMilvus(cfg)
if err != nil {
log.Error("failed to connect to milvus", zap.Error(err))
return nil, err
log.Error("failed to create milvus client", zap.Error(err))
return nil, fmt.Errorf("failed to create milvus client: %w", err)
}
return c, nil
return cli, nil
}

// Deprecated
func createStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) {
func createStorageClient(ctx context.Context, params *paramtable.BackupParams) (storage.ChunkManager, error) {
minioEndPoint := params.MinioCfg.Address + ":" + params.MinioCfg.Port
log.Debug("Start minio client",
zap.String("address", minioEndPoint),
Expand Down Expand Up @@ -128,7 +131,7 @@ func (b *BackupContext) Close() error {
return nil
}

func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *BackupContext {
func CreateBackupContext(ctx context.Context, params *paramtable.BackupParams) *BackupContext {
return &BackupContext{
ctx: ctx,
params: params,
Expand All @@ -141,16 +144,14 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B
}
}

func (b *BackupContext) getMilvusClient() *MilvusClient {
func (b *BackupContext) getMilvusClient() client.Milvus {
if b.milvusClient == nil {
milvusClient, err := CreateMilvusClient(b.ctx, b.params)
if err != nil {
log.Error("failed to initial milvus client", zap.Error(err))
panic(err)
}
b.milvusClient = &MilvusClient{
client: milvusClient,
}
b.milvusClient = milvusClient
}
return b.milvusClient
}
Expand Down
Loading

0 comments on commit 60e6483

Please sign in to comment.