Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metrics on meterer usage #1212

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,34 @@ func (m *Meterer) Start(ctx context.Context) {

// MeterRequest validates a blob header and adds it to the meterer's state
// TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare)
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) error {
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) (uint32, error) {
accountID := gethcommon.HexToAddress(header.AccountID)
symbolsCharged := m.SymbolsCharged(numSymbols)
m.logger.Info("Validating incoming request's payment metadata", "paymentMetadata", header, "numSymbols", numSymbols, "quorumNumbers", quorumNumbers)
// Validate against the payment method
if header.CumulativePayment.Sign() == 0 {
reservation, err := m.ChainPaymentState.GetReservedPaymentByAccount(ctx, accountID)
if err != nil {
return fmt.Errorf("failed to get active reservation by account: %w", err)
return 0, fmt.Errorf("failed to get active reservation by account: %w", err)
}
if err := m.ServeReservationRequest(ctx, header, reservation, numSymbols, quorumNumbers); err != nil {
return fmt.Errorf("invalid reservation: %w", err)
if err := m.ServeReservationRequest(ctx, header, reservation, symbolsCharged, quorumNumbers); err != nil {
return 0, fmt.Errorf("invalid reservation: %w", err)
}
} else {
onDemandPayment, err := m.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, accountID)
if err != nil {
return fmt.Errorf("failed to get on-demand payment by account: %w", err)
return 0, fmt.Errorf("failed to get on-demand payment by account: %w", err)
}
if err := m.ServeOnDemandRequest(ctx, header, onDemandPayment, numSymbols, quorumNumbers); err != nil {
return fmt.Errorf("invalid on-demand request: %w", err)
if err := m.ServeOnDemandRequest(ctx, header, onDemandPayment, symbolsCharged, quorumNumbers); err != nil {
return 0, fmt.Errorf("invalid on-demand request: %w", err)
}
}

return nil
return symbolsCharged, nil
}

// ServeReservationRequest handles the rate limiting logic for incoming requests
func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, numSymbols uint, quorumNumbers []uint8) error {
func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, symbolsCharged uint32, quorumNumbers []uint8) error {
m.logger.Info("Recording and validating reservation usage", "header", header, "reservation", reservation)
if !reservation.IsActive(uint64(time.Now().Unix())) {
return fmt.Errorf("reservation not active")
Expand All @@ -112,7 +113,7 @@ func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.Payme
}

// Update bin usage atomically and check against reservation's data rate as the bin limit
if err := m.IncrementBinUsage(ctx, header, reservation, numSymbols); err != nil {
if err := m.IncrementBinUsage(ctx, header, reservation, symbolsCharged); err != nil {
return fmt.Errorf("bin overflows: %w", err)
}

Expand Down Expand Up @@ -151,8 +152,7 @@ func (m *Meterer) ValidateReservationPeriod(header core.PaymentMetadata, reserva
}

// IncrementBinUsage increments the bin usage atomically and checks for overflow
func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, numSymbols uint) error {
symbolsCharged := m.SymbolsCharged(numSymbols)
func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, symbolsCharged uint32) error {
newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.ReservationPeriod), uint64(symbolsCharged))
if err != nil {
return fmt.Errorf("failed to increment bin usage: %w", err)
Expand Down Expand Up @@ -188,7 +188,7 @@ func GetReservationPeriod(timestamp uint64, binInterval uint32) uint32 {
// ServeOnDemandRequest handles the rate limiting logic for incoming requests
// On-demand requests doesn't have additional quorum settings and should only be
// allowed by ETH and EIGEN quorums
func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint, headerQuorums []uint8) error {
func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, symbolsCharged uint32, headerQuorums []uint8) error {
m.logger.Info("Recording and validating on-demand usage", "header", header, "onDemandPayment", onDemandPayment)
quorumNumbers, err := m.ChainPaymentState.GetOnDemandQuorumNumbers(ctx)
if err != nil {
Expand All @@ -198,14 +198,13 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM
if err := m.ValidateQuorum(headerQuorums, quorumNumbers); err != nil {
return fmt.Errorf("invalid quorum for On-Demand Request: %w", err)
}
// update blob header to use the miniumum chargeable size
symbolsCharged := m.SymbolsCharged(numSymbols)

err = m.OffchainStore.AddOnDemandPayment(ctx, header, symbolsCharged)
if err != nil {
return fmt.Errorf("failed to update cumulative payment: %w", err)
}
// Validate payments attached
err = m.ValidatePayment(ctx, header, onDemandPayment, numSymbols)
err = m.ValidatePayment(ctx, header, onDemandPayment, symbolsCharged)
if err != nil {
// No tolerance for incorrect payment amounts; no rollbacks
return fmt.Errorf("invalid on-demand payment: %w", err)
Expand All @@ -231,7 +230,7 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM
// prevPmt + PaymentMetadata.numSymbols * m.FixedFeePerByte
// <= PaymentMetadata.CumulativePayment
// <= nextPmt - nextPmtnumSymbols * m.FixedFeePerByte > nextPmt
func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint) error {
func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, symbolsCharged uint32) error {
if header.CumulativePayment.Cmp(onDemandPayment.CumulativePayment) > 0 {
return fmt.Errorf("request claims a cumulative payment greater than the on-chain deposit")
}
Expand All @@ -241,7 +240,7 @@ func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetada
return fmt.Errorf("failed to get relevant on-demand records: %w", err)
}
// the current request must increment cumulative payment by a magnitude sufficient to cover the blob size
if prevPmt.Add(prevPmt, m.PaymentCharged(numSymbols)).Cmp(header.CumulativePayment) > 0 {
if prevPmt.Add(prevPmt, m.PaymentCharged(uint(symbolsCharged))).Cmp(header.CumulativePayment) > 0 {
return fmt.Errorf("insufficient cumulative payment increment")
}
// the current request must not break the payment magnitude for the next payment if the two requests were delivered out-of-order
Expand All @@ -254,6 +253,7 @@ func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetada

// PaymentCharged returns the chargeable price for a given data length
func (m *Meterer) PaymentCharged(numSymbols uint) *big.Int {
// symbolsCharged == m.SymbolsCharged(numSymbols) if numSymbols is already a multiple of MinNumSymbols
symbolsCharged := big.NewInt(int64(m.SymbolsCharged(numSymbols)))
pricePerSymbol := big.NewInt(int64(m.ChainPaymentState.GetPricePerSymbol()))
return symbolsCharged.Mul(symbolsCharged, pricePerSymbol)
Expand Down
43 changes: 23 additions & 20 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,16 @@ func TestMetererReservations(t *testing.T) {

// test invalid quorom ID
header := createPaymentHeader(1, big.NewInt(0), accountID1)
err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "quorum number mismatch")

// overwhelming bin overflow for empty bins
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 10, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 10, quoromNumbers)
assert.NoError(t, err)
// overwhelming bin overflow for empty bins
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers)
assert.ErrorContains(t, err, "overflow usage exceeds bin limit")

// test non-existent account
Expand All @@ -217,41 +217,42 @@ func TestMetererReservations(t *testing.T) {
}
header = createPaymentHeader(1, big.NewInt(0), crypto.PubkeyToAddress(unregisteredUser.PublicKey))
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "failed to get active reservation by account: reservation not found")

// test inactive reservation
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID3)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0})
assert.ErrorContains(t, err, "reservation not active")

