Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LocalChunkManager support #246

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -514,7 +515,7 @@ func (b *BackupContext) Check(ctx context.Context) string {
return "Failed to connect to storage backup path " + info + err.Error()
}

CHECK_PATH := ".milvus_backup_check"
CHECK_PATH := "milvus_backup_check_" + time.Now().String()

err = b.getStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ func TestCreateBackup(t *testing.T) {
backup.CreateBackup(context, req)
}

func TestCheck(t *testing.T) {
var params paramtable.BackupParams
params.Init()
context := context.Background()
backup := CreateBackupContext(context, params)

res := backup.Check(context)
println(res)
}

func TestListBackups(t *testing.T) {
var params paramtable.BackupParams
params.Init()
Expand Down
2 changes: 2 additions & 0 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (p *MilvusConfig) initTLSMode() {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Local = "local"
Minio = "minio"
CloudProviderAWS = "aws"
CloudProviderGCP = "gcp"
Expand All @@ -151,6 +152,7 @@ const (
)

var supportedCloudProvider = map[string]bool{
Local: true,
Minio: true,
CloudProviderAWS: true,
CloudProviderGCP: true,
Expand Down
31 changes: 0 additions & 31 deletions core/storage/azure_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"

"github.com/zilliztech/milvus-backup/internal/log"
Expand Down Expand Up @@ -227,36 +226,6 @@ func (mcm *AzureChunkManager) ReadWithPrefix(ctx context.Context, bucketName str
return objectsKeys, objectsValues, nil
}

func (mcm *AzureChunkManager) Mmap(ctx context.Context, bucketName string, filePath string) (*mmap.ReaderAt, error) {
return nil, errors.New("this method has not been implemented")
}

// ReadAt reads specific position data of minio storage if exists.
func (mcm *AzureChunkManager) ReadAt(ctx context.Context, bucketName string, filePath string, off int64, length int64) ([]byte, error) {
return nil, errors.New("this method has not been implemented")
//if off < 0 || length < 0 {
// return nil, io.EOF
//}
//
//object, err := mcm.getObject(ctx, bucketName, filePath, off, length)
//if err != nil {
// log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
// return nil, err
//}
//defer object.Close()
//
//data, err := Read(object, length)
//if err != nil {
// errResponse := minio.ToErrorResponse(err)
// if errResponse.Code == "NoSuchKey" {
// return nil, WrapErrNoSuchKey(filePath)
// }
// log.Warn("failed to read object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
// return nil, err
//}
//return data, nil
}

// Remove deletes an object with @key.
func (mcm *AzureChunkManager) Remove(ctx context.Context, bucketName string, filePath string) error {
err := mcm.removeObject(ctx, bucketName, filePath)
Expand Down
12 changes: 12 additions & 0 deletions core/storage/chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func NewChunkManager(ctx context.Context, params paramtable.BackupParams) (Chunk
engine := params.MinioCfg.CloudProvider
if engine == "azure" {
return newAzureChunkManagerWithParams(ctx, params)
} else if engine == "local" {
return newLocalChunkManagerWithParams(ctx, params)
} else {
return newMinioChunkManagerWithParams(ctx, params)
}
Expand Down Expand Up @@ -66,3 +68,13 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu

return NewAzureChunkManager(ctx, c)
}

func newLocalChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*LocalChunkManager, error) {
c := newDefaultConfig()
c.rootPath = params.MinioCfg.RootPath
c.cloudProvider = params.MinioCfg.CloudProvider
c.storageEngine = params.MinioCfg.StorageType
c.backupRootPath = params.MinioCfg.BackupRootPath

return NewLocalChunkManager(ctx, c)
}
Loading
Loading