Skip to content

Commit

Permalink
Support azure
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Oct 8, 2023
1 parent af4a02c commit 116dfc8
Show file tree
Hide file tree
Showing 16 changed files with 802 additions and 73 deletions.
10 changes: 8 additions & 2 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ milvus:
password: "Milvus"

# Related configuration of minio, which is responsible for data persistence for Milvus.
# Deprecated, use storage instead
minio:
# Object storage config of milvus
storage:
# deprecated, use type instead
cloudProvider: "minio" # remote cloud storage provider: s3, gcp, aliyun, azure
type: minio # object storage using by milvus: (local,) minio, s3, gcp, aliyun, azure

address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
useIAM: false
cloudProvider: "aws"
iamEndpoint: ""

bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
Expand Down
13 changes: 1 addition & 12 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,7 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
zap.String("address", minioEndPoint),
zap.String("bucket", params.MinioCfg.BucketName),
zap.String("backupBucket", params.MinioCfg.BackupBucketName))
minioClient, err := storage.NewMinioChunkManager(ctx,
storage.Address(minioEndPoint),
storage.AccessKeyID(params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey),
storage.UseSSL(params.MinioCfg.UseSSL),
storage.BucketName(params.MinioCfg.BackupBucketName),
storage.RootPath(params.MinioCfg.RootPath),
storage.CloudProvider(params.MinioCfg.CloudProvider),
storage.UseIAM(params.MinioCfg.UseIAM),
storage.IAMEndpoint(params.MinioCfg.IAMEndpoint),
storage.CreateBucket(true),
)
minioClient, err := storage.NewChunkManager(ctx, params)
return minioClient, err
}

Expand Down
2 changes: 1 addition & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestListBackups(t *testing.T) {
assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success)

backupListsWithCollection := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{
CollectionName: "hello_milvus",
//CollectionName: "hello_milvus",
})

for _, backup := range backupListsWithCollection.GetData() {
Expand Down
20 changes: 10 additions & 10 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
if strings.Contains(err.Error(), "index not found") ||
strings.HasPrefix(err.Error(), "index doesn't exist") {
// todo
log.Warn("field has no index",
log.Info("field has no index",
zap.String("collection_name", completeCollection.Name),
zap.String("field_name", field.Name),
zap.Error(err))
zap.String("field_name", field.Name))
continue
} else {
log.Error("fail in DescribeIndex", zap.Error(err))
Expand Down Expand Up @@ -729,30 +728,31 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
return nil
}

func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
segmentBackupInfo := backuppb.SegmentBackupInfo{
SegmentId: segmentID,
CollectionId: collecitonID,
CollectionId: collectionID,
PartitionId: partitionID,
NumOfRows: numOfRows,
}
var size int64 = 0
var rootPath string

if b.params.MinioCfg.RootPath != "" {
log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
}

insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collecitonID, partitionID, segmentID)
log.Debug("insertPath", zap.String("insertPath", insertPath))
insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID)
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
if err != nil {
log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
return &segmentBackupInfo, err
}
log.Debug("fieldsLogDir", zap.Any("fieldsLogDir", fieldsLogDir))
log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
insertLogs := make([]*backuppb.FieldBinlog, 0)
for _, fieldLogDir := range fieldsLogDir {
binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
Expand All @@ -772,7 +772,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collecitonID, partitionID, segmentID)
deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID)
deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
deltaLogs := make([]*backuppb.FieldBinlog, 0)
for _, deltaFieldLogDir := range deltaFieldsLogDir {
Expand All @@ -798,7 +798,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID)
//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collectionID, partitionID, segmentID)
//statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
//statsLogs := make([]*backuppb.FieldBinlog, 0)
//for _, statsFieldLogDir := range statsFieldsLogDir {
Expand Down
2 changes: 2 additions & 0 deletions core/paramtable/base_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
DefaultMinioBackupBucketName = "a-bucket"
DefaultMinioBackupRootPath = "backup"

DefaultStorageType = "minio"

DefaultMilvusAddress = "localhost"
DefaultMilvusPort = "19530"
DefaultMilvusAuthorizationEnabled = "false"
Expand Down
66 changes: 45 additions & 21 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,19 @@ func (p *MilvusConfig) initTLSMode() {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Minio = "minio"
CloudProviderAWS = "aws"
CloudProviderGCP = "gcp"
CloudProviderAliyun = "ali"
CloudProviderAzure = "azure"
)

var supportedCloudProvider = map[string]bool{
Minio: true,
CloudProviderAWS: true,
CloudProviderGCP: true,
CloudProviderAliyun: true,
CloudProviderAzure: true,
}

