Skip to content

Commit

Permalink
refactor: pruning based on max storage size
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jan 6, 2025
1 parent c461dfb commit 059e42b
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 57 deletions.
19 changes: 0 additions & 19 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ type Config struct {

// OnchainUpdateInterval is the interval for refreshing the on-chain state
OnchainUpdateInterval time.Duration
// OffchainPruneInterval is the interval for pruning the off-chain state
OffchainPruneInterval time.Duration
}

// Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header
Expand Down Expand Up @@ -70,23 +68,6 @@ func (m *Meterer) Start(ctx context.Context) {
}
}
}()
go func() {
ticker := time.NewTicker(m.OffchainPruneInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
now := uint64(time.Now().Unix())
reservationWindow := m.ChainPaymentState.GetReservationWindow()
if err := m.OffchainStore.DeleteOldPeriods(ctx, GetReservationPeriod(now, reservationWindow)-uint32(MinNumPeriods)); err != nil {
m.logger.Error("Failed to prune off-chain state", "error", err)
}
case <-ctx.Done():
return
}
}
}()
}

// MeterRequest validates a blob header and adds it to the meterer's state
Expand Down
2 changes: 1 addition & 1 deletion core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func setup(_ *testing.M) {
config := meterer.Config{
ChainReadTimeout: 1 * time.Second,
OnchainUpdateInterval: 1 * time.Second,
OffchainPruneInterval: 1 * time.Second,
}

err = meterer.CreateReservationTable(clientConfig, reservationTableName)
Expand Down Expand Up @@ -147,6 +146,7 @@ func setup(_ *testing.M) {
ondemandTableName,
globalReservationTableName,
uint64(100),
uint64(100),
logger,
)

Expand Down
109 changes: 83 additions & 26 deletions core/meterer/offchain_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
const MinNumPeriods int32 = 3

type OffchainStore struct {
dynamoClient commondynamodb.Client
reservationTableName string
onDemandTableName string
globalBinTableName string
logger logging.Logger
// TODO: add maximum storage for both tables
MaxOnDemandStorage uint64
dynamoClient commondynamodb.Client
reservationTableName string
onDemandTableName string
globalBinTableName string
logger logging.Logger
MaxOnDemandStorage uint64
MaxReservationPeriods uint64
}

func NewOffchainStore(
Expand All @@ -35,6 +35,7 @@ func NewOffchainStore(
onDemandTableName string,
globalBinTableName string,
maxOnDemandStorage uint64,
maxReservationPeriods uint64,
logger logging.Logger,
) (OffchainStore, error) {

Expand All @@ -47,23 +48,27 @@ func NewOffchainStore(
if err != nil {
return OffchainStore{}, err
}

err = dynamoClient.TableExists(context.Background(), onDemandTableName)
if err != nil {
return OffchainStore{}, err
}

err = dynamoClient.TableExists(context.Background(), globalBinTableName)
if err != nil {
return OffchainStore{}, err
}

//TODO: add a separate thread to periodically clean up the tables
// delete expired reservation periods (<i-1) and old on-demand payments (retain max N payments)
return OffchainStore{
dynamoClient: dynamoClient,
reservationTableName: reservationTableName,
onDemandTableName: onDemandTableName,
globalBinTableName: globalBinTableName,
logger: logger,
MaxOnDemandStorage: maxOnDemandStorage,
dynamoClient: dynamoClient,
reservationTableName: reservationTableName,
onDemandTableName: onDemandTableName,
globalBinTableName: globalBinTableName,
logger: logger,
MaxOnDemandStorage: maxOnDemandStorage,
MaxReservationPeriods: maxReservationPeriods,
}, nil
}

Expand Down Expand Up @@ -93,6 +98,11 @@ func (s *OffchainStore) UpdateReservationPeriod(ctx context.Context, accountID s
return 0, fmt.Errorf("failed to parse PeriodUsage: %w", err)
}

if err := s.PruneReservationPeriods(ctx, accountID); err != nil {
// Don't fail the request if pruning fails, just log a warning
s.logger.Warn("failed to prune reservation periods", "accountID", accountID, "error", err)
}

return periodUsageValue, nil
}

Expand Down Expand Up @@ -121,6 +131,11 @@ func (s *OffchainStore) UpdateGlobalPeriod(ctx context.Context, reservationPerio
return 0, err
}

if err := s.PruneGlobalPeriods(ctx); err != nil {
// Don't fail the request if pruning fails, just log a warning
s.logger.Warn("failed to prune global periods", "error", err)
}

return periodUsageValue, nil
}

Expand Down Expand Up @@ -354,33 +369,75 @@ func (s *OffchainStore) GetLargestCumulativePayment(ctx context.Context, account
return payment, nil
}

// DeleteOldPeriods removes all reservation bin entries with indices strictly less than the provided reservationPeriod
func (s *OffchainStore) DeleteOldPeriods(ctx context.Context, reservationPeriod uint32) error {
// get all keys that need to be deleted
func (s *OffchainStore) PruneReservationPeriods(ctx context.Context, accountID string) error {
queryInput := &dynamodb.QueryInput{
TableName: aws.String(s.reservationTableName),
FilterExpression: aws.String("ReservationPeriod < :reservationPeriod"),
TableName: aws.String(s.reservationTableName),
KeyConditionExpression: aws.String("AccountID = :account"),
ExpressionAttributeValues: commondynamodb.ExpressionValues{
":reservationPeriod": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(reservationPeriod), 10)},
":account": &types.AttributeValueMemberS{Value: accountID},
},
ScanIndexForward: aws.Bool(true), // ascending order
}

periods, err := s.dynamoClient.QueryWithInput(ctx, queryInput)
if err != nil {
return fmt.Errorf("failed to query existing reservation periods: %w", err)
}

if len(periods) >= int(s.MaxReservationPeriods) {
numToDelete := len(periods) - int(s.MaxReservationPeriods) + 1
// Create keys for all reservation periods to delete (taking the smallest reservation periods)
keysToDelete := make([]commondynamodb.Key, numToDelete)
for i := 0; i < numToDelete; i++ {
keysToDelete[i] = commondynamodb.Key{
"AccountID": periods[i]["AccountID"],
"ReservationPeriod": periods[i]["ReservationPeriod"],
}
}

// Delete the items in batches
failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.onDemandTableName, keysToDelete)
if err != nil {
return fmt.Errorf("failed to delete oldest reservation periods: %w", err)
}
if len(failedKeys) > 0 {
return fmt.Errorf("failed to delete %d reservation periods", len(failedKeys))
}
}

