Skip to content

Commit

Permalink
make batch size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jul 26, 2024
1 parent 7ae9b3f commit d1f33c6
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 14 deletions.
8 changes: 3 additions & 5 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context) (map[string]LookupTableReapResult, error)
ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -981,7 +981,7 @@ type LookupTableReapResult struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q *Q) ReapLookupTables(ctx context.Context) (
func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) (
map[string]LookupTableReapResult,
error,
) {
Expand All @@ -994,8 +994,6 @@ func (q *Q) ReapLookupTables(ctx context.Context) (
return nil, fmt.Errorf("could not obtain offsets: %w", err)
}

const batchSize = 1000

results := map[string]LookupTableReapResult{}
for table, historyTables := range historyLookupTables {
startTime := time.Now()
Expand Down Expand Up @@ -1127,7 +1125,7 @@ var historyLookupTables = map[string][]tableObjectFieldPair{
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) string {
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string {
var conditions []string

for _, historyTable := range historyTables {
Expand Down
70 changes: 65 additions & 5 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

results, err := q.ReapLookupTables(tt.Ctx)
results, err := q.ReapLookupTables(tt.Ctx, 5)
tt.Require.NoError(err)

err = q.Commit()
Expand All @@ -76,12 +76,12 @@ func TestReapLookupTables(t *testing.T) {
tt.Assert.Equal(1, curLedgers, "curLedgers")

tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)
tt.Assert.Equal(21, curAccounts, "curAccounts")
tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)

tt.Assert.Equal(7, prevAssets, "prevAssets")
tt.Assert.Equal(0, curAssets, "curAssets")
tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)
tt.Assert.Equal(2, curAssets, "curAssets")
tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)

tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
Expand All @@ -91,6 +91,66 @@ func TestReapLookupTables(t *testing.T) {
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")
tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(results, 4)
tt.Assert.Equal(int64(6), results["history_accounts"].Offset)
tt.Assert.Equal(int64(6), results["history_assets"].Offset)
tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset)
tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset)

err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

results, err = q.ReapLookupTables(tt.Ctx, 5)
tt.Require.NoError(err)

err = q.Commit()
tt.Require.NoError(err)

err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`)
tt.Require.NoError(err)

tt.Assert.Equal(16, curAccounts, "curAccounts")
tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)

tt.Assert.Equal(0, curAssets, "curAssets")
tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)

tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(results, 4)
tt.Assert.Equal(int64(11), results["history_accounts"].Offset)
tt.Assert.Equal(int64(0), results["history_assets"].Offset)
tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset)
tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset)

err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

results, err = q.ReapLookupTables(tt.Ctx, 1000)
tt.Require.NoError(err)

err = q.Commit()
tt.Require.NoError(err)

err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`)
tt.Require.NoError(err)

tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)

tt.Assert.Equal(0, curAssets, "curAssets")
tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)

tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(results, 4)
tt.Assert.Equal(int64(0), results["history_accounts"].Offset)
tt.Assert.Equal(int64(0), results["history_assets"].Offset)
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ const (
// * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock)
MaxDBConnections = 5

defaultCoreCursorName = "HORIZON"
stateVerificationErrorThreshold = 3

// 100 ledgers per flush has shown in stress tests
// to be best point on performance curve, default to that.
MaxLedgersPerFlush uint32 = 100

reapLookupTablesBatchSize = 1000
)

var log = logpkg.DefaultLogger.WithField("service", "ingest")
Expand Down Expand Up @@ -841,7 +842,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
results, err := s.historyQ.ReapLookupTables(ctx)
results, err := s.historyQ.ReapLookupTables(ctx, reapLookupTablesBatchSize)
if err != nil {
log.WithError(err).Warn("Error reaping lookup tables")
return
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
return args.Get(0).(history.TradeBatchInsertBuilder)
}

func (m *mockDBQ) ReapLookupTables(ctx context.Context) (map[string]history.LookupTableReapResult, error) {
args := m.Called(ctx)
func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) {
args := m.Called(ctx, batchSize)
var r1 map[string]history.LookupTableReapResult
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]history.LookupTableReapResult)
Expand Down

0 comments on commit d1f33c6

Please sign in to comment.