Skip to content

Commit

Permalink
refactor: replace aws S3Client with blobstore inside blobindexer.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Monika-Bitfly committed Feb 13, 2025
1 parent 8d1d654 commit fb4a5a4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 36 deletions.
64 changes: 28 additions & 36 deletions backend/pkg/blobindexer/blobindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/db2/blobstore"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/services"
Expand All @@ -23,11 +23,9 @@ import (
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
awss3 "github.com/gobitfly/beaconchain/pkg/blobindexer/s3"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/errgroup"
)
Expand All @@ -36,7 +34,7 @@ var enableCheckingBeforePutting = false
var waitForOtherBlobIndexerDuration = time.Second * 60

type BlobIndexer struct {
S3Client awss3.S3Client
BlobStore *blobstore.S3BlobStore
running bool
runningMu *sync.Mutex
clEndpoint string
Expand Down Expand Up @@ -72,14 +70,15 @@ func NewBlobIndexer() (*BlobIndexer, error) {
o.BaseEndpoint = aws.String(utils.Config.BlobIndexer.S3.Endpoint)
})

blobStore := blobstore.NewS3BlobStore(s3Client)
writtenBlobsCache, err := lru.New[string, bool](1000)
if err != nil {
return nil, err
}

id := utils.GetUUID()
bi := &BlobIndexer{
S3Client: s3Client,
BlobStore: blobStore,
runningMu: &sync.Mutex{},
clEndpoint: "http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port,
cl: consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port),
Expand Down Expand Up @@ -238,22 +237,25 @@ func (bi *BlobIndexer) putObjectInS3(key, logLabel string, contentType *string,
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

_, err := bi.S3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
blob := blobstore.Blob{
Body: bytes.NewReader(data),
ContentType: contentType,
Metadata: metadata,
})
}

err := bi.BlobStore.Put(ctx,
utils.Config.BlobIndexer.S3.Bucket,
key,
blob)

if err != nil {
return err
return fmt.Errorf("error putting object in S3 with key %s: %w", key, err)
}

return nil
}

func (bi *BlobIndexer) getObjectFromS3(key, logLabel string) (*s3.GetObjectOutput, error) {
func (bi *BlobIndexer) getObjectFromS3(key, logLabel string) (blobstore.Blob, error) {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues(logLabel).Observe(time.Since(start).Seconds())
Expand All @@ -262,22 +264,17 @@ func (bi *BlobIndexer) getObjectFromS3(key, logLabel string) (*s3.GetObjectOutpu
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

obj, err := bi.S3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
})
blob, err := bi.BlobStore.Get(ctx,
utils.Config.BlobIndexer.S3.Bucket,
key)
if err != nil {
// If the object that you request doesn’t exist, the error that Amazon S3 returns
// depends on whether you also have the s3:ListBucket permission.
// If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error.
// If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error.
return nil, handleS3Error(err, key, logLabel)
return blobstore.Blob{}, fmt.Errorf("error getting object from S3 with key %s: %w", key, err)
}

return obj, nil
return blob, nil
}

func (bi *BlobIndexer) getObjectMetadataFromS3(key, logLabel string) error {
func (bi *BlobIndexer) checkIfObjectExistsInS3(key, logLabel string) error {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues(logLabel).Observe(time.Since(start).Seconds())
Expand All @@ -286,12 +283,15 @@ func (bi *BlobIndexer) getObjectMetadataFromS3(key, logLabel string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

_, err := bi.S3Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
})
exists, err := bi.BlobStore.Exist(ctx,
utils.Config.BlobIndexer.S3.Bucket,
key)
if err != nil {
return handleS3Error(err, key, logLabel)
return fmt.Errorf("error checking object metadata in S3 with key %s: %w", key, err)
}

if !exists {
return fmt.Errorf("object with key %s does not exist", key)
}

return nil
Expand Down Expand Up @@ -511,7 +511,7 @@ func (bi *BlobIndexer) storeBlobInS3(blob constypes.BlobSidecarsData) error {

// check if the blob exists in S3
if enableCheckingBeforePutting {
if err := bi.getObjectMetadataFromS3(key, label); err != nil {
if err := bi.checkIfObjectExistsInS3(key, label); err != nil {
return err
}
}
Expand All @@ -526,14 +526,6 @@ func (bi *BlobIndexer) storeBlobInS3(blob constypes.BlobSidecarsData) error {
return nil
}

func handleS3Error(err error, key, logLabel string) error {
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == http.StatusForbidden) {
return nil
}
return fmt.Errorf("S3 error for %s with key %s: %w", logLabel, key, err)
}

func updateIndexerStatus(batchEnd uint64, finalizedHeader *constypes.StandardBeaconHeaderResponse, lastIndexedFinalizedBlobSlot *atomic.Uint64, status *BlobIndexerStatus, bi *BlobIndexer) BlobIndexerStatus {
lastIndexedFinalizedSlot := getLastIndexedFinalizedSlot(batchEnd, finalizedHeader)
newBlobIndexerStatus := BlobIndexerStatus{
Expand Down
4 changes: 4 additions & 0 deletions backend/pkg/commons/db2/blobstore/s3blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (s *S3BlobStore) Get(ctx context.Context, bucket string, key string) (Blob,
Key: &key,
})
if err != nil {
// If the object that you request doesn’t exist, the error that Amazon S3 returns
// depends on whether you also have the s3:ListBucket permission.
// If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error.
// If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error.
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) &&
(httpResponseErr.HTTPStatusCode() == http.StatusNotFound ||
Expand Down

0 comments on commit fb4a5a4

Please sign in to comment.