items, err := s.dynamoClient.QueryWithInput(ctx, queryInput)
return nil
}

// DeleteOldPeriods removes all reservation bin entries with indices strictly less than the provided reservationPeriod
func (s *OffchainStore) PruneGlobalPeriods(ctx context.Context) error {
// First, get total count of entries
queryInput := &dynamodb.QueryInput{
TableName: aws.String(s.globalBinTableName),
ScanIndexForward: aws.Bool(true),
}

allItems, err := s.dynamoClient.QueryWithInput(ctx, queryInput)
if err != nil {
return fmt.Errorf("failed to query old periods: %w", err)
return fmt.Errorf("failed to query periods: %w", err)
}

keys := make([]commondynamodb.Key, len(items))
for i, item := range items {
totalCount := len(allItems)
if totalCount <= int(s.MaxReservationPeriods) {
return nil
}

numToDelete := totalCount - int(s.MaxReservationPeriods)

keys := make([]commondynamodb.Key, numToDelete)
for i := 0; i < numToDelete; i++ {
keys[i] = commondynamodb.Key{
"AccountID": item["AccountID"],
"ReservationPeriod": item["ReservationPeriod"],
"ReservationPeriod": allItems[i]["ReservationPeriod"],
}
}

// Delete the items in batches
if len(keys) > 0 {
failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.reservationTableName, keys)
failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.globalBinTableName, keys)
if err != nil {
return fmt.Errorf("failed to delete old periods: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal
table_names[1],
table_names[2],
uint64(100),
uint64(100),
logger,
)
if err != nil {
Expand All @@ -801,7 +802,6 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal
mt := meterer.NewMeterer(meterer.Config{
ChainReadTimeout: 1 * time.Second,
OnchainUpdateInterval: 1 * time.Second,
OffchainPruneInterval: 1 * time.Second,
}, mockState, store, logger)
err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background())
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type DispersalServerV2 struct {
onchainState atomic.Pointer[OnchainState]
maxNumSymbolsPerBlob uint64
onchainStateRefreshInterval time.Duration
OffchainPruneInterval time.Duration

metrics *metricsV2
}
Expand All @@ -73,7 +72,6 @@ func NewDispersalServerV2(
prover encoding.Prover,
maxNumSymbolsPerBlob uint64,
onchainStateRefreshInterval time.Duration,
OffchainPruneInterval time.Duration,
_logger logging.Logger,
registry *prometheus.Registry,
) (*DispersalServerV2, error) {
Expand Down Expand Up @@ -117,7 +115,6 @@ func NewDispersalServerV2(

maxNumSymbolsPerBlob: maxNumSymbolsPerBlob,
onchainStateRefreshInterval: onchainStateRefreshInterval,
OffchainPruneInterval: OffchainPruneInterval,

metrics: newAPIServerV2Metrics(registry),
}, nil
Expand Down
3 changes: 1 addition & 2 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func newTestServerV2(t *testing.T) *testComponents {
table_names[1],
table_names[2],
uint64(100),
uint64(100),
logger,
)
if err != nil {
Expand All @@ -492,7 +493,6 @@ func newTestServerV2(t *testing.T) *testComponents {
meterer := meterer.NewMeterer(meterer.Config{
ChainReadTimeout: 1 * time.Second,
OnchainUpdateInterval: 1 * time.Second,
OffchainPruneInterval: 1 * time.Second,
}, mockState, store, logger)

chainReader.On("GetCurrentBlockNumber").Return(uint32(100), nil)
Expand Down Expand Up @@ -521,7 +521,6 @@ func newTestServerV2(t *testing.T) *testComponents {
prover,
10,
time.Hour,
time.Hour,
logger,
prometheus.NewRegistry())
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
EnablePaymentMeterer bool
OnchainUpdateInterval int
OffchainMaxOnDemandStorage int
OffchainMaxReservedPeriods int
ChainReadTimeout int
ReservationsTableName string
OnDemandTableName string
Expand All @@ -47,7 +48,6 @@ type Config struct {
MaxBlobSize int
MaxNumSymbolsPerBlob uint
OnchainStateRefreshInterval time.Duration
OffchainPruneInterval time.Duration

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand Down Expand Up @@ -127,8 +127,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name),
MaxNumSymbolsPerBlob: ctx.GlobalUint(flags.MaxNumSymbolsPerBlob.Name),
OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshInterval.Name),
OffchainPruneInterval: ctx.GlobalDuration(flags.OffchainPruneInterval.Name),
OffchainMaxOnDemandStorage: ctx.GlobalInt(flags.OffchainMaxOnDemandStorage.Name),
OffchainMaxReservedPeriods: ctx.GlobalInt(flags.OffchainMaxReservedPeriods.Name),

Check failure on line 131 in disperser/cmd/apiserver/config.go

View workflow job for this annotation

GitHub Actions / Linter

undefined: flags.OffchainMaxReservedPeriods (typecheck)

Check failure on line 131 in disperser/cmd/apiserver/config.go

View workflow job for this annotation

GitHub Actions / Linter

undefined: flags.OffchainMaxReservedPeriods (typecheck)

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
Expand Down
3 changes: 1 addition & 2 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func RunDisperserServer(ctx *cli.Context) error {
mtConfig := mt.Config{
ChainReadTimeout: time.Duration(config.ChainReadTimeout) * time.Second,
OnchainUpdateInterval: time.Duration(config.OnchainUpdateInterval) * time.Second,
OffchainPruneInterval: time.Duration(config.OffchainPruneInterval) * time.Second,
}

paymentChainState, err := mt.NewOnchainPaymentState(context.Background(), transactor)
Expand All @@ -118,6 +117,7 @@ func RunDisperserServer(ctx *cli.Context) error {
config.OnDemandTableName,
config.GlobalRateTableName,
uint64(config.OffchainMaxOnDemandStorage),
uint64(config.OffchainMaxReservedPeriods),
logger,
)
if err != nil {
Expand Down Expand Up @@ -182,7 +182,6 @@ func RunDisperserServer(ctx *cli.Context) error {
prover,
uint64(config.MaxNumSymbolsPerBlob),
config.OnchainStateRefreshInterval,
config.OffchainPruneInterval,
logger,
reg,
)
Expand Down
2 changes: 1 addition & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
table_names[1],
table_names[2],
uint64(100),
uint64(100),
logger,
)
if err != nil {
Expand All @@ -298,7 +299,6 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
mt := meterer.NewMeterer(meterer.Config{
ChainReadTimeout: 1 * time.Second,
OnchainUpdateInterval: 1 * time.Second,
OffchainPruneInterval: 1 * time.Second,
}, mockState, offchainStore, logger)
server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, grpcprom.NewServerMetrics(), mt, ratelimiter, rateConfig, testMaxBlobSize)

Expand Down

0 comments on commit 059e42b

Please sign in to comment.