type MinioConfig struct {
Expand All @@ -141,11 +145,14 @@ type MinioConfig struct {

BackupBucketName string
BackupRootPath string

StorageType string
}

func (p *MinioConfig) init(base *BaseTable) {
p.Base = base

p.initStorageType()
p.initAddress()
p.initPort()
p.initAccessKeyID()
Expand All @@ -154,82 +161,99 @@ func (p *MinioConfig) init(base *BaseTable) {
p.initBucketName()
p.initRootPath()
p.initUseIAM()
p.initCloudProvider()
//p.initCloudProvider()
p.initIAMEndpoint()

p.initBackupBucketName()
p.initBackupRootPath()
}

func (p *MinioConfig) initAddress() {
endpoint := p.Base.LoadWithDefault("minio.address", DefaultMinioAddress)
endpoint := p.Base.LoadWithDefault("storage.address",
p.Base.LoadWithDefault("minio.address", DefaultMinioAddress))
p.Address = endpoint
}

func (p *MinioConfig) initPort() {
port := p.Base.LoadWithDefault("minio.port", DefaultMinioPort)
port := p.Base.LoadWithDefault("storage.port",
p.Base.LoadWithDefault("minio.port", DefaultMinioPort))
p.Port = port
}

func (p *MinioConfig) initAccessKeyID() {
keyID, err := p.Base.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
keyID := p.Base.LoadWithDefault("storage.accessKeyID",
p.Base.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey))
p.AccessKeyID = keyID
}

func (p *MinioConfig) initSecretAccessKey() {
key := p.Base.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey)
key := p.Base.LoadWithDefault("storage.secretAccessKey",
p.Base.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey))
p.SecretAccessKey = key
}

func (p *MinioConfig) initUseSSL() {
usessl := p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL)
usessl := p.Base.LoadWithDefault("storage.useSSL",
p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL))
p.UseSSL, _ = strconv.ParseBool(usessl)
}

func (p *MinioConfig) initBucketName() {
bucketName := p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBucketName)
bucketName := p.Base.LoadWithDefault("storage.bucketName",
p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBucketName))
p.BucketName = bucketName
}

func (p *MinioConfig) initRootPath() {
rootPath := p.Base.LoadWithDefault("minio.rootPath", DefaultMinioRootPath)
rootPath := p.Base.LoadWithDefault("storage.rootPath",
p.Base.LoadWithDefault("minio.rootPath", DefaultMinioRootPath))
p.RootPath = rootPath
}

func (p *MinioConfig) initUseIAM() {
useIAM := p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM)
useIAM := p.Base.LoadWithDefault("storage.useIAM",
p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM))
var err error
p.UseIAM, err = strconv.ParseBool(useIAM)
if err != nil {
panic("parse bool useIAM:" + err.Error())
}
}

func (p *MinioConfig) initCloudProvider() {
p.CloudProvider = p.Base.LoadWithDefault("minio.cloudProvider", DefaultMinioCloudProvider)
if !supportedCloudProvider[p.CloudProvider] {
panic("unsupported cloudProvider:" + p.CloudProvider)
}
}
//func (p *MinioConfig) initCloudProvider() {
// p.CloudProvider = p.Base.LoadWithDefault("minio.cloudProvider", DefaultMinioCloudProvider)
// if !supportedCloudProvider[p.CloudProvider] {
// panic("unsupported cloudProvider:" + p.CloudProvider)
// }
//}

func (p *MinioConfig) initIAMEndpoint() {
iamEndpoint := p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint)
iamEndpoint := p.Base.LoadWithDefault("storage.iamEndpoint",
p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint))
p.IAMEndpoint = iamEndpoint
}

func (p *MinioConfig) initBackupBucketName() {
bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)
bucketName := p.Base.LoadWithDefault("storage.backupBucketName",
p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName))
p.BackupBucketName = bucketName
}

func (p *MinioConfig) initBackupRootPath() {
rootPath := p.Base.LoadWithDefault("minio.backupRootPath", DefaultMinioBackupRootPath)
rootPath := p.Base.LoadWithDefault("storage.backupRootPath",
p.Base.LoadWithDefault("minio.backupRootPath", DefaultMinioBackupRootPath))
p.BackupRootPath = rootPath
}

func (p *MinioConfig) initStorageType() {
engine := p.Base.LoadWithDefault("storage.type",
p.Base.LoadWithDefault("minio.type", DefaultStorageType))
if !supportedCloudProvider[engine] {
panic("unsupported storage type:" + engine)
}
p.StorageType = engine
}

type HTTPConfig struct {
Base *BaseTable

Expand Down
1 change: 1 addition & 0 deletions core/paramtable/params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package paramtable
Loading

0 comments on commit 116dfc8

Please sign in to comment.