Skip to content

Commit

Permalink
Add LocalChunkManager support
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Nov 30, 2023
1 parent 623bed0 commit 5235d83
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 52 deletions.
3 changes: 2 additions & 1 deletion core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -514,7 +515,7 @@ func (b *BackupContext) Check(ctx context.Context) string {
return "Failed to connect to storage backup path " + info + err.Error()
}

CHECK_PATH := ".milvus_backup_check"
CHECK_PATH := "milvus_backup_check_" + time.Now().String()

err = b.getStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ func TestCreateBackup(t *testing.T) {
backup.CreateBackup(context, req)
}

func TestCheck(t *testing.T) {
var params paramtable.BackupParams
params.Init()
context := context.Background()
backup := CreateBackupContext(context, params)

res := backup.Check(context)
println(res)
}

func TestListBackups(t *testing.T) {
var params paramtable.BackupParams
params.Init()
Expand Down
2 changes: 2 additions & 0 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (p *MilvusConfig) initTLSMode() {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Local = "local"
Minio = "minio"
CloudProviderAWS = "aws"
CloudProviderGCP = "gcp"
Expand All @@ -151,6 +152,7 @@ const (
)

var supportedCloudProvider = map[string]bool{
Local: true,
Minio: true,
CloudProviderAWS: true,
CloudProviderGCP: true,
Expand Down
31 changes: 0 additions & 31 deletions core/storage/azure_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"

"github.com/zilliztech/milvus-backup/internal/log"
Expand Down Expand Up @@ -227,36 +226,6 @@ func (mcm *AzureChunkManager) ReadWithPrefix(ctx context.Context, bucketName str
return objectsKeys, objectsValues, nil
}

func (mcm *AzureChunkManager) Mmap(ctx context.Context, bucketName string, filePath string) (*mmap.ReaderAt, error) {
return nil, errors.New("this method has not been implemented")
}

// ReadAt reads specific position data of minio storage if exists.
func (mcm *AzureChunkManager) ReadAt(ctx context.Context, bucketName string, filePath string, off int64, length int64) ([]byte, error) {
return nil, errors.New("this method has not been implemented")
//if off < 0 || length < 0 {
// return nil, io.EOF
//}
//
//object, err := mcm.getObject(ctx, bucketName, filePath, off, length)
//if err != nil {
// log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
// return nil, err
//}
//defer object.Close()
//
//data, err := Read(object, length)
//if err != nil {
// errResponse := minio.ToErrorResponse(err)
// if errResponse.Code == "NoSuchKey" {
// return nil, WrapErrNoSuchKey(filePath)
// }
// log.Warn("failed to read object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
// return nil, err
//}
//return data, nil
}

// Remove deletes an object with @key.
func (mcm *AzureChunkManager) Remove(ctx context.Context, bucketName string, filePath string) error {
err := mcm.removeObject(ctx, bucketName, filePath)
Expand Down
12 changes: 12 additions & 0 deletions core/storage/chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func NewChunkManager(ctx context.Context, params paramtable.BackupParams) (Chunk
engine := params.MinioCfg.CloudProvider
if engine == "azure" {
return newAzureChunkManagerWithParams(ctx, params)
} else if engine == "local" {
return newLocalChunkManagerWithParams(ctx, params)
} else {
return newMinioChunkManagerWithParams(ctx, params)
}
Expand Down Expand Up @@ -66,3 +68,13 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu

return NewAzureChunkManager(ctx, c)
}

func newLocalChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*LocalChunkManager, error) {
c := newDefaultConfig()
c.rootPath = params.MinioCfg.RootPath
c.cloudProvider = params.MinioCfg.CloudProvider
c.storageEngine = params.MinioCfg.StorageType
c.backupRootPath = params.MinioCfg.BackupRootPath

return NewLocalChunkManager(ctx, c)
}
Loading

0 comments on commit 5235d83

Please sign in to comment.