diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index c53c1bc66..5b79c2417 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -4,15 +4,18 @@ import ( "context" "errors" "fmt" + "math/big" "time" "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common" dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" + "github.com/ethereum/go-ethereum/common/hexutil" ) func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) { @@ -40,7 +43,16 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl if err != nil { return nil, api.NewErrorInternal(err.Error()) } - s.logger.Debug("received a new blob dispersal request", "blobSizeBytes", len(data), "quorums", req.GetBlobHeader().GetQuorumNumbers()) + + publicKeyBytes, err := hexutil.Decode(blobHeader.PaymentMetadata.AccountID) + if err != nil { + return nil, api.NewErrorInternal(fmt.Sprintf("failed to decode public key: %v", err)) + } + s.logger.Debug("received a new blob dispersal request", + "accountId", publicKeyBytes, + "blobSizeBytes", len(data), + "quorums", req.GetBlobHeader().GetQuorumNumbers(), + ) blobKey, err := s.StoreBlob(ctx, data, blobHeader, time.Now(), onchainState.TTL) if err != nil { @@ -136,34 +148,30 @@ func (s *DispersalServerV2) validateDispersalRequest(ctx context.Context, req *p if err != nil { return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error())) } - // TODO(ian-shim): enable this check for authentication - // if blobHeader.PaymentMetadata == nil { - // return api.NewErrorInvalidArg("payment metadata is required") - // } - // if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil { - // return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error())) - // } - // TODO(ian-shim): enable this check when we have payment metadata + authentication in disperser client - // if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil { - // return api.NewErrorInvalidArg("invalid payment metadata") - // } + if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil { + return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error())) + } + + if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil { + return api.NewErrorInvalidArg("invalid payment metadata") + } // handle payments and check rate limits - // reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod() - // cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment()) - // accountID := blobHeaderProto.GetPaymentHeader().GetAccountId() - - // paymentHeader := core.PaymentMetadata{ - // AccountID: accountID, - // ReservationPeriod: reservationPeriod, - // CumulativePayment: cumulativePayment, - // } - - // err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers) - // if err != nil { - // return api.NewErrorResourceExhausted(err.Error()) - // } + reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod() + cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment()) + accountID := blobHeaderProto.GetPaymentHeader().GetAccountId() + + paymentHeader := core.PaymentMetadata{ + AccountID: accountID, + ReservationPeriod: reservationPeriod, + CumulativePayment: cumulativePayment, + } + + err = s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers) + if err != nil { + return api.NewErrorResourceExhausted(err.Error()) + } commitments, err := s.prover.GetCommitmentsForPaddedLength(data) if err != nil { diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 2659f0c87..5775211a0 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -129,6 +129,14 @@ func RunDisperserServer(ctx *cli.Context) error { logger, // metrics.NewNoopMetrics(), ) + + logger.Info("Enabled payment meterer", + "interval", config.UpdateInterval, + "chainReadTimeout", config.ChainReadTimeout, + "reservationsTable", config.ReservationsTableName, + "onDemandTable", config.OnDemandTableName, + "globalRateTable", config.GlobalRateTableName, + ) } var ratelimiter common.RateLimiter