diff --git a/services/horizon/internal/db2/history/accounts_batch_insert_builder.go b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go new file mode 100644 index 0000000000..5e68468094 --- /dev/null +++ b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go @@ -0,0 +1,39 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +// AccountsBatchInsertBuilder is used to insert accounts into the accounts table +type AccountsBatchInsertBuilder interface { + Add(account AccountEntry) error + Exec(ctx context.Context) error +} + +// AccountsBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder +type accountsBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +// NewAccountsBatchInsertBuilder constructs a new AccountsBatchInsertBuilder instance +func (q *Q) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder { + return &accountsBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "accounts", + } +} + +// Add adds a new account to the batch +func (i *accountsBatchInsertBuilder) Add(account AccountEntry) error { + return i.builder.RowStruct(account) +} + +// Exec writes the batch of accounts to the database. +func (i *accountsBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 7c05f2dfa6..ab2f69aa3c 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -312,6 +312,7 @@ type QAccounts interface { GetAccountsByIDs(ctx context.Context, ids []string) ([]AccountEntry, error) UpsertAccounts(ctx context.Context, accounts []AccountEntry) error RemoveAccounts(ctx context.Context, accountIDs []string) (int64, error) + NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder } // AccountSigner is a row of data from the `accounts_signers` table diff --git a/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go new file mode 100644 index 0000000000..a200a15e15 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockAccountsBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockAccountsBatchInsertBuilder) Add(account AccountEntry) error { + a := m.Called(account) + return a.Error(0) +} + +func (m *MockAccountsBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_accounts.go b/services/horizon/internal/db2/history/mock_q_accounts.go index 99f793d147..6d60802d17 100644 --- a/services/horizon/internal/db2/history/mock_q_accounts.go +++ b/services/horizon/internal/db2/history/mock_q_accounts.go @@ -25,3 +25,8 @@ func (m *MockQAccounts) RemoveAccounts(ctx context.Context, accountIDs []string) a := m.Called(ctx, accountIDs) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQAccounts) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder { + a := m.Called() + return a.Get(0).(AccountsBatchInsertBuilder) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 7dfea07193..7e7838cf43 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -27,17 +27,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q := &mockDBQ{} - q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{ - { - LastModifiedLedger: 1, - AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Balance: int64(1000000000000000000), - SequenceNumber: 0, - SequenceTime: zero.IntFrom(0), - MasterWeight: 1, - }, - }).Return(nil).Once() - batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true) defer mock.AssertExpectationsForObjects(t, batchBuilders...) @@ -49,6 +38,16 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { Sponsor: null.String{}, }).Return(nil).Once() + assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1]) + batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{ + LastModifiedLedger: 1, + AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + Balance: int64(1000000000000000000), + SequenceNumber: 0, + SequenceTime: zero.IntFrom(0), + MasterWeight: 1, + }).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -94,16 +93,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { nil, ).Once() - q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{ - { - LastModifiedLedger: 1, - AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Balance: int64(1000000000000000000), - SequenceNumber: 0, - MasterWeight: 1, - }, - }).Return(nil).Once() - batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true) defer mock.AssertExpectationsForObjects(t, batchBuilders...) @@ -114,6 +103,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { Weight: 1, }).Return(nil).Once() + assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1]) + batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{ + LastModifiedLedger: 1, + AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + Balance: int64(1000000000000000000), + SequenceNumber: 0, + MasterWeight: 1, + }).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -494,6 +492,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec q.MockQSigners.On("NewAccountSignersBatchInsertBuilder"). Return(mockAccountSignersBatchInsertBuilder).Twice() + mockAccountsBatchInsertBuilder := &history.MockAccountsBatchInsertBuilder{} + if mockExec { + mockAccountsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + } + q.MockQAccounts.On("NewAccountsBatchInsertBuilder"). + Return(mockAccountsBatchInsertBuilder).Twice() + mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{} if mockExec { mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx). @@ -531,6 +536,7 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec Return(mockTrustLinesBatchInsertBuilder) return []interface{}{mockAccountSignersBatchInsertBuilder, + mockAccountsBatchInsertBuilder, mockClaimableBalanceBatchInsertBuilder, mockClaimableBalanceClaimantBatchInsertBuilder, mockLiquidityPoolBatchInsertBuilder, diff --git a/services/horizon/internal/ingest/processors/accounts_processor.go b/services/horizon/internal/ingest/processors/accounts_processor.go index 8130175ab6..681b7d3847 100644 --- a/services/horizon/internal/ingest/processors/accounts_processor.go +++ b/services/horizon/internal/ingest/processors/accounts_processor.go @@ -13,7 +13,8 @@ import ( type AccountsProcessor struct { accountsQ history.QAccounts - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + batchInsertBuilder history.AccountsBatchInsertBuilder } func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor { @@ -24,6 +25,7 @@ func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor { func (p *AccountsProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder() } func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -41,13 +43,14 @@ func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Cha if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil } func (p *AccountsProcessor) Commit(ctx context.Context) error { + defer p.reset() + batchUpsertAccounts := []history.AccountEntry{} removeBatch := []string{} @@ -63,8 +66,15 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error { } switch { - case change.Post != nil: - // Created and updated + case change.Pre == nil && change.Post != nil: + // Created + row := p.ledgerEntryToRow(*change.Post) + err := p.batchInsertBuilder.Add(row) + if err != nil { + return errors.Wrap(err, "Error adding to AccountsBatchInsertBuilder") + } + case change.Pre != nil && change.Post != nil: + // Updated row := p.ledgerEntryToRow(*change.Post) batchUpsertAccounts = append(batchUpsertAccounts, row) case change.Pre != nil && change.Post == nil: @@ -77,6 +87,11 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "Error executing AccountsBatchInsertBuilder") + } + // Upsert accounts if len(batchUpsertAccounts) > 0 { err := p.accountsQ.UpsertAccounts(ctx, batchUpsertAccounts) diff --git a/services/horizon/internal/ingest/processors/accounts_processor_test.go b/services/horizon/internal/ingest/processors/accounts_processor_test.go index 9b9c98d7b5..26f6af3487 100644 --- a/services/horizon/internal/ingest/processors/accounts_processor_test.go +++ b/services/horizon/internal/ingest/processors/accounts_processor_test.go @@ -19,15 +19,20 @@ func TestAccountsProcessorTestSuiteState(t *testing.T) { type AccountsProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *AccountsProcessor - mockQ *history.MockQAccounts + ctx context.Context + processor *AccountsProcessor + mockQ *history.MockQAccounts + mockAccountsBatchInsertBuilder *history.MockAccountsBatchInsertBuilder } func (s *AccountsProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQAccounts{} + s.mockAccountsBatchInsertBuilder = &history.MockAccountsBatchInsertBuilder{} + s.mockQ.On("NewAccountsBatchInsertBuilder").Return(s.mockAccountsBatchInsertBuilder).Twice() + s.mockAccountsBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewAccountsProcessor(s.mockQ) } @@ -42,19 +47,14 @@ func (s *AccountsProcessorTestSuiteState) TestNoEntries() { func (s *AccountsProcessorTestSuiteState) TestCreatesAccounts() { // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", s.ctx, - []history.AccountEntry{ - { - LastModifiedLedger: 123, - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - MasterWeight: 1, - ThresholdLow: 1, - ThresholdMedium: 1, - ThresholdHigh: 1, - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + LastModifiedLedger: 123, + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + MasterWeight: 1, + ThresholdLow: 1, + ThresholdMedium: 1, + ThresholdHigh: 1, + }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeAccount, @@ -79,15 +79,20 @@ func TestAccountsProcessorTestSuiteLedger(t *testing.T) { type AccountsProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *AccountsProcessor - mockQ *history.MockQAccounts + ctx context.Context + processor *AccountsProcessor + mockQ *history.MockQAccounts + mockAccountsBatchInsertBuilder *history.MockAccountsBatchInsertBuilder } func (s *AccountsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQAccounts{} + s.mockAccountsBatchInsertBuilder = &history.MockAccountsBatchInsertBuilder{} + s.mockQ.On("NewAccountsBatchInsertBuilder").Return(s.mockAccountsBatchInsertBuilder).Twice() + s.mockAccountsBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewAccountsProcessor(s.mockQ) } @@ -146,21 +151,15 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - LastModifiedLedger: uint32(123), - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + LastModifiedLedger: uint32(123), + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestNewAccountUpgrade() { @@ -235,23 +234,17 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccountUpgrade() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - SequenceLedger: zero.IntFrom(2346), - SequenceTime: zero.IntFrom(1647265534), - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - LastModifiedLedger: uint32(123), - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SequenceLedger: zero.IntFrom(2346), + SequenceTime: zero.IntFrom(1647265534), + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + LastModifiedLedger: uint32(123), + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestRemoveAccount() { @@ -322,23 +315,17 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - LastModifiedLedger: uint32(lastModifiedLedgerSeq) + 1, - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - SequenceTime: zero.IntFrom(0), - SequenceLedger: zero.IntFrom(0), - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + LastModifiedLedger: uint32(lastModifiedLedgerSeq) + 1, + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SequenceTime: zero.IntFrom(0), + SequenceLedger: zero.IntFrom(0), + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() {