Skip to content

Commit

Permalink
refactor: updated storeBlobsInS3 func and renamed label to logLabel
Browse files Browse the repository at this point in the history
  • Loading branch information
Monika-Bitfly committed Feb 13, 2025
1 parent fda3ed8 commit 3b1dc89
Showing 1 changed file with 21 additions and 16 deletions.
37 changes: 21 additions & 16 deletions backend/pkg/blobindexer/blobindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ func (bi *BlobIndexer) getBlobSidecarsAtSlot(slot uint64) (*constypes.StandardBl
return blobSidecar, nil
}

func (bi *BlobIndexer) putObjectInS3(key, label string, contentType *string, data []byte, metadata map[string]string) error {
func (bi *BlobIndexer) putObjectInS3(key, logLabel string, contentType *string, data []byte, metadata map[string]string) error {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues(label).Observe(time.Since(start).Seconds())
metrics.TaskDuration.WithLabelValues(logLabel).Observe(time.Since(start).Seconds())
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
Expand All @@ -253,10 +253,10 @@ func (bi *BlobIndexer) putObjectInS3(key, label string, contentType *string, dat
return nil
}

func (bi *BlobIndexer) getObjectFromS3(key, label string) (*s3.GetObjectOutput, error) {
func (bi *BlobIndexer) getObjectFromS3(key, logLabel string) (*s3.GetObjectOutput, error) {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues(label).Observe(time.Since(start).Seconds())
metrics.TaskDuration.WithLabelValues(logLabel).Observe(time.Since(start).Seconds())
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand All @@ -271,30 +271,30 @@ func (bi *BlobIndexer) getObjectFromS3(key, label string) (*s3.GetObjectOutput,
// depends on whether you also have the s3:ListBucket permission.
// If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error.
// If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error.
return nil, handleS3Error(err, key, label)
return nil, handleS3Error(err, key, logLabel)
}

return obj, nil
}

func (bi *BlobIndexer) getObjectMetadataFromS3(key, label string) (*s3.HeadObjectOutput, error) {
func (bi *BlobIndexer) getObjectMetadataFromS3(key, logLabel string) error {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues(label).Observe(time.Since(start).Seconds())
metrics.TaskDuration.WithLabelValues(logLabel).Observe(time.Since(start).Seconds())
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

headObj, err := bi.S3Client.HeadObject(ctx, &s3.HeadObjectInput{
_, err := bi.S3Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
})
if err != nil {
return nil, handleS3Error(err, key, label)
return handleS3Error(err, key, logLabel)
}

return headObj, nil
return nil
}

func (bi *BlobIndexer) fetchNodeData(ctx context.Context) (*constypes.StandardBeaconHeaderResponse, *constypes.StandardBeaconHeaderResponse, *constypes.StandardSpecResponse, error) {
Expand Down Expand Up @@ -388,12 +388,12 @@ func (bi *BlobIndexer) indexBlobsInBatches(status *BlobIndexerStatus, headHeader
}()

lastIndexedFinalizedBlobSlot := atomic.NewUint64(status.LastIndexedFinalizedBlobSlot)

batchSize := uint64(100)
for batchStart := startSlot; batchStart <= headHeader.Data.Header.Message.Slot; batchStart += batchSize {
batchStartTs := time.Now()

batchEnd := calculateBatchEnd(batchStart, batchSize, headHeader.Data.Header.Message.Slot)

blobsIndexed, err := bi.processBlobs(batchStart, batchEnd, finalizedHeader, lastIndexedFinalizedBlobSlot)
if err != nil {
return err
Expand Down Expand Up @@ -478,14 +478,19 @@ func (bi *BlobIndexer) storeBlobsInS3(blobs []constypes.BlobSidecarsData) error
for _, d := range blobs {
d := d
g.Go(func() error {
return bi.storeBlobInS3(gCtx, d)
select {
case <-gCtx.Done():
return gCtx.Err()
default:
return bi.storeBlobInS3(d)
}
})
}

return g.Wait()
}

func (bi *BlobIndexer) storeBlobInS3(ctx context.Context, blob constypes.BlobSidecarsData) error {
func (bi *BlobIndexer) storeBlobInS3(blob constypes.BlobSidecarsData) error {
versionedBlobHash := fmt.Sprintf("%#x", utils.VersionedBlobHash(blob.KzgCommitment).Bytes())
key := fmt.Sprintf("%s/blobs/%s", bi.networkID, versionedBlobHash)
label := "blobindexer_check_blob"
Expand All @@ -506,7 +511,7 @@ func (bi *BlobIndexer) storeBlobInS3(ctx context.Context, blob constypes.BlobSid

// check if the blob exists in S3
if enableCheckingBeforePutting {
if _, err := bi.getObjectMetadataFromS3(key, label); err != nil {
if err := bi.getObjectMetadataFromS3(key, label); err != nil {
return err
}
}
Expand All @@ -521,12 +526,12 @@ func (bi *BlobIndexer) storeBlobInS3(ctx context.Context, blob constypes.BlobSid
return nil
}

func handleS3Error(err error, key, label string) error {
func handleS3Error(err error, key, logLabel string) error {
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == http.StatusForbidden) {
return nil
}
return fmt.Errorf("S3 error for %s with key %s: %w", label, key, err)
return fmt.Errorf("S3 error for %s with key %s: %w", logLabel, key, err)
}

func updateIndexerStatus(batchEnd uint64, finalizedHeader *constypes.StandardBeaconHeaderResponse, lastIndexedFinalizedBlobSlot *atomic.Uint64, status *BlobIndexerStatus, bi *BlobIndexer) BlobIndexerStatus {
Expand Down

0 comments on commit 3b1dc89

Please sign in to comment.