From 22cf78ef5e13e729e900de18cb7c3d9e669d5d99 Mon Sep 17 00:00:00 2001
From: ali
Date: Wed, 3 Jan 2024 16:26:45 +0300
Subject: [PATCH 1/7] add r2 driver and tests

---
 cmd/cmd.go                          |    1 +
 drivers/r2/driver.go                | 1254 +++++++++++++++++++++++++++
 drivers/r2/driver_test.go           |  129 +++
 go.mod                              |   23 +-
 go.sum                              |   46 +-
 interfaces/interfaces.go            |   15 +
 interfaces/mocks/mock_interfaces.go |  284 ++++++
 7 files changed, 1748 insertions(+), 4 deletions(-)
 create mode 100644 drivers/r2/driver.go
 create mode 100644 drivers/r2/driver_test.go

diff --git a/cmd/cmd.go b/cmd/cmd.go
index 0cd0f7f..5eb44c6 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -25,6 +25,7 @@ import (

 	// then init() the custom drivers
 	_ "github.com/forta-network/disco/drivers/ipfs"
+	_ "github.com/forta-network/disco/drivers/r2"

 	"github.com/forta-network/disco/config"
 	"github.com/forta-network/disco/proxy"
diff --git a/drivers/r2/driver.go b/drivers/r2/driver.go
new file mode 100644
index 0000000..7bf027a
--- /dev/null
+++ b/drivers/r2/driver.go
@@ -0,0 +1,1254 @@
package r2

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"path/filepath"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/forta-network/disco/interfaces"

	dcontext "github.com/distribution/distribution/v3/context"
	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
	"github.com/distribution/distribution/v3/registry/storage/driver/base"
	"github.com/distribution/distribution/v3/registry/storage/driver/factory"
)

// driverName is the name under which this driver registers itself with the
// distribution storage driver factory.
const driverName = "r2"

// minChunkSize defines the minimum multipart upload chunk size.
// The R2 API requires multipart upload chunks to be at least 5MB.
const minChunkSize = 5 << 20

// maxChunkSize defines the maximum multipart upload chunk size allowed by R2.
const maxChunkSize = 5 << 30

// defaultChunkSize is used when no "chunksize" parameter is provided.
const defaultChunkSize = 2 * minChunkSize

const (
	// defaultMultipartCopyChunkSize defines the default chunk size for all
	// but the last Upload Part - Copy operation of a multipart copy.
	// Empirically, 32 MB is optimal.
	defaultMultipartCopyChunkSize = 32 << 20

	// defaultMultipartCopyMaxConcurrency defines the default maximum number
	// of concurrent Upload Part - Copy operations for a multipart copy.
	defaultMultipartCopyMaxConcurrency = 100

	// defaultMultipartCopyThresholdSize defines the default object size
	// above which multipart copy will be used. (PUT Object - Copy is used
	// for objects at or below this size.) Empirically, 32 MB is optimal.
	defaultMultipartCopyThresholdSize = 32 << 20
)

// listMax is the largest amount of objects you can request from R2 in a list call
const listMax = 1000

// DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
	AccessKey                   string
	SecretKey                   string
	Bucket                      string
	Region                      string
	RegionEndpoint              string
	ForcePathStyle              bool
	Secure                      bool
	SkipVerify                  bool
	ChunkSize                   int64
	MultipartCopyChunkSize      int64
	MultipartCopyMaxConcurrency int64
	MultipartCopyThresholdSize  int64
	RootDirectory               string
}

func init() {
	factory.Register(driverName, &driverFactory{})
}

// driverFactory implements the factory.StorageDriverFactory interface
type driverFactory struct{}

// Create builds a driver instance from a raw parameters map.
func (factory *driverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	return FromParameters(parameters)
}