// test invalid reservation period
header = createPaymentHeader(reservationPeriod-3, big.NewInt(0), accountID1)
err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers)
assert.ErrorContains(t, err, "invalid reservation period for reservation")

// test bin usage metering
symbolLength := uint(20)
requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3
for i := 0; i < 9; i++ {
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quoromNumbers)
symbolsCharged, err := mt.MeterRequest(ctx, *header, symbolLength, quoromNumbers)
assert.NoError(t, err)
item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID2.Hex()},
"ReservationPeriod": &types.AttributeValueMemberN{Value: strconv.Itoa(int(reservationPeriod))},
})
assert.NoError(t, err)
assert.Equal(t, uint32(requiredLength), symbolsCharged)
assert.Equal(t, accountID2.Hex(), item["AccountID"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, strconv.Itoa(int(reservationPeriod)), item["ReservationPeriod"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, strconv.Itoa((i+1)*int(requiredLength)), item["BinUsage"].(*types.AttributeValueMemberN).Value)

}
// first over flow is allowed
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
symbolsCharged, err := mt.MeterRequest(ctx, *header, 25, quoromNumbers)
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 25, quoromNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(27), symbolsCharged)
overflowedReservationPeriod := reservationPeriod + 2
item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID2.Hex()},
Expand All @@ -266,7 +267,7 @@ func TestMetererReservations(t *testing.T) {
// second over flow
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 1, quoromNumbers)
assert.ErrorContains(t, err, "bin has already been filled")
}

Expand All @@ -293,17 +294,17 @@ func TestMetererOnDemand(t *testing.T) {
}
header := createPaymentHeader(reservationPeriod, big.NewInt(2), crypto.PubkeyToAddress(unregisteredUser.PublicKey))
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
assert.ErrorContains(t, err, "failed to get on-demand payment by account: payment not found")

// test invalid quorom ID
header = createPaymentHeader(reservationPeriod, big.NewInt(2), accountID1)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "invalid quorum for On-Demand Request")

