Skip to content

Commit

Permalink
fix e2e test flakiness and remove upload path rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
canercidam committed Jan 3, 2024
1 parent c1692a6 commit ae5adae
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 40 deletions.
19 changes: 0 additions & 19 deletions drivers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,11 @@ package drivers

import (
"context"
"strings"

storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/forta-network/disco/drivers/multidriver"
)

// FixUploadPath rewrites .../repository/<name>/_uploads to .../uploads to make things easier.
func FixUploadPath(path string) string {
if !strings.Contains(path, "/_uploads") {
return path
}
newPath := "/docker/registry/v2/uploads"
var append bool
for _, segment := range strings.Split(path, "/") {
if append {
newPath += "/" + segment
}
if segment == "_uploads" {
append = true
}
}
return newPath
}

// Copy copies from src to dst.
func Copy(ctx context.Context, driver storagedriver.StorageDriver, src, dst string) (storagedriver.FileInfo, error) {
return multidriver.Replicate(ctx, driver, driver, src, dst, true)
Expand Down
11 changes: 0 additions & 11 deletions drivers/ipfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/forta-network/disco/config"
"github.com/forta-network/disco/deps"
"github.com/forta-network/disco/drivers"
"github.com/forta-network/disco/drivers/filewriter"
"github.com/forta-network/disco/drivers/multidriver"
"github.com/forta-network/disco/interfaces"
Expand Down Expand Up @@ -114,7 +113,6 @@ func (d *driver) Name() string {

// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
path = drivers.FixUploadPath(path)
readCloser, err := d.Reader(ctx, path, 0)
if err != nil {
return nil, err
Expand All @@ -125,14 +123,12 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {

// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
path = drivers.FixUploadPath(path)
return d.api.FilesWrite(ctx, path, bytes.NewBuffer(contents), ipfsapi.FilesWrite.Create(true), ipfsapi.FilesWrite.Parents(true))
}

// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
path = drivers.FixUploadPath(path)
reader, err := d.api.FilesRead(ctx, path, ipfsapi.FilesRead.Offset(offset))
if err != nil && isNotFoundErr(err) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
Expand All @@ -146,7 +142,6 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, shouldAppend bool) (storagedriver.FileWriter, error) {
path = drivers.FixUploadPath(path)
fileOpts := []ipfsapi.FilesOpt{ipfsapi.FilesWrite.Create(true), ipfsapi.FilesWrite.Parents(true)}
var offset int64
if shouldAppend {
Expand Down Expand Up @@ -180,7 +175,6 @@ func isNotFoundErr(err error) bool {
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
path = drivers.FixUploadPath(path)
stat, err := d.api.FilesStat(ctx, path)
if err != nil && isNotFoundErr(err) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
Expand All @@ -193,7 +187,6 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,

// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
path = drivers.FixUploadPath(path)
results, err := d.api.FilesLs(ctx, path)
if err != nil && isNotFoundErr(err) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
Expand All @@ -210,8 +203,6 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {

// Move moves an object stored at sourcePath to destPath, removing the original object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
sourcePath = drivers.FixUploadPath(sourcePath)
destPath = drivers.FixUploadPath(destPath)
folderPath := destPath[:strings.LastIndex(destPath, "/")+1]
if err := d.api.FilesMkdir(ctx, folderPath, ipfsapi.FilesMkdir.Parents(true)); err != nil {
return err
Expand All @@ -221,7 +212,6 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e

// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(ctx context.Context, path string) error {
path = drivers.FixUploadPath(path)
return d.api.FilesRm(ctx, path, true)
}

Expand All @@ -235,6 +225,5 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
path = drivers.FixUploadPath(path)
return storagedriver.WalkFallback(ctx, d, path, f)
}
53 changes: 43 additions & 10 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"context"
"fmt"
"io/fs"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -20,8 +23,8 @@ var (
processStartWaitSeconds = 60
pushImageRef = "localhost:1970/test"

expectedImageSha = "35ff92bfc7e822eab96fe3d712164f6b547c3acffc8691b80528d334283849ab"
expectedImageCid = "bafybeihfub2ktzp6a77zrihiwf6c2hex3nwxd7zl7u6tj3ueu5kstqk4ii"
expectedImageSha = "35ff92bfc7e822eab96fe3d712164f6b547c3acffc8691b80528d334283849ab"

expectedImageCidCacheOnly = "bafybeibv76jl7r7ielvls37d24jbmt3lkr6dvt74q2i3qbji2m2cqocjvm"

unexpectedImageCid = "bafybeielvnt5apaxbk6chthc4dc3p6vscpx3ai4uvti7gwh253j7facsxu"
Expand All @@ -39,8 +42,6 @@ var (
expectedLayerBlob2 = "/docker/registry/v2/blobs/sha256/d9/d96e79a5881296813985815a1fa73e2441e72769541b1fb32a0e14f2acf4d659/data"

expectedCidTagCacheOnly = path.Join(reposPath, expectedImageSha, "_manifests", "tags", expectedImageCidCacheOnly)

cidImageRef = path.Join("localhost:1970", expectedImageCid)
)

type E2ETestSuite struct {
Expand Down Expand Up @@ -170,16 +171,40 @@ func (s *E2ETestSuite) verifyFiles() {
s.r.FailNow("no cid repos found in ipfs")
}

func getImageCid() (foundCid string) {
filepath.WalkDir("testdir/cache", func(currPath string, d fs.DirEntry, err error) error {
if len(foundCid) > 0 {
return filepath.SkipAll

Check failure on line 177 in e2e/e2e_test.go

View workflow job for this annotation

GitHub Actions / Update coverage badge

undefined: filepath.SkipAll

Check failure on line 177 in e2e/e2e_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: filepath.SkipAll
}
if strings.Contains(currPath, "bafybei") {
for _, segment := range strings.Split(currPath, "/") {
if strings.Contains(segment, "bafybei") {
foundCid = segment
return nil
}
}
}
return nil
})
return
}

func getImagePullRef(imageName string) string {
return path.Join("localhost:1970", imageName)
}

func (s *E2ETestSuite) TestPurgeIPFS_Pull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())
imageCid := getImageCid()
imageCidPullRef := getImagePullRef(imageCid)

// delete from ipfs (primary store)
s.startCleanIpfs()

// pull
s.r.NoError(exec.Command("docker", "pull", cidImageRef).Run())
s.r.NoError(exec.Command("docker", "pull", imageCidPullRef).Run())

// it was able to pull without needing ipfs
_, err := s.ipfsClient1.FilesStat(context.Background(), "/docker")
Expand All @@ -190,6 +215,8 @@ func (s *E2ETestSuite) TestPurgeIPFS_PushAgainPull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())
imageCid := getImageCid()
imageCidPullRef := getImagePullRef(imageCid)

// delete from ipfs (primary store)
s.startCleanIpfs()
Expand All @@ -199,25 +226,29 @@ func (s *E2ETestSuite) TestPurgeIPFS_PushAgainPull() {

s.verifyFiles()

s.r.NoError(exec.Command("docker", "pull", cidImageRef).Run())
s.r.NoError(exec.Command("docker", "pull", imageCidPullRef).Run())
}

func (s *E2ETestSuite) TestPurgeCache_Pull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())
imageCid := getImageCid()
imageCidPullRef := getImagePullRef(imageCid)

// delete from filestore (secondary store)
s.r.NoError(os.RemoveAll("testdir/cache"))

// pull
s.r.NoError(exec.Command("docker", "pull", cidImageRef).Run())
s.r.NoError(exec.Command("docker", "pull", imageCidPullRef).Run())
}

func (s *E2ETestSuite) TestPurgeCache_PushAgainPull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())
imageCid := getImageCid()
imageCidPullRef := getImagePullRef(imageCid)

// delete from filestore (secondary store)
s.r.NoError(os.RemoveAll("testdir/cache"))
Expand All @@ -227,19 +258,21 @@ func (s *E2ETestSuite) TestPurgeCache_PushAgainPull() {

s.verifyFiles()

s.r.NoError(exec.Command("docker", "pull", cidImageRef).Run())
s.r.NoError(exec.Command("docker", "pull", imageCidPullRef).Run())
}

func (s *E2ETestSuite) TestPurgeCache_MissingCidRepo() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())
imageCid := getImageCid()
imageCidPullRef := getImagePullRef(imageCid)

// delete the cid repo from filestore (secondary store)
s.r.NoError(os.RemoveAll(path.Join("testdir/cache/docker/registry/v2/repositories", expectedImageCid)))
s.r.NoError(os.RemoveAll(path.Join("testdir/cache/docker/registry/v2/repositories", imageCid)))

// pull should replicate
s.r.NoError(exec.Command("docker", "pull", cidImageRef).Run())
s.r.NoError(exec.Command("docker", "pull", imageCidPullRef).Run())

s.verifyFiles()
}
Expand Down

0 comments on commit ae5adae

Please sign in to comment.