// driver is the internal implementation; Driver wraps it through base.Base.
type driver struct {
	R2                          interfaces.R2Client
	Bucket                      string
	ChunkSize                   int64
	Encrypt                     bool
	MultipartCopyChunkSize      int64
	MultipartCopyMaxConcurrency int64
	MultipartCopyThresholdSize  int64
	MultipartCombineSmallPart   bool
	RootDirectory               string
}
+ +type baseEmbed struct { + base.Base +} + +// Driver is a storagedriver.StorageDriver implementation backed by Cloudflare's R2 +// Objects are stored at absolute keys in the provided bucket. +type Driver struct { + baseEmbed +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - accesskey +// - secretkey +// - region +// - bucket +// - regionendpoint +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + // Providing no values for these is valid in case the user is authenticating + // with an IAM on an ec2 instance (in which case the instance credentials will + // be summoned when GetAuth is called) + accessKey := parameters["accesskey"] + if accessKey == nil { + accessKey = "" + } + secretKey := parameters["secretkey"] + if secretKey == nil { + secretKey = "" + } + + regionEndpoint := parameters["regionendpoint"] + if regionEndpoint == nil { + regionEndpoint = "" + } + + forcePathStyleBool := true + forcePathStyle := parameters["forcepathstyle"] + switch forcePathStyle := forcePathStyle.(type) { + case string: + b, err := strconv.ParseBool(forcePathStyle) + if err != nil { + return nil, fmt.Errorf("the forcePathStyle parameter should be a boolean") + } + forcePathStyleBool = b + case bool: + forcePathStyleBool = forcePathStyle + case nil: + // do nothing + default: + return nil, fmt.Errorf("the forcePathStyle parameter should be a boolean") + } + + regionName := parameters["region"] + if regionName == nil || fmt.Sprint(regionName) == "" { + return nil, fmt.Errorf("no region parameter provided") + } + region := fmt.Sprint(regionName) + + bucket := parameters["bucket"] + if bucket == nil || fmt.Sprint(bucket) == "" { + return nil, fmt.Errorf("no bucket parameter provided") + } + + secureBool := true + secure := parameters["secure"] + switch secure := secure.(type) { + case string: + b, err := strconv.ParseBool(secure) + if err != nil { + return nil, fmt.Errorf("the secure parameter should be 
a boolean") + } + secureBool = b + case bool: + secureBool = secure + case nil: + // do nothing + default: + return nil, fmt.Errorf("the secure parameter should be a boolean") + } + + skipVerifyBool := false + skipVerify := parameters["skipverify"] + switch skipVerify := skipVerify.(type) { + case string: + b, err := strconv.ParseBool(skipVerify) + if err != nil { + return nil, fmt.Errorf("the skipVerify parameter should be a boolean") + } + skipVerifyBool = b + case bool: + skipVerifyBool = skipVerify + case nil: + // do nothing + default: + return nil, fmt.Errorf("the skipVerify parameter should be a boolean") + } + + keyID := parameters["keyid"] + if keyID == nil { + keyID = "" + } + + chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) + if err != nil { + return nil, err + } + + multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) + if err != nil { + return nil, err + } + + multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) + if err != nil { + return nil, err + } + + multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) + if err != nil { + return nil, err + } + + rootDirectory := parameters["rootdirectory"] + if rootDirectory == nil { + rootDirectory = "" + } + + userAgent := parameters["useragent"] + if userAgent == nil { + userAgent = "" + } + + params := DriverParameters{ + AccessKey: fmt.Sprint(accessKey), + SecretKey: fmt.Sprint(secretKey), + Bucket: fmt.Sprint(bucket), + Region: region, + RegionEndpoint: fmt.Sprint(regionEndpoint), + ForcePathStyle: forcePathStyleBool, + Secure: secureBool, + SkipVerify: skipVerifyBool, + ChunkSize: chunkSize, + MultipartCopyChunkSize: multipartCopyChunkSize, + 
MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency, + MultipartCopyThresholdSize: multipartCopyThresholdSize, + RootDirectory: fmt.Sprint(rootDirectory), + } + + return New(params) +} + +// getParameterAsInt64 converts parameters[name] to an int64 value (using +// defaultt if nil), verifies it is no smaller than min, and returns it. +func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) { + rv := defaultt + param := parameters[name] + switch v := param.(type) { + case string: + vv, err := strconv.ParseInt(v, 0, 64) + if err != nil { + return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param) + } + rv = vv + case int64: + rv = v + case int, uint, int32, uint32, uint64: + rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int() + case nil: + // do nothing + default: + return 0, fmt.Errorf("invalid value for %s: %#v", name, param) + } + + if rv < min || rv > max { + return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max) + } + + return rv, nil +} + +func New(params DriverParameters) (*Driver, error) { + r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: params.RegionEndpoint, + }, nil + }) + + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithEndpointResolverWithOptions(r2Resolver), + config.WithRegion("auto"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(params.AccessKey, params.SecretKey, "")), + ) + if err != nil { + return nil, err + } + + r2Client := s3.NewFromConfig(cfg) + + d := &driver{ + R2: r2Client, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + MultipartCopyChunkSize: params.MultipartCopyChunkSize, + MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency, + MultipartCopyThresholdSize: params.MultipartCopyThresholdSize, + 
MultipartCombineSmallPart: false, + RootDirectory: params.RootDirectory, + } + + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// New constructs a new Driver with the given AWS credentials, region, encryption flag, and +// bucketName +func newFromClient(client interfaces.R2Client, params DriverParameters) (*Driver, error) { + d := &driver{ + R2: client, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + MultipartCopyChunkSize: params.MultipartCopyChunkSize, + MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency, + MultipartCopyThresholdSize: params.MultipartCopyThresholdSize, + MultipartCombineSmallPart: false, + RootDirectory: params.RootDirectory, + } + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + reader, err := d.Reader(ctx, path, 0) + if err != nil { + return nil, err + } + return io.ReadAll(reader) +} + +// PutContent stores the []byte content at a location designated by "path". +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + _, err := d.R2.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + ContentType: d.getContentType(), + Body: bytes.NewReader(contents), + }) + return parseError(path, err) +} + +// Reader retrieves an io.ReadCloser for the content stored at "path" with a +// given byte offset. 
+func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + resp, err := d.R2.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"), + }) + if err != nil { + if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "InvalidRange" { + return io.NopCloser(bytes.NewReader(nil)), nil + } + + return nil, parseError(path, err) + } + return resp.Body, nil +} + +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (storagedriver.FileWriter, error) { + key := d.s3Path(path) + if !appendParam { + // TODO (brianbland): cancel other uploads at this path + resp, err := d.R2.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + ContentType: d.getContentType(), + }) + if err != nil { + return nil, err + } + return d.newWriter(key, *resp.UploadId, nil), nil + } + + listMultipartUploadsInput := &s3.ListMultipartUploadsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(key), + } + for { + resp, err := d.R2.ListMultipartUploads(ctx, listMultipartUploadsInput) + if err != nil { + return nil, parseError(path, err) + } + + // resp.Uploads can only be empty on the first call + // if there were no more results to return after the first call, resp.IsTruncated would have been false + // and the loop would be exited without recalling ListMultipartUploads + if len(resp.Uploads) == 0 { + break + } + + var allParts []types.Part + for _, multi := range resp.Uploads { + if key != *multi.Key { + continue + } + + partsList, err := d.R2.ListParts(ctx, &s3.ListPartsInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + UploadId: multi.UploadId, + }) + if err != nil { + return nil, parseError(path, err) + 
} + allParts = append(allParts, partsList.Parts...) + for *partsList.IsTruncated { + partsList, err = d.R2.ListParts(ctx, &s3.ListPartsInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + UploadId: multi.UploadId, + PartNumberMarker: partsList.NextPartNumberMarker, + }) + if err != nil { + return nil, parseError(path, err) + } + allParts = append(allParts, partsList.Parts...) + } + return d.newWriter(key, *multi.UploadId, allParts), nil + } + + // resp.NextUploadIdMarker must have at least one element or we would have returned not found + listMultipartUploadsInput.UploadIdMarker = resp.NextUploadIdMarker + + // from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned" + // if everything has been returned, break + if resp.IsTruncated == nil || !*resp.IsTruncated { + break + } + } + return nil, storagedriver.PathNotFoundError{Path: path} +} + +// Stat retrieves the FileInfo for the given path, including the current size +// in bytes and the creation time. +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + resp, err := d.R2.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + MaxKeys: aws.Int32(1), + }) + if err != nil { + return nil, err + } + + fi := storagedriver.FileInfoFields{ + Path: path, + } + + if len(resp.Contents) == 1 { + if *resp.Contents[0].Key != d.s3Path(path) { + fi.IsDir = true + } else { + fi.IsDir = false + fi.Size = *resp.Contents[0].Size + fi.ModTime = *resp.Contents[0].LastModified + } + } else if len(resp.CommonPrefixes) == 1 { + fi.IsDir = true + } else { + return nil, storagedriver.PathNotFoundError{Path: path} + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil +} + +// List returns a list of the objects that are direct descendants of the given path. 
+func (d *driver) List(ctx context.Context, opath string) ([]string, error) { + path := opath + if path != "/" && path[len(path)-1] != '/' { + path = path + "/" + } + + // This is to cover for the cases when the rootDirectory of the driver is either "" or "/". + // In those cases, there is no root prefix to replace and we must actually add a "/" to all + // results in order to keep them as valid paths as recognized by storagedriver.PathRegexp + prefix := "" + if d.s3Path("") == "" { + prefix = "/" + } + + resp, err := d.R2.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + Delimiter: aws.String("/"), + MaxKeys: aws.Int32(listMax), + }) + if err != nil { + return nil, parseError(opath, err) + } + + files := []string{} + directories := []string{} + + for { + for _, key := range resp.Contents { + files = append(files, strings.Replace(*key.Key, d.s3Path(""), prefix, 1)) + } + + for _, commonPrefix := range resp.CommonPrefixes { + commonPrefix := *commonPrefix.Prefix + directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), prefix, 1)) + } + + if *resp.IsTruncated { + resp, err = d.R2.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + Delimiter: aws.String("/"), + MaxKeys: aws.Int32(listMax), + ContinuationToken: resp.NextContinuationToken, + }) + if err != nil { + return nil, err + } + } else { + break + } + } + + if opath != "/" { + if len(files) == 0 && len(directories) == 0 { + // Treat empty response as missing directory, since we don't actually + // have directories in s3. + return nil, storagedriver.PathNotFoundError{Path: opath} + } + } + + return append(files, directories...), nil +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. 
+func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + /* This is terrible, but aws doesn't have an actual move. */ + if err := d.copy(ctx, sourcePath, destPath); err != nil { + return err + } + return d.Delete(ctx, sourcePath) +} + +// copy copies an object stored at sourcePath to destPath. +func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error { + // R2 can copy objects up to 5 GB in size with a single PUT Object - Copy + // operation. For larger objects, the multipart upload API must be used. + // + // Empirically, multipart copy is fastest with 32 MB parts and is faster + // than PUT Object - Copy for objects larger than 32 MB. + + fileInfo, err := d.Stat(ctx, sourcePath) + if err != nil { + return parseError(sourcePath, err) + } + + if fileInfo.Size() <= d.MultipartCopyThresholdSize { + _, err := d.R2.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + ContentType: d.getContentType(), + CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), + }) + if err != nil { + return parseError(sourcePath, err) + } + return nil + } + + createResp, err := d.R2.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + ContentType: d.getContentType(), + }) + if err != nil { + return err + } + + numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize + completedParts := make([]types.CompletedPart, numParts) + errChan := make(chan error, numParts) + limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency) + + for i := range completedParts { + i := int64(i) + go func() { + limiter <- struct{}{} + firstByte := i * d.MultipartCopyChunkSize + lastByte := firstByte + d.MultipartCopyChunkSize - 1 + if lastByte >= fileInfo.Size() { + lastByte = fileInfo.Size() - 1 + } + uploadResp, err := d.R2.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ + 
Bucket: aws.String(d.Bucket), + CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), + Key: aws.String(d.s3Path(destPath)), + PartNumber: aws.Int32(int32(i + 1)), + UploadId: createResp.UploadId, + CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)), + }) + if err == nil { + completedParts[i] = types.CompletedPart{ + ETag: uploadResp.CopyPartResult.ETag, + PartNumber: aws.Int32(int32(i + 1)), + } + } + errChan <- err + <-limiter + }() + } + + for range completedParts { + err := <-errChan + if err != nil { + return err + } + } + + _, err = d.R2.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + UploadId: createResp.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + return err +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +// We must be careful since R2 does not guarantee read after delete consistency +func (d *driver) Delete(ctx context.Context, path string) error { + s3Objects := make([]types.ObjectIdentifier, 0, listMax) + s3Path := d.s3Path(path) + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(s3Path), + } + + for { + // list all the objects + resp, err := d.R2.ListObjectsV2(ctx, listObjectsInput) + + // resp.Contents can only be empty on the first call + // if there were no more results to return after the first call, resp.IsTruncated would have been false + // and the loop would exit without recalling ListObjects + if err != nil || len(resp.Contents) == 0 { + return storagedriver.PathNotFoundError{Path: path} + } + + for _, key := range resp.Contents { + // Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab"). 
+ if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' { + continue + } + s3Objects = append(s3Objects, types.ObjectIdentifier{ + Key: key.Key, + }) + } + + // Delete objects only if the list is not empty, otherwise R2 API returns a cryptic error + if len(s3Objects) > 0 { + // Kept for sanity, might apply to Cloudflare R2 as well. + // NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html + // by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more. + // 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack + // Delete here straight away and reset the object slice when successful. + resp, err := d.R2.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(d.Bucket), + Delete: &types.Delete{ + Objects: s3Objects, + Quiet: aws.Bool(false), + }, + }) + if err != nil { + return err + } + + if len(resp.Errors) > 0 { + // NOTE: AWS SDK s3.Error does not implement error interface which + // is pretty intensely sad, so we have to do away with this for now. 
+ errs := make([]error, 0, len(resp.Errors)) + for _, err := range resp.Errors { + errs = append(errs, errors.New(*err.Message)) + } + return storagedriver.Error{ + DriverName: driverName, + // Errs: errs, + } + } + } + // NOTE: we don't want to reallocate + // the slice so we simply "reset" it + s3Objects = s3Objects[:0] + + // resp.Contents must have at least one element or we would have returned not found + listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key + + // from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned" + // if everything has been returned, break + if resp.IsTruncated == nil || !*resp.IsTruncated { + break + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(_ context.Context, _ string, options map[string]interface{}) (string, error) { + methodString := http.MethodGet + method, ok := options["method"] + if ok { + methodString, ok = method.(string) + if !ok || (methodString != http.MethodGet && methodString != http.MethodHead) { + return "", storagedriver.ErrUnsupportedMethod{} + } + } + + expiresIn := 20 * time.Minute + expires, ok := options["expiry"] + if ok { + et, ok := expires.(time.Time) + if ok { + expiresIn = time.Until(et) + } + } + + var req *request.Request + + switch methodString { + case http.MethodGet: + // req, _ = d.R2.GetObject(ctx, &s3.GetObjectInput{ + // Bucket: aws.String(d.Bucket), + // Key: aws.String(d.s3Path(path)), + // }) + case http.MethodHead: + // req, _ = d.R2.HeadObjectRequest(&s3.HeadObjectInput{ + // Bucket: aws.String(d.Bucket), + // Key: aws.String(d.s3Path(path)), + // }) + default: + panic("unreachable") + } + + return req.Presign(expiresIn) +} + +// Walk traverses a filesystem defined within driver, starting +// from the given path, calling f on each file +func (d 
*driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error { + path := from + if !strings.HasSuffix(path, "/") { + path = path + "/" + } + + prefix := "" + if d.s3Path("") == "" { + prefix = "/" + } + + var objectCount int64 + if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil { + return err + } + + // R2 doesn't have the concept of empty directories, so it'll return path not found if there are no objects + if objectCount == 0 { + return storagedriver.PathNotFoundError{Path: from} + } + + return nil +} + +func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error { + var ( + retError error + // the most recent directory walked for de-duping + prevDir string + // the most recent skip directory to avoid walking over undesirable files + prevSkipDir string + ) + prevDir = strings.Replace(path, d.s3Path(""), prefix, 1) + + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(path), + MaxKeys: aws.Int32(listMax), + } + + ctx, done := dcontext.WithTrace(parentCtx) + defer done("s3aws.ListObjectsV2Pages(%s)", path) + + // When the "delimiter" argument is omitted, the R2 list API will list all objects in the bucket + // recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we + // can infer all the directories by comparing each object path to the last one we saw. + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html + + // With files returned in sorted depth-first order, directories are inferred in the same order. + // ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal + // for extreme edge cases but for the general use case in a registry, this is orders of magnitude + // faster than a more explicit recursive implementation. 
+ // TODO: broken + objects, listObjectErr := d.R2.ListObjectsV2(ctx, listObjectsInput, nil) + walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents)) + + for _, file := range objects.Contents { + filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1) + + // get a list of all inferred directories between the previous directory and this file + dirs := directoryDiff(prevDir, filePath) + if len(dirs) > 0 { + for _, dir := range dirs { + walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + IsDir: true, + Path: dir, + }, + }) + prevDir = dir + } + } + + walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + IsDir: false, + Size: *file.Size, + ModTime: *file.LastModified, + Path: filePath, + }, + }) + } + + for _, walkInfo := range walkInfos { + // skip any results under the last skip directory + if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) { + continue + } + + err := f(walkInfo) + *objectCount++ + + if err != nil { + if errors.Is(err, storagedriver.ErrSkipDir) { + if walkInfo.IsDir() { + prevSkipDir = walkInfo.Path() + continue + } + // is file, stop gracefully + return err + } + retError = err + return err + } + } + + if retError != nil { + return retError + } + + if listObjectErr != nil { + return listObjectErr + } + + return nil +} + +// directoryDiff finds all directories that are not in common between +// the previous and current paths in sorted order. 
+// +// # Examples +// +// directoryDiff("/path/to/folder", "/path/to/folder/folder/file") +// // => [ "/path/to/folder/folder" ] +// +// directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file") +// // => [ "/path/to/folder/folder2" ] +// +// directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file") +// // => [ "/path/to/folder/folder2" ] +// +// directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file") +// // => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ] +// +// directoryDiff("/", "/path/to/folder/folder/file") +// // => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ] +func directoryDiff(prev, current string) []string { + var paths []string + + if prev == "" || current == "" { + return paths + } + + parent := current + for { + parent = filepath.Dir(parent) + if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) { + break + } + paths = append(paths, parent) + } + reverse(paths) + return paths +} + +func reverse(s []string) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +} + +func (d *driver) s3Path(path string) string { + return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") +} + +// S3BucketKey returns the s3 bucket key for the given storage driver path. +func (d *Driver) S3BucketKey(path string) string { + return d.StorageDriver.(*driver).s3Path(path) +} + +func parseError(path string, err error) error { + if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NoSuchKey" { + return storagedriver.PathNotFoundError{Path: path} + } + + return err +} + +func (d *driver) getContentType() *string { + return aws.String("application/octet-stream") +} + +// writer attempts to upload parts to R2 in a buffered fashion where the last +// part is at least as large as the chunksize, so the multipart upload could be +// cleanly resumed in the future. 
This is violated if Close is called after less +// than a full chunk is written. +type writer struct { + driver *driver + key string + uploadID string + parts []types.Part + size int64 + readyPart []byte + pendingPart []byte + closed bool + committed bool + cancelled bool +} + +func (d *driver) newWriter(key, uploadID string, parts []types.Part) storagedriver.FileWriter { + var size int64 + for _, part := range parts { + size += *part.Size + } + return &writer{ + driver: d, + key: key, + uploadID: uploadID, + parts: parts, + size: size, + } +} + +type completedParts []types.CompletedPart + +func (a completedParts) Len() int { return len(a) } +func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + // If the last written part is smaller than minChunkSize, we need to make a + // new multipart upload :sadface: + if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize { + var completedUploadedParts completedParts + for _, part := range w.parts { + completedUploadedParts = append(completedUploadedParts, types.CompletedPart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + + sort.Sort(completedUploadedParts) + + ctx := context.Background() + _, err := w.driver.R2.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedUploadedParts, + }, + }) + if err != nil { + w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: 
aws.String(w.uploadID), + }) + return 0, err + } + + resp, err := w.driver.R2.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + ContentType: w.driver.getContentType(), + }) + if err != nil { + return 0, err + } + w.uploadID = *resp.UploadId + + // If the entire written file is smaller than minChunkSize, we need to make + // a new part from scratch :double sad face: + if w.size < minChunkSize { + resp, err := w.driver.R2.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + }) + if err != nil { + return 0, err + } + defer resp.Body.Close() + w.parts = nil + w.readyPart, err = io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + } else { + // Otherwise we can use the old file as the new first part + copyPartResp, err := w.driver.R2.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ + Bucket: aws.String(w.driver.Bucket), + CopySource: aws.String(w.driver.Bucket + "/" + w.key), + Key: aws.String(w.key), + PartNumber: aws.Int32(1), + UploadId: resp.UploadId, + }) + if err != nil { + return 0, err + } + w.parts = []types.Part{ + { + ETag: copyPartResp.CopyPartResult.ETag, + PartNumber: aws.Int32(1), + Size: aws.Int64(w.size), + }, + } + } + } + + var n int + + for len(p) > 0 { + // If no parts are ready to write, fill up the first part + if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.readyPart = append(w.readyPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + } else { + w.readyPart = append(w.readyPart, p...) + n += len(p) + p = nil + } + } + + if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.pendingPart = append(w.pendingPart, p[:neededBytes]...) 
+ n += neededBytes + p = p[neededBytes:] + err := w.flushPart() + if err != nil { + w.size += int64(n) + return n, err + } + } else { + w.pendingPart = append(w.pendingPart, p...) + n += len(p) + p = nil + } + } + } + w.size += int64(n) + return n, nil +} + +func (w *writer) Size() int64 { + return w.size +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return w.flushPart() +} + +func (w *writer) Cancel() error { + ctx := context.Background() + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + _, err := w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + }) + return err +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + err := w.flushPart() + if err != nil { + return err + } + w.committed = true + + var completedUploadedParts completedParts + for _, part := range w.parts { + completedUploadedParts = append(completedUploadedParts, types.CompletedPart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + + // This is an AWS S3 edge case, but we're keeping it for sanity + // + // + // This is an edge case when we are trying to upload an empty chunk of data using + // a MultiPart upload. As a result we are trying to complete the MultipartUpload + // with an empty slice of `completedUploadedParts` which will always lead to 400 + // being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload + // Solution: we upload an empty i.e. 0 byte part as a single part and then append it + // to the completedUploadedParts slice used to complete the Multipart upload. 
+ if len(w.parts) == 0 { + ctx := context.Background() + resp, err := w.driver.R2.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + PartNumber: aws.Int32(1), + UploadId: aws.String(w.uploadID), + Body: bytes.NewReader(nil), + }) + if err != nil { + return err + } + + completedUploadedParts = append(completedUploadedParts, types.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int32(1), + }) + } + + sort.Sort(completedUploadedParts) + ctx := context.Background() + _, err = w.driver.R2.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedUploadedParts, + }, + }) + if err != nil { + w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + }) + return err + } + return nil +} + +// flushPart flushes buffers to write a part to R2. +// Only called by Write (with both buffers full) and Close/Commit (always) +func (w *writer) flushPart() error { + if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { + // nothing to write + return nil + } + if len(w.pendingPart) < int(w.driver.ChunkSize) { + // closing with a small pending part + // combine ready and pending to avoid writing a small part + w.readyPart = append(w.readyPart, w.pendingPart...) 
+ w.pendingPart = nil + } + ctx := context.Background() + + partNumber := aws.Int32(int32(len(w.parts) + 1)) + resp, err := w.driver.R2.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + PartNumber: partNumber, + UploadId: aws.String(w.uploadID), + Body: bytes.NewReader(w.readyPart), + }) + if err != nil { + return err + } + w.parts = append(w.parts, types.Part{ + ETag: resp.ETag, + PartNumber: partNumber, + Size: aws.Int64(int64(len(w.readyPart))), + }) + w.readyPart = w.pendingPart + w.pendingPart = nil + return w.flushPart() +} diff --git a/drivers/r2/driver_test.go b/drivers/r2/driver_test.go new file mode 100644 index 0000000..eac4dc7 --- /dev/null +++ b/drivers/r2/driver_test.go @@ -0,0 +1,129 @@ +package r2 + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" + mock_interfaces "github.com/forta-network/disco/interfaces/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +const ( + testPath = "/test-path" +) + +type DriverTestSuite struct { + r *require.Assertions + + r2Client *mock_interfaces.MockR2Client + driver storagedriver.StorageDriver + + suite.Suite +} + +func TestDriver(t *testing.T) { + suite.Run(t, &DriverTestSuite{}) +} + +func (s *DriverTestSuite) SetupTest() { + s.r = s.Require() + + ctrl := gomock.NewController(s.T()) + s.r2Client = mock_interfaces.NewMockR2Client(ctrl) + params := DriverParameters{ChunkSize: minChunkSize} + + var err error + s.driver, err = newFromClient(s.r2Client, params) + assert.NoError(s.T(), err) +} + +func (s *DriverTestSuite) TestReader() { + output := &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("1"))), + } + input := 
&s3.GetObjectInput{ + Bucket: aws.String(""), + Key: aws.String("test-path"), + Range: aws.String("bytes=0-"), + } + s.r2Client.EXPECT(). + GetObject(gomock.Any(), input). + Return(output, nil) + + reader, err := s.driver.Reader(context.Background(), testPath, 0) + s.r.NoError(err) + b, err := io.ReadAll(reader) + s.r.NoError(err) + s.r.Equal("1", string(b)) +} + +func (s *DriverTestSuite) TestGetContent() { + output := &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("1"))), + } + input := &s3.GetObjectInput{ + Bucket: aws.String(""), + Key: aws.String("test-path"), + Range: aws.String("bytes=0-"), + } + s.r2Client.EXPECT(). + GetObject(gomock.Any(), input). + Return(output, nil) + + b, err := s.driver.GetContent(context.Background(), testPath) + s.r.NoError(err) + s.r.Equal("1", string(b)) +} + +func (s *DriverTestSuite) TestWriter() { + testUploadID := "test-upload-id" + + // Mock ListMultipartUploads + lmuOutput := &s3.ListMultipartUploadsOutput{ + Uploads: []types.MultipartUpload{ + { + Key: aws.String("test-path"), + UploadId: aws.String(testUploadID), + }, + }, + } + s.r2Client.EXPECT().ListMultipartUploads(gomock.Any(), gomock.Any()).Return(lmuOutput, nil) + + // Mock ListParts + listPartsOutput := &s3.ListPartsOutput{ + Parts: []types.Part{}, + IsTruncated: aws.Bool(false), + } + s.r2Client.EXPECT().ListParts(gomock.Any(), gomock.Any()).Return(listPartsOutput, nil) + + // Get writer + writer, err := s.driver.Writer(context.Background(), testPath, true) + s.r.NoError(err) + + // Write data + data := []byte("test data") + n, err := writer.Write(data) + s.r.NoError(err) + s.r.Equal(len(data), n) + + // Mock UploadPart + uploadPartOutput := &s3.UploadPartOutput{} + s.r2Client.EXPECT().UploadPart(gomock.Any(), gomock.Any()).Return(uploadPartOutput, nil) + + // Mock CompleteMultipartUpload + completeMultipartUploadOutput := &s3.CompleteMultipartUploadOutput{} + s.r2Client.EXPECT().CompleteMultipartUpload(gomock.Any(), 
gomock.Any()).Return(completeMultipartUploadOutput, nil) + + // Commit and Close + s.r.NoError(writer.Commit()) + s.r.NoError(writer.Close()) +} diff --git a/go.mod b/go.mod index 6792f41..4c6c51e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,12 @@ module github.com/forta-network/disco go 1.19 require ( + github.com/aws/aws-sdk-go v1.34.9 + github.com/aws/aws-sdk-go-v2 v1.24.0 + github.com/aws/aws-sdk-go-v2/config v1.26.2 + github.com/aws/aws-sdk-go-v2/credentials v1.16.13 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.9 + github.com/aws/aws-sdk-go-v2/service/s3 v1.47.7 github.com/distribution/distribution/v3 v3.0.0-20210602065436-4f27e1934ccc github.com/golang/mock v1.6.0 github.com/ipfs/go-cid v0.0.7 @@ -19,7 +25,20 @@ require ( github.com/Azure/azure-sdk-for-go v16.2.1+incompatible // indirect github.com/Azure/go-autorest v10.8.1+incompatible // indirect github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d // indirect - github.com/aws/aws-sdk-go v1.34.9 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.18.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 // indirect + github.com/aws/smithy-go v1.19.0 // indirect 
github.com/beorn7/perks v1.0.1 // indirect github.com/bshuster-repo/logrus-logstash-hook v1.0.0 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect @@ -41,7 +60,7 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/ipfs/go-ipfs-files v0.0.8 // indirect - github.com/jmespath/go-jmespath v0.3.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/cpuid/v2 v2.0.4 // indirect github.com/libp2p/go-buffer-pool v0.0.2 // indirect github.com/libp2p/go-flow-metrics v0.0.3 // indirect diff --git a/go.sum b/go.sum index 94b6fd5..a7d580a 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,44 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aws/aws-sdk-go v1.34.9 h1:cUGBW9CVdi0mS7K1hDzxIqTpfeWhpoQiguq81M1tjK0= github.com/aws/aws-sdk-go v1.34.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go-v2 v1.24.0 h1:890+mqQ+hTpNuw0gGP6/4akolQkSToDJgHfQE7AwGuk= +github.com/aws/aws-sdk-go-v2 v1.24.0/go.mod h1:LNh45Br1YAkEKaAqvmE1m8FUx6a5b/V0oAKV7of29b4= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 h1:OCs21ST2LrepDfD3lwlQiOqIGp6JiEUqG84GzTDoyJs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4/go.mod h1:usURWEKSNNAcAZuzRn/9ZYPT8aZQkR7xcCtunK/LkJo= +github.com/aws/aws-sdk-go-v2/config v1.26.2 h1:+RWLEIWQIGgrz2pBPAUoGgNGs1TOyF4Hml7hCnYj2jc= +github.com/aws/aws-sdk-go-v2/config v1.26.2/go.mod h1:l6xqvUxt0Oj7PI/SUXYLNyZ9T/yBPn3YTQcJLLOdtR8= +github.com/aws/aws-sdk-go-v2/credentials v1.16.13 h1:WLABQ4Cp4vXtXfOWOS3MEZKr6AAYUpMczLhgKtAjQ/8= +github.com/aws/aws-sdk-go-v2/credentials v1.16.13/go.mod h1:Qg6x82FXwW0sJHzYruxGiuApNo31UEtJvXVSZAXeWiw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10 h1:w98BT5w+ao1/r5sUuiH6JkVzjowOKeOJRHERyy1vh58= 
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10/go.mod h1:K2WGI7vUvkIv1HoNbfBA1bvIZ+9kL3YVmWxeKuLQsiw= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.9 h1:5zA8qVCXMPGt6YneFnll5B157SfdK2SewU85PH9/yM0= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.9/go.mod h1:t4gy210hPxkbtYM8xOzrWdxVq1PyekR76OOKXy3s0Vs= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9 h1:v+HbZaCGmOwnTTVS86Fleq0vPzOd7tnJGbFhP0stNLs= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9/go.mod h1:Xjqy+Nyj7VDLBtCMkQYOw1QYfAEZCVLrfI0ezve8wd4= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9 h1:N94sVhRACtXyVcjXxrwK1SKFIJrA9pOJ5yu2eSHnmls= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9/go.mod h1:hqamLz7g1/4EJP+GH5NBhcUMLjW+gKLQabgyz6/7WAU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsMJDJ2sLur1gRBhEM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.9 h1:ugD6qzjYtB7zM5PN/ZIeaAIyefPaD82G8+SJopgvUpw= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.9/go.mod h1:YD0aYBWCrPENpHolhKw2XDlTIWae2GKXT1T4o6N6hiM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.9 h1:/90OR2XbSYfXucBMJ4U14wrjlfleq/0SB6dZDPncgmo= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.9/go.mod h1:dN/Of9/fNZet7UrQQ6kTDo/VSwKPIq94vjlU16bRARc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9 h1:Nf2sHxjMJR8CSImIVCONRi4g0Su3J+TSTbS7G0pUeMU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9/go.mod h1:idky4TER38YIjr2cADF1/ugFMKvZV7p//pVeV5LZbF0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.9 
h1:iEAeF6YC3l4FzlJPP9H3Ko1TXpdjdqWffxXjp8SY6uk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.9/go.mod h1:kjsXoK23q9Z/tLBrckZLLyvjhZoS+AGrzqzUfEClvMM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.47.7 h1:o0ASbVwUAIrfp/WcCac+6jioZt4Hd8k/1X8u7GJ/QeM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.47.7/go.mod h1:vADO6Jn+Rq4nDtfwNjhgR84qkZwiC6FqCaXdw/kYwjA= +github.com/aws/aws-sdk-go-v2/service/sso v1.18.5 h1:ldSFWz9tEHAwHNmjx2Cvy1MjP5/L9kNoR0skc6wyOOM= +github.com/aws/aws-sdk-go-v2/service/sso v1.18.5/go.mod h1:CaFfXLYL376jgbP7VKC96uFcU8Rlavak0UlAwk1Dlhc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5/go.mod h1:W+nd4wWDVkSUIox9bacmkBP5NMFQeTJ/xqNabpzSR38= +github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 h1:HJeiuZ2fldpd0WqngyMR6KW7ofkXNLyOaHwEIGm39Cs= +github.com/aws/aws-sdk-go-v2/service/sts v1.26.6/go.mod h1:XX5gh4CB7wAs4KhcF46G6C8a2i7eupU19dcAAE+EydU= +github.com/aws/smithy-go v1.19.0 h1:KWFKQV80DpP3vJrrA9sVAHQ5gc2z8i4EzrLhLlWXcBM= +github.com/aws/smithy-go v1.19.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -85,8 +123,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.8 
h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4= github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= @@ -105,8 +143,11 @@ github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdr github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -322,6 +363,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interfaces/interfaces.go b/interfaces/interfaces.go index 59b03e4..079d13b 100644 --- a/interfaces/interfaces.go +++ b/interfaces/interfaces.go @@ -4,6 +4,8 @@ import ( "context" "io" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" ipfsapi "github.com/ipfs/go-ipfs-api" ) @@ -26,6 +28,19 @@ type IPFSFilesAPI interface { FilesMv(ctx context.Context, src string, dest string) error } +// R2Client makes requests to an R2 API. +type R2Client interface { + manager.DeleteObjectsAPIClient + manager.UploadAPIClient + manager.DownloadAPIClient + manager.ListObjectsV2APIClient + s3.ListMultipartUploadsAPIClient + s3.ListPartsAPIClient + s3.HeadObjectAPIClient + CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) + UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) +} + // StorageDriver is storage driver interface. 
type StorageDriver interface { storagedriver.StorageDriver diff --git a/interfaces/mocks/mock_interfaces.go b/interfaces/mocks/mock_interfaces.go index 462acbf..1b04299 100644 --- a/interfaces/mocks/mock_interfaces.go +++ b/interfaces/mocks/mock_interfaces.go @@ -9,6 +9,7 @@ import ( io "io" reflect "reflect" + s3 "github.com/aws/aws-sdk-go-v2/service/s3" driver "github.com/distribution/distribution/v3/registry/storage/driver" interfaces "github.com/forta-network/disco/interfaces" gomock "github.com/golang/mock/gomock" @@ -356,6 +357,289 @@ func (mr *MockIPFSFilesAPIMockRecorder) FilesWrite(ctx, path, data interface{}, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilesWrite", reflect.TypeOf((*MockIPFSFilesAPI)(nil).FilesWrite), varargs...) } +// MockR2Client is a mock of R2Client interface. +type MockR2Client struct { + ctrl *gomock.Controller + recorder *MockR2ClientMockRecorder +} + +// MockR2ClientMockRecorder is the mock recorder for MockR2Client. +type MockR2ClientMockRecorder struct { + mock *MockR2Client +} + +// NewMockR2Client creates a new mock instance. +func NewMockR2Client(ctrl *gomock.Controller) *MockR2Client { + mock := &MockR2Client{ctrl: ctrl} + mock.recorder = &MockR2ClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockR2Client) EXPECT() *MockR2ClientMockRecorder { + return m.recorder +} + +// AbortMultipartUpload mocks base method. +func (m *MockR2Client) AbortMultipartUpload(arg0 context.Context, arg1 *s3.AbortMultipartUploadInput, arg2 ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AbortMultipartUpload", varargs...) 
+ ret0, _ := ret[0].(*s3.AbortMultipartUploadOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AbortMultipartUpload indicates an expected call of AbortMultipartUpload. +func (mr *MockR2ClientMockRecorder) AbortMultipartUpload(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUpload", reflect.TypeOf((*MockR2Client)(nil).AbortMultipartUpload), varargs...) +} + +// CompleteMultipartUpload mocks base method. +func (m *MockR2Client) CompleteMultipartUpload(arg0 context.Context, arg1 *s3.CompleteMultipartUploadInput, arg2 ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CompleteMultipartUpload", varargs...) + ret0, _ := ret[0].(*s3.CompleteMultipartUploadOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CompleteMultipartUpload indicates an expected call of CompleteMultipartUpload. +func (mr *MockR2ClientMockRecorder) CompleteMultipartUpload(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CompleteMultipartUpload", reflect.TypeOf((*MockR2Client)(nil).CompleteMultipartUpload), varargs...) +} + +// CopyObject mocks base method. +func (m *MockR2Client) CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CopyObject", varargs...) 
+ ret0, _ := ret[0].(*s3.CopyObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CopyObject indicates an expected call of CopyObject. +func (mr *MockR2ClientMockRecorder) CopyObject(ctx, params interface{}, optFns ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockR2Client)(nil).CopyObject), varargs...) +} + +// CreateMultipartUpload mocks base method. +func (m *MockR2Client) CreateMultipartUpload(arg0 context.Context, arg1 *s3.CreateMultipartUploadInput, arg2 ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateMultipartUpload", varargs...) + ret0, _ := ret[0].(*s3.CreateMultipartUploadOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateMultipartUpload indicates an expected call of CreateMultipartUpload. +func (mr *MockR2ClientMockRecorder) CreateMultipartUpload(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMultipartUpload", reflect.TypeOf((*MockR2Client)(nil).CreateMultipartUpload), varargs...) +} + +// DeleteObjects mocks base method. +func (m *MockR2Client) DeleteObjects(arg0 context.Context, arg1 *s3.DeleteObjectsInput, arg2 ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteObjects", varargs...) + ret0, _ := ret[0].(*s3.DeleteObjectsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteObjects indicates an expected call of DeleteObjects. 
+func (mr *MockR2ClientMockRecorder) DeleteObjects(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjects", reflect.TypeOf((*MockR2Client)(nil).DeleteObjects), varargs...) +} + +// GetObject mocks base method. +func (m *MockR2Client) GetObject(arg0 context.Context, arg1 *s3.GetObjectInput, arg2 ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetObject", varargs...) + ret0, _ := ret[0].(*s3.GetObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetObject indicates an expected call of GetObject. +func (mr *MockR2ClientMockRecorder) GetObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockR2Client)(nil).GetObject), varargs...) +} + +// HeadObject mocks base method. +func (m *MockR2Client) HeadObject(arg0 context.Context, arg1 *s3.HeadObjectInput, arg2 ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HeadObject", varargs...) + ret0, _ := ret[0].(*s3.HeadObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeadObject indicates an expected call of HeadObject. +func (mr *MockR2ClientMockRecorder) HeadObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObject", reflect.TypeOf((*MockR2Client)(nil).HeadObject), varargs...) 
+} + +// ListMultipartUploads mocks base method. +func (m *MockR2Client) ListMultipartUploads(arg0 context.Context, arg1 *s3.ListMultipartUploadsInput, arg2 ...func(*s3.Options)) (*s3.ListMultipartUploadsOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListMultipartUploads", varargs...) + ret0, _ := ret[0].(*s3.ListMultipartUploadsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListMultipartUploads indicates an expected call of ListMultipartUploads. +func (mr *MockR2ClientMockRecorder) ListMultipartUploads(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListMultipartUploads", reflect.TypeOf((*MockR2Client)(nil).ListMultipartUploads), varargs...) +} + +// ListObjectsV2 mocks base method. +func (m *MockR2Client) ListObjectsV2(arg0 context.Context, arg1 *s3.ListObjectsV2Input, arg2 ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListObjectsV2", varargs...) + ret0, _ := ret[0].(*s3.ListObjectsV2Output) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListObjectsV2 indicates an expected call of ListObjectsV2. +func (mr *MockR2ClientMockRecorder) ListObjectsV2(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsV2", reflect.TypeOf((*MockR2Client)(nil).ListObjectsV2), varargs...) +} + +// ListParts mocks base method. 
+func (m *MockR2Client) ListParts(arg0 context.Context, arg1 *s3.ListPartsInput, arg2 ...func(*s3.Options)) (*s3.ListPartsOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListParts", varargs...) + ret0, _ := ret[0].(*s3.ListPartsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListParts indicates an expected call of ListParts. +func (mr *MockR2ClientMockRecorder) ListParts(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListParts", reflect.TypeOf((*MockR2Client)(nil).ListParts), varargs...) +} + +// PutObject mocks base method. +func (m *MockR2Client) PutObject(arg0 context.Context, arg1 *s3.PutObjectInput, arg2 ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PutObject", varargs...) + ret0, _ := ret[0].(*s3.PutObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PutObject indicates an expected call of PutObject. +func (mr *MockR2ClientMockRecorder) PutObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockR2Client)(nil).PutObject), varargs...) +} + +// UploadPart mocks base method. +func (m *MockR2Client) UploadPart(arg0 context.Context, arg1 *s3.UploadPartInput, arg2 ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UploadPart", varargs...) 
+ ret0, _ := ret[0].(*s3.UploadPartOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UploadPart indicates an expected call of UploadPart. +func (mr *MockR2ClientMockRecorder) UploadPart(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadPart", reflect.TypeOf((*MockR2Client)(nil).UploadPart), varargs...) +} + +// UploadPartCopy mocks base method. +func (m *MockR2Client) UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UploadPartCopy", varargs...) + ret0, _ := ret[0].(*s3.UploadPartCopyOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UploadPartCopy indicates an expected call of UploadPartCopy. +func (mr *MockR2ClientMockRecorder) UploadPartCopy(ctx, params interface{}, optFns ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadPartCopy", reflect.TypeOf((*MockR2Client)(nil).UploadPartCopy), varargs...) +} + // MockStorageDriver is a mock of StorageDriver interface. 
type MockStorageDriver struct { ctrl *gomock.Controller From 89579ac7494a4dbc968d0ecc0f6aaab12aee551e Mon Sep 17 00:00:00 2001 From: ali Date: Mon, 8 Jan 2024 14:08:45 +0300 Subject: [PATCH 2/7] fix linter issues --- drivers/r2/driver.go | 23 +++++++---------------- go.mod | 2 ++ go.sum | 4 ++++ 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/drivers/r2/driver.go b/drivers/r2/driver.go index 7bf027a..8064809 100644 --- a/drivers/r2/driver.go +++ b/drivers/r2/driver.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/forta-network/disco/interfaces" + "github.com/hashicorp/go-multierror" dcontext "github.com/distribution/distribution/v3/context" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" @@ -197,11 +198,6 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { return nil, fmt.Errorf("the skipVerify parameter should be a boolean") } - keyID := parameters["keyid"] - if keyID == nil { - keyID = "" - } - chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) if err != nil { return nil, err @@ -227,11 +223,6 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { rootDirectory = "" } - userAgent := parameters["useragent"] - if userAgent == nil { - userAgent = "" - } - params := DriverParameters{ AccessKey: fmt.Sprint(accessKey), SecretKey: fmt.Sprint(secretKey), @@ -704,13 +695,14 @@ func (d *driver) Delete(ctx context.Context, path string) error { if len(resp.Errors) > 0 { // NOTE: AWS SDK s3.Error does not implement error interface which // is pretty intensely sad, so we have to do away with this for now. 
- errs := make([]error, 0, len(resp.Errors)) + var errs multierror.Error for _, err := range resp.Errors { - errs = append(errs, errors.New(*err.Message)) + errs.Errors = append(errs.Errors, errors.New(*err.Message)) } + return storagedriver.Error{ DriverName: driverName, - // Errs: errs, + Enclosed: errs.Unwrap(), } } } @@ -875,7 +867,6 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre // is file, stop gracefully return err } - retError = err return err } } @@ -1025,7 +1016,7 @@ func (w *writer) Write(p []byte) (int, error) { }, }) if err != nil { - w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + _, _ = w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), @@ -1207,7 +1198,7 @@ func (w *writer) Commit() error { }, }) if err != nil { - w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + _, _ = w.driver.R2.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), diff --git a/go.mod b/go.mod index 4c6c51e..71c4619 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.47.7 github.com/distribution/distribution/v3 v3.0.0-20210602065436-4f27e1934ccc github.com/golang/mock v1.6.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-ipfs-api v0.2.0 github.com/kelseyhightower/envconfig v1.4.0 @@ -58,6 +59,7 @@ require ( github.com/gomodule/redigo v1.8.2 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/ipfs/go-ipfs-files v0.0.8 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/go.sum b/go.sum index a7d580a..902d4f6 
100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,10 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= From 5d47ed1b38672e6d7e7a84ddeda889439b05ec4d Mon Sep 17 00:00:00 2001 From: ali Date: Tue, 9 Jan 2024 14:27:21 +0300 Subject: [PATCH 3/7] add presign support --- drivers/r2/driver.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/drivers/r2/driver.go b/drivers/r2/driver.go index 8064809..6bbc84b 100644 --- a/drivers/r2/driver.go +++ b/drivers/r2/driver.go @@ -16,12 +16,12 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" "github.com/forta-network/disco/interfaces" "github.com/hashicorp/go-multierror" @@ -99,6 +99,7 @@ type driver struct { MultipartCopyThresholdSize int64 MultipartCombineSmallPart 
bool RootDirectory string + presignClient *s3.PresignClient } type baseEmbed struct { @@ -288,9 +289,10 @@ func New(params DriverParameters) (*Driver, error) { } r2Client := s3.NewFromConfig(cfg) - + presignClient := s3.NewPresignClient(r2Client) d := &driver{ R2: r2Client, + presignClient: presignClient, Bucket: params.Bucket, ChunkSize: params.ChunkSize, MultipartCopyChunkSize: params.MultipartCopyChunkSize, @@ -665,8 +667,9 @@ func (d *driver) Delete(ctx context.Context, path string) error { } for _, key := range resp.Contents { + k := *key.Key // Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab"). - if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' { + if len(k) > len(s3Path) && k[len(s3Path)] != '/' { continue } s3Objects = append(s3Objects, types.ObjectIdentifier{ @@ -725,7 +728,7 @@ func (d *driver) Delete(ctx context.Context, path string) error { // URLFor returns a URL which may be used to retrieve the content stored at the given path. // May return an UnsupportedMethodErr in certain StorageDriver implementations. 
-func (d *driver) URLFor(_ context.Context, _ string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { methodString := http.MethodGet method, ok := options["method"] if ok { @@ -744,24 +747,27 @@ func (d *driver) URLFor(_ context.Context, _ string, options map[string]interfac } } - var req *request.Request + var ( + presignedURL *v4.PresignedHTTPRequest + err error + ) switch methodString { case http.MethodGet: - // req, _ = d.R2.GetObject(ctx, &s3.GetObjectInput{ - // Bucket: aws.String(d.Bucket), - // Key: aws.String(d.s3Path(path)), - // }) + presignedURL, err = d.presignClient.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + }, s3.WithPresignExpires(expiresIn)) case http.MethodHead: - // req, _ = d.R2.HeadObjectRequest(&s3.HeadObjectInput{ - // Bucket: aws.String(d.Bucket), - // Key: aws.String(d.s3Path(path)), - // }) + presignedURL, err = d.presignClient.PresignHeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + }, s3.WithPresignExpires(expiresIn)) default: panic("unreachable") } - return req.Presign(expiresIn) + return presignedURL.URL, err } // Walk traverses a filesystem defined within driver, starting From ddc6ccec4f7700c4538f56a40683dba643923e7f Mon Sep 17 00:00:00 2001 From: ali Date: Tue, 9 Jan 2024 14:28:03 +0300 Subject: [PATCH 4/7] increase test coverage --- drivers/r2/driver_test.go | 95 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/drivers/r2/driver_test.go b/drivers/r2/driver_test.go index eac4dc7..28b2dff 100644 --- a/drivers/r2/driver_test.go +++ b/drivers/r2/driver_test.go @@ -3,8 +3,10 @@ package r2 import ( "bytes" "context" + "fmt" "io" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -127,3 +129,96 @@ func (s *DriverTestSuite) 
TestWriter() { s.r.NoError(writer.Commit()) s.r.NoError(writer.Close()) } + +func (s *DriverTestSuite) TestPutContent() { + s.r2Client.EXPECT().PutObject(gomock.Any(), gomock.Any()).Return(&s3.PutObjectOutput{}, nil) + + err := s.driver.PutContent(context.Background(), testPath, []byte("1")) + s.r.NoError(err) +} + +func (s *DriverTestSuite) TestStat() { + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + Size: aws.Int64(123), + LastModified: aws.Time(time.Now()), + }}, + }, nil) + + stat, err := s.driver.Stat(context.Background(), testPath) + s.r.NoError(err) + s.r.NotNil(stat) +} + +func (s *DriverTestSuite) TestList() { + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + IsTruncated: aws.Bool(false), + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + Size: aws.Int64(123), + LastModified: aws.Time(time.Now()), + }}, + }, nil) + + list, err := s.driver.List(context.Background(), testPath) + s.r.NoError(err) + s.r.Equal([]string{"/test-path/x"}, list) +} + +func (s *DriverTestSuite) TestMove() { + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + Size: aws.Int64(123), + LastModified: aws.Time(time.Now()), + }}, + }, nil) + + s.r2Client.EXPECT().CopyObject(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.CopyObjectOutput{}, nil) + + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + Size: aws.Int64(123), + LastModified: aws.Time(time.Now()), + }}, + }, nil) + + s.r2Client.EXPECT().DeleteObjects(gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(&s3.DeleteObjectsOutput{}, nil) + + s.r.NoError(s.driver.Move(context.Background(), testPath, testPath+"1")) +} + +func (s *DriverTestSuite) TestDelete() { + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + }}, + }, nil) + s.r2Client.EXPECT().DeleteObjects(gomock.Any(), gomock.Any()).Return(&s3.DeleteObjectsOutput{}, nil) + + s.r.NoError(s.driver.Delete(context.Background(), testPath)) +} + +func (s *DriverTestSuite) TestWalk() { + s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []types.Object{{ + Key: aws.String(fmt.Sprintf("test-path/x")), + Size: aws.Int64(123), + LastModified: aws.Time(time.Now()), + }}, + }, nil) + + s.r.NoError(s.driver.Walk(context.Background(), testPath, func(fileInfo storagedriver.FileInfo) error { + return nil + })) +} From 24f77ed53e4bbc0c2725183dab2bf97158b206a9 Mon Sep 17 00:00:00 2001 From: ali Date: Tue, 9 Jan 2024 14:34:20 +0300 Subject: [PATCH 5/7] add license reference --- drivers/r2/LICENSE | 201 +++++++++++++++++++++++++++++++++++++++++++ drivers/r2/driver.go | 4 + 2 files changed, 205 insertions(+) create mode 100644 drivers/r2/LICENSE diff --git a/drivers/r2/LICENSE b/drivers/r2/LICENSE new file mode 100644 index 0000000..5c304d1 --- /dev/null +++ b/drivers/r2/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/drivers/r2/driver.go b/drivers/r2/driver.go index 6bbc84b..9b5c1a8 100644 --- a/drivers/r2/driver.go +++ b/drivers/r2/driver.go @@ -1,3 +1,7 @@ +// r2 package is inspired by a proposal in distribution/distribution repository, created by tpoxa +// Discussion: https://github.com/distribution/distribution/pull/3940 +// Revision: https://github.com/container-registry/distribution/tree/r2-multipart-issue +// License at revision: https://github.com/container-registry/distribution/blob/r2-multipart-issue/LICENSE package r2 import ( From 2ba0a87fdc76bc772dbbb6382964fb937e56d220 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 10 Jan 2024 14:59:54 +0300 Subject: [PATCH 6/7] override r2 secrets from environment --- drivers/r2/driver.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/drivers/r2/driver.go b/drivers/r2/driver.go index 9b5c1a8..6eb18f5 100644 --- a/drivers/r2/driver.go +++ b/drivers/r2/driver.go @@ -12,6 +12,7 @@ import ( "io" "math" "net/http" + "os" "path/filepath" "reflect" "sort" @@ -136,6 +137,16 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { secretKey = "" } + // use environment variables to override secrets + accessKeyEnv := os.Getenv("R2_ACCESS_KEY") + if accessKeyEnv != "" { + accessKey = accessKeyEnv + } + secretKeyEnv := os.Getenv("R2_SECRET_KEY") + if secretKeyEnv != "" { + secretKey = secretKeyEnv + } + regionEndpoint := parameters["regionendpoint"] if regionEndpoint == nil { regionEndpoint = "" From 9487aa2b3576b3b158511d783ffb6407bad2659c Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 10 Jan 2024 15:03:35 +0300 Subject: [PATCH 7/7] fix linter issues --- drivers/r2/driver_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/drivers/r2/driver_test.go b/drivers/r2/driver_test.go index 28b2dff..64d06b6 100644 --- a/drivers/r2/driver_test.go +++ b/drivers/r2/driver_test.go @@ -3,7 +3,6 @@ package r2 import ( "bytes" "context" - "fmt" "io" "testing" "time" @@ -141,7 
+140,7 @@ func (s *DriverTestSuite) TestStat() { s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). Return(&s3.ListObjectsV2Output{ Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), Size: aws.Int64(123), LastModified: aws.Time(time.Now()), }}, @@ -157,7 +156,7 @@ func (s *DriverTestSuite) TestList() { Return(&s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), Size: aws.Int64(123), LastModified: aws.Time(time.Now()), }}, @@ -172,7 +171,7 @@ func (s *DriverTestSuite) TestMove() { s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). Return(&s3.ListObjectsV2Output{ Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), Size: aws.Int64(123), LastModified: aws.Time(time.Now()), }}, @@ -184,7 +183,7 @@ func (s *DriverTestSuite) TestMove() { s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). Return(&s3.ListObjectsV2Output{ Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), Size: aws.Int64(123), LastModified: aws.Time(time.Now()), }}, @@ -200,7 +199,7 @@ func (s *DriverTestSuite) TestDelete() { s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any()). Return(&s3.ListObjectsV2Output{ Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), }}, }, nil) s.r2Client.EXPECT().DeleteObjects(gomock.Any(), gomock.Any()).Return(&s3.DeleteObjectsOutput{}, nil) @@ -212,7 +211,7 @@ func (s *DriverTestSuite) TestWalk() { s.r2Client.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any(), gomock.Any()). 
Return(&s3.ListObjectsV2Output{ Contents: []types.Object{{ - Key: aws.String(fmt.Sprintf("test-path/x")), + Key: aws.String("test-path/x"), Size: aws.Int64(123), LastModified: aws.Time(time.Now()), }},