// test insufficient cumulative payment
header = createPaymentHeader(reservationPeriod, big.NewInt(1), accountID1)
err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
assert.ErrorContains(t, err, "insufficient cumulative payment increment")
// No rollback after meter request
result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{
Expand All @@ -318,36 +319,38 @@ func TestMetererOnDemand(t *testing.T) {
priceCharged := mt.PaymentCharged(symbolLength)
assert.Equal(t, big.NewInt(int64(102*mt.ChainPaymentState.GetPricePerSymbol())), priceCharged)
header = createPaymentHeader(reservationPeriod, priceCharged, accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
symbolsCharged, err := mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(102), symbolsCharged)
header = createPaymentHeader(reservationPeriod, priceCharged, accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.ErrorContains(t, err, "exact payment already exists")

// test valid payments
for i := 1; i < 9; i++ {
header = createPaymentHeader(reservationPeriod, new(big.Int).Mul(priceCharged, big.NewInt(int64(i+1))), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
symbolsCharged, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(102), symbolsCharged)
}

// test cumulative payment on-chain constraint
header = createPaymentHeader(reservationPeriod, big.NewInt(2023), accountID2)
err = mt.MeterRequest(ctx, *header, 1, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: request claims a cumulative payment greater than the on-chain deposit")

// test insufficient increment in cumulative payment
previousCumulativePayment := priceCharged.Mul(priceCharged, big.NewInt(9))
symbolLength = uint(2)
priceCharged = mt.PaymentCharged(symbolLength)
header = createPaymentHeader(reservationPeriod, big.NewInt(0).Add(previousCumulativePayment, big.NewInt(0).Sub(priceCharged, big.NewInt(1))), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: insufficient cumulative payment increment")
previousCumulativePayment = big.NewInt(0).Add(previousCumulativePayment, priceCharged)

// test cannot insert cumulative payment in out of order
header = createPaymentHeader(reservationPeriod, mt.PaymentCharged(50), accountID2)
err = mt.MeterRequest(ctx, *header, 50, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 50, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants")

numPrevRecords := 12
Expand All @@ -359,7 +362,7 @@ func TestMetererOnDemand(t *testing.T) {
assert.Equal(t, numPrevRecords, len(result))
// test failed global rate limit (previously payment recorded: 2, global limit: 1009)
header = createPaymentHeader(reservationPeriod, big.NewInt(0).Add(previousCumulativePayment, mt.PaymentCharged(1010)), accountID1)
err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers)
assert.ErrorContains(t, err, "failed global rate limiting")
// Correct rollback
result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{
Expand Down
9 changes: 4 additions & 5 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
}
if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil {
if err := s.validateDispersalRequest(req, onchainState); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err))
}

Expand All @@ -40,9 +40,8 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
finishedValidation := time.Now()
s.metrics.reportValidateDispersalRequestLatency(finishedValidation.Sub(start))

s.metrics.reportDisperseBlobSize(len(req.GetBlob()))

blob := req.GetBlob()
s.metrics.reportDisperseBlobSize(len(blob))
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to parse the blob header proto: %v", err))
Expand Down Expand Up @@ -118,16 +117,16 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe
CumulativePayment: cumulativePayment,
}

err = s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
symbolsCharged, err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree that it would be a better idea to return the symbols charges as a return value from MeterRequest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did some minor related refactoring:)

return api.NewErrorResourceExhausted(err.Error())
}
s.metrics.reportDisperseMeteredBytes(int(symbolsCharged) * encoding.BYTES_PER_SYMBOL)

return nil
}

func (s *DispersalServerV2) validateDispersalRequest(
ctx context.Context,
req *pb.DisperseBlobRequest,
onchainState *OnchainState) error {

Expand Down
23 changes: 19 additions & 4 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type metricsV2 struct {
getBlobCommitmentLatency *prometheus.SummaryVec
getPaymentStateLatency *prometheus.SummaryVec
disperseBlobLatency *prometheus.SummaryVec
disperseBlobSize *prometheus.GaugeVec
disperseBlobSize *prometheus.CounterVec
disperseBlobMeteredBytes *prometheus.CounterVec
validateDispersalRequestLatency *prometheus.SummaryVec
storeBlobLatency *prometheus.SummaryVec
getBlobStatusLatency *prometheus.SummaryVec
Expand Down Expand Up @@ -79,15 +80,24 @@ func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperse
[]string{},
)

disperseBlobSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
disperseBlobSize := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "disperse_blob_size_bytes",
Help: "The size of the blob in bytes.",
},
[]string{},
)

disperseBlobMeteredBytes := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "disperse_blob_metered_bytes",
Help: "The number of bytes charged for the blob.",
},
[]string{},
)

validateDispersalRequestLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Expand Down Expand Up @@ -124,6 +134,7 @@ func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperse
getPaymentStateLatency: getPaymentStateLatency,
disperseBlobLatency: disperseBlobLatency,
disperseBlobSize: disperseBlobSize,
disperseBlobMeteredBytes: disperseBlobMeteredBytes,
validateDispersalRequestLatency: validateDispersalRequestLatency,
storeBlobLatency: storeBlobLatency,
getBlobStatusLatency: getBlobStatusLatency,
Expand Down Expand Up @@ -162,7 +173,11 @@ func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
}

func (m *metricsV2) reportDisperseBlobSize(size int) {
m.disperseBlobSize.WithLabelValues().Set(float64(size))
m.disperseBlobSize.WithLabelValues().Add(float64(size))
}

func (m *metricsV2) reportDisperseMeteredBytes(usageInBytes int) {
m.disperseBlobMeteredBytes.WithLabelValues().Add(float64(usageInBytes))
}

func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
Expand Down
Loading
Loading