From 8d169ed65e1a6dbd6d13bc75c13de5453b19ff49 Mon Sep 17 00:00:00 2001
From: "gifi-siby"
Date: Mon, 6 Jan 2025 11:00:41 -0800
Subject: [PATCH] Address issues related to Azure storage
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- RemoveWithPrefix: delete objects level by level (deepest paths first), then
  remove the prefix object itself.
- CopyObject: always build the source URL with a SAS token and skip paths that
  are not files.
- getSAS: sign an account SAS with the shared key credential instead of a user
  delegation SAS.

Signed-off-by: "gifi-siby"
---
 core/storage/azure_chunk_manager.go  | 63 +++++++++++++++++--------
 core/storage/azure_object_storage.go | 70 ++++++++++++++++------------
 2 files changed, 85 insertions(+), 48 deletions(-)
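Note: the snippet below is an illustrative, self-contained sketch (not part of the commit) of the ordering the RemoveWithPrefix change relies on: keys are grouped by their depth (number of "/" separators) and removed from the deepest level up, so nested blobs are deleted before their parents. The sample keys and the deleteFn callback are hypothetical stand-ins for mcm.removeObject; the sketch is plain Go and runs without the Azure SDK.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// deleteDeepestFirst removes keys level by level, deepest paths first,
// mirroring the depth-grouping used by RemoveWithPrefix in this patch.
func deleteDeepestFirst(keys []string, deleteFn func(key string) error) error {
	groupedByLevel := make(map[int][]string)
	maxLevel := 0
	for _, key := range keys {
		level := strings.Count(key, "/")
		groupedByLevel[level] = append(groupedByLevel[level], key)
		if level > maxLevel {
			maxLevel = level
		}
	}
	for level := maxLevel; level >= 0; level-- {
		keysAtLevel := groupedByLevel[level]
		sort.Strings(keysAtLevel) // deterministic order within one level
		for _, key := range keysAtLevel {
			if err := deleteFn(key); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	// Hypothetical keys; deleteFn prints instead of calling Azure.
	keys := []string{"backup/meta", "backup/binlogs/a/b", "backup/binlogs/a", "backup"}
	_ = deleteDeepestFirst(keys, func(key string) error {
		fmt.Println("delete", key)
		return nil
	})
}
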
diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go
index 746ee20..39f6304 100644
--- a/core/storage/azure_chunk_manager.go
+++ b/core/storage/azure_chunk_manager.go
@@ -277,30 +277,55 @@ func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName s
 	if err != nil {
 		return err
 	}
-	removeKeys := make([]string, 0)
+	// Group objects by their depth (number of / in the key)
+	groupedByLevel := make(map[int][]string)
+	var maxLevel int
 	for key := range objects {
-		removeKeys = append(removeKeys, key)
+		level := strings.Count(key, "/")
+		groupedByLevel[level] = append(groupedByLevel[level], key)
+		if level > maxLevel {
+			maxLevel = level
+		}
 	}
-	i := 0
-	maxGoroutine := 10
-	for i < len(removeKeys) {
-		runningGroup, groupCtx := errgroup.WithContext(ctx)
-		for j := 0; j < maxGoroutine && i < len(removeKeys); j++ {
-			key := removeKeys[i]
-			runningGroup.Go(func() error {
-				err := mcm.removeObject(groupCtx, bucketName, key)
-				if err != nil {
-					log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", key), zap.Error(err))
-					return err
-				}
-				return nil
-			})
-			i++
+	for level := maxLevel; level >= 0; level-- {
+		// Get the objects at this level
+		keysAtLevel, exists := groupedByLevel[level]
+		if !exists || len(keysAtLevel) == 0 {
+			continue
 		}
-		if err := runningGroup.Wait(); err != nil {
-			return err
+
+		// Dynamically adjust maxGoroutines based on the number of objects at this level
+		maxGoroutines := 10
+		if len(keysAtLevel) < maxGoroutines {
+			maxGoroutines = len(keysAtLevel)
+		}
+		i := 0
+		for i < len(keysAtLevel) {
+			runningGroup, groupCtx := errgroup.WithContext(context.Background())
+			for j := 0; j < maxGoroutines && i < len(keysAtLevel); j++ {
+				key := keysAtLevel[i]
+				runningGroup.Go(func(key string) func() error {
+					return func() error {
+						err := mcm.removeObject(groupCtx, bucketName, key)
+						if err != nil {
+							log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", key), zap.Error(err))
+							return err
+						}
+						return nil
+					}
+				}(key))
+				i++
+			}
+			if err := runningGroup.Wait(); err != nil {
+				return err
+			}
 		}
 	}
+	err = mcm.removeObject(ctx, bucketName, strings.TrimSuffix(prefix, "/"))
+	if err != nil {
+		log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", prefix), zap.Error(err))
+		return err
+	}
 	return nil
 }
 
diff --git a/core/storage/azure_object_storage.go b/core/storage/azure_object_storage.go
index 9f62417..c12a2f1 100644
--- a/core/storage/azure_object_storage.go
+++ b/core/storage/azure_object_storage.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"strings"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@@ -203,17 +204,12 @@ func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, obj
 func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
 	var blobCli *blockblob.Client
 	var fromPathUrl string
-	if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
-		fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
-		blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
-	} else {
-		srcSAS, err := aos.getSAS(fromBucketName)
-		if err != nil {
-			return err
-		}
-		fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
-		blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
+	srcSAS, err := aos.getSAS(fromBucketName)
+	if err != nil {
+		return err
 	}
+	fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
+	blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
 
 	// we need to abort the previous copy operation before copy from url
 	abortErr := func() error {
@@ -229,36 +225,52 @@ func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, t
 		return nil
 	}()
 
-	if _, err := blobCli.CopyFromURL(ctx, fromPathUrl, nil); err != nil {
-		return fmt.Errorf("storage: azure copy from url %w abort previous %w", err, abortErr)
+	// Check if fromPath is a folder or a file
+	isFile, err := aos.isFile(ctx, fromBucketName, fromPath)
+	if err != nil {
+		return err
+	}
+	if isFile {
+		if _, err := blobCli.CopyFromURL(ctx, fromPathUrl, nil); err != nil {
+			return fmt.Errorf("storage: azure copy from url %w abort previous %w", err, abortErr)
+		}
 	}
-
 	return nil
 }
 
 func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {
-	srcSvcCli := aos.clients[bucket].client
-	// Set current and past time and create key
-	now := time.Now().UTC().Add(-10 * time.Second)
-	expiry := now.Add(48 * time.Hour)
-	info := service.KeyInfo{
-		Start:  to.Ptr(now.UTC().Format(sas.TimeFormat)),
-		Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
-	}
-	udc, err := srcSvcCli.GetUserDelegationCredential(context.Background(), info, nil)
+	credential, err := azblob.NewSharedKeyCredential(aos.clients[bucket].accessKeyID, aos.clients[bucket].secretAccessKeyID)
 	if err != nil {
 		return nil, err
 	}
-	// Create Blob Signature Values with desired permissions and sign with user delegation credential
-	sasQueryParams, err := sas.BlobSignatureValues{
+	sasQueryParams, err := sas.AccountSignatureValues{
 		Protocol:      sas.ProtocolHTTPS,
-		StartTime:     time.Now().UTC().Add(time.Second * -10),
-		ExpiryTime:    time.Now().UTC().Add(time.Duration(sasSignMinute * time.Minute)),
-		Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
-		ContainerName: bucket,
-	}.SignWithUserDelegation(udc)
+		ExpiryTime:    time.Now().UTC().Add(48 * time.Hour), // SAS valid for 48 hours
+		Permissions:   to.Ptr(sas.AccountPermissions{Read: true, List: true}).String(),
+		ResourceTypes: to.Ptr(sas.AccountResourceTypes{Container: true, Object: true}).String(),
+	}.SignWithSharedKey(credential)
 	if err != nil {
 		return nil, err
 	}
 	return &sasQueryParams, nil
 }
+
+func (aos *AzureObjectStorage) isFile(ctx context.Context, bucketName, path string) (bool, error) {
+	// Prefix ends with a slash for directory-like behavior
+	directoryPrefix := path
+	if !strings.HasSuffix(directoryPrefix, "/") {
+		directoryPrefix += "/"
+	}
+
+	objects, err := aos.ListObjects(ctx, bucketName, directoryPrefix, false)
+	if err != nil {
+		return false, err
+	}
+	// If ListObjects is called with the exact file name, it returns the file name itself
+	if len(objects) == 1 {
+		if _, found := objects[path]; found {
+			return true, nil
+		}
+	}
+	return false, nil
+}