diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 560550706..7794d458e 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -20,8 +20,6 @@ type Config struct { // OnchainUpdateInterval is the interval for refreshing the on-chain state OnchainUpdateInterval time.Duration - // OffchainPruneInterval is the interval for pruning the off-chain state - OffchainPruneInterval time.Duration } // Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header @@ -70,23 +68,6 @@ func (m *Meterer) Start(ctx context.Context) { } } }() - go func() { - ticker := time.NewTicker(m.OffchainPruneInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - now := uint64(time.Now().Unix()) - reservationWindow := m.ChainPaymentState.GetReservationWindow() - if err := m.OffchainStore.DeleteOldPeriods(ctx, GetReservationPeriod(now, reservationWindow)-uint32(MinNumPeriods)); err != nil { - m.logger.Error("Failed to prune off-chain state", "error", err) - } - case <-ctx.Done(): - return - } - } - }() } // MeterRequest validates a blob header and adds it to the meterer's state diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 9f51492d0..8f1508700 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -112,7 +112,6 @@ func setup(_ *testing.M) { config := meterer.Config{ ChainReadTimeout: 1 * time.Second, OnchainUpdateInterval: 1 * time.Second, - OffchainPruneInterval: 1 * time.Second, } err = meterer.CreateReservationTable(clientConfig, reservationTableName) @@ -147,6 +146,7 @@ func setup(_ *testing.M) { ondemandTableName, globalReservationTableName, uint64(100), + uint64(100), logger, ) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index a0577c20a..5022f80ce 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -20,13 +20,13 @@ import ( const MinNumPeriods int32 = 3 type OffchainStore struct { - dynamoClient commondynamodb.Client - reservationTableName string - onDemandTableName string - globalBinTableName string - logger logging.Logger - // TODO: add maximum storage for both tables - MaxOnDemandStorage uint64 + dynamoClient commondynamodb.Client + reservationTableName string + onDemandTableName string + globalBinTableName string + logger logging.Logger + MaxOnDemandStorage uint64 + MaxReservationPeriods uint64 } func NewOffchainStore( @@ -35,6 +35,7 @@ func NewOffchainStore( onDemandTableName string, globalBinTableName string, maxOnDemandStorage uint64, + maxReservationPeriods uint64, logger logging.Logger, ) (OffchainStore, error) { @@ -47,23 +48,27 @@ func NewOffchainStore( if err != nil { return OffchainStore{}, err } + err = dynamoClient.TableExists(context.Background(), onDemandTableName) if err != nil { return OffchainStore{}, err } + err = dynamoClient.TableExists(context.Background(), globalBinTableName) if err != nil { return OffchainStore{}, err } + //TODO: add a separate thread to periodically clean up the tables // delete expired reservation periods (= int(s.MaxReservationPeriods) { + numToDelete := len(periods) - int(s.MaxReservationPeriods) + 1 + // Create keys for all reservation periods to delete (taking the smallest reservation periods) + keysToDelete := make([]commondynamodb.Key, numToDelete) + for i := 0; i < numToDelete; i++ { + keysToDelete[i] = commondynamodb.Key{ + "AccountID": periods[i]["AccountID"], + "ReservationPeriod": periods[i]["ReservationPeriod"], + } + } + + // Delete the items in batches + failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.onDemandTableName, keysToDelete) + if err != nil { + return fmt.Errorf("failed to delete oldest reservation periods: %w", err) + } + if len(failedKeys) > 0 { + return fmt.Errorf("failed to delete %d reservation periods", len(failedKeys)) + } } - items, err := s.dynamoClient.QueryWithInput(ctx, queryInput) + return nil +} + +// DeleteOldPeriods removes all reservation bin entries with indices strictly less than the provided reservationPeriod +func (s *OffchainStore) PruneGlobalPeriods(ctx context.Context) error { + // First, get total count of entries + queryInput := &dynamodb.QueryInput{ + TableName: aws.String(s.globalBinTableName), + ScanIndexForward: aws.Bool(true), + } + + allItems, err := s.dynamoClient.QueryWithInput(ctx, queryInput) if err != nil { - return fmt.Errorf("failed to query old periods: %w", err) + return fmt.Errorf("failed to query periods: %w", err) } - keys := make([]commondynamodb.Key, len(items)) - for i, item := range items { + totalCount := len(allItems) + if totalCount <= int(s.MaxReservationPeriods) { + return nil + } + + numToDelete := totalCount - int(s.MaxReservationPeriods) + + keys := make([]commondynamodb.Key, numToDelete) + for i := 0; i < numToDelete; i++ { keys[i] = commondynamodb.Key{ - "AccountID": item["AccountID"], - "ReservationPeriod": item["ReservationPeriod"], + "ReservationPeriod": allItems[i]["ReservationPeriod"], } } // Delete the items in batches if len(keys) > 0 { - failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.reservationTableName, keys) + failedKeys, err := s.dynamoClient.DeleteItems(ctx, s.globalBinTableName, keys) if err != nil { return fmt.Errorf("failed to delete old periods: %w", err) } diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 53d876d2f..2ff572a19 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -792,6 +792,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal table_names[1], table_names[2], uint64(100), + uint64(100), logger, ) if err != nil { @@ -801,7 +802,6 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal mt := meterer.NewMeterer(meterer.Config{ ChainReadTimeout: 1 * time.Second, OnchainUpdateInterval: 1 * time.Second, - OffchainPruneInterval: 1 * time.Second, }, mockState, store, logger) err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background()) if err != nil { diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index 6deb20abc..2261015cb 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -57,7 +57,6 @@ type DispersalServerV2 struct { onchainState atomic.Pointer[OnchainState] maxNumSymbolsPerBlob uint64 onchainStateRefreshInterval time.Duration - OffchainPruneInterval time.Duration metrics *metricsV2 } @@ -73,7 +72,6 @@ func NewDispersalServerV2( prover encoding.Prover, maxNumSymbolsPerBlob uint64, onchainStateRefreshInterval time.Duration, - OffchainPruneInterval time.Duration, _logger logging.Logger, registry *prometheus.Registry, ) (*DispersalServerV2, error) { @@ -117,7 +115,6 @@ func NewDispersalServerV2( maxNumSymbolsPerBlob: maxNumSymbolsPerBlob, onchainStateRefreshInterval: onchainStateRefreshInterval, - OffchainPruneInterval: OffchainPruneInterval, metrics: newAPIServerV2Metrics(registry), }, nil diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 13276126b..872526e66 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -483,6 +483,7 @@ func newTestServerV2(t *testing.T) *testComponents { table_names[1], table_names[2], uint64(100), + uint64(100), logger, ) if err != nil { @@ -492,7 +493,6 @@ func newTestServerV2(t *testing.T) *testComponents { meterer := meterer.NewMeterer(meterer.Config{ ChainReadTimeout: 1 * time.Second, OnchainUpdateInterval: 1 * time.Second, - OffchainPruneInterval: 1 * time.Second, }, mockState, store, logger) chainReader.On("GetCurrentBlockNumber").Return(uint32(100), nil) @@ -521,7 +521,6 @@ func newTestServerV2(t *testing.T) *testComponents { prover, 10, time.Hour, - time.Hour, logger, prometheus.NewRegistry()) assert.NoError(t, err) diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index 72ab68402..0396d3679 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -37,6 +37,7 @@ type Config struct { EnablePaymentMeterer bool OnchainUpdateInterval int OffchainMaxOnDemandStorage int + OffchainMaxReservedPeriods int ChainReadTimeout int ReservationsTableName string OnDemandTableName string @@ -47,7 +48,6 @@ type Config struct { MaxBlobSize int MaxNumSymbolsPerBlob uint OnchainStateRefreshInterval time.Duration - OffchainPruneInterval time.Duration BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string @@ -127,8 +127,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), MaxNumSymbolsPerBlob: ctx.GlobalUint(flags.MaxNumSymbolsPerBlob.Name), OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshInterval.Name), - OffchainPruneInterval: ctx.GlobalDuration(flags.OffchainPruneInterval.Name), OffchainMaxOnDemandStorage: ctx.GlobalInt(flags.OffchainMaxOnDemandStorage.Name), + OffchainMaxReservedPeriods: ctx.GlobalInt(flags.OffchainMaxReservedPeriods.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index ad20b7c7d..03b1d8681 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -101,7 +101,6 @@ func RunDisperserServer(ctx *cli.Context) error { mtConfig := mt.Config{ ChainReadTimeout: time.Duration(config.ChainReadTimeout) * time.Second, OnchainUpdateInterval: time.Duration(config.OnchainUpdateInterval) * time.Second, - OffchainPruneInterval: time.Duration(config.OffchainPruneInterval) * time.Second, } paymentChainState, err := mt.NewOnchainPaymentState(context.Background(), transactor) @@ -118,6 +117,7 @@ func RunDisperserServer(ctx *cli.Context) error { config.OnDemandTableName, config.GlobalRateTableName, uint64(config.OffchainMaxOnDemandStorage), + uint64(config.OffchainMaxReservedPeriods), logger, ) if err != nil { @@ -182,7 +182,6 @@ func RunDisperserServer(ctx *cli.Context) error { prover, uint64(config.MaxNumSymbolsPerBlob), config.OnchainStateRefreshInterval, - config.OffchainPruneInterval, logger, reg, ) diff --git a/test/integration_test.go b/test/integration_test.go index 6d79456a5..60a68735e 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -284,6 +284,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser table_names[1], table_names[2], uint64(100), + uint64(100), logger, ) if err != nil { @@ -298,7 +299,6 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser mt := meterer.NewMeterer(meterer.Config{ ChainReadTimeout: 1 * time.Second, OnchainUpdateInterval: 1 * time.Second, - OffchainPruneInterval: 1 * time.Second, }, mockState, offchainStore, logger) server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, grpcprom.NewServerMetrics(), mt, ratelimiter, rateConfig, testMaxBlobSize)