Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use COPY for inserting into accounts table #5115

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/accounts_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-39 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// AccountsBatchInsertBuilder is used to insert accounts into the accounts table
type AccountsBatchInsertBuilder interface {
Add(account AccountEntry) error
Exec(ctx context.Context) error
}

// AccountsBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type accountsBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewAccountsBatchInsertBuilder constructs a new AccountsBatchInsertBuilder instance
func (q *Q) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
return &accountsBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts",
}
}

// Add adds a new account to the batch
func (i *accountsBatchInsertBuilder) Add(account AccountEntry) error {
return i.builder.RowStruct(account)
}

// Exec writes the batch of accounts to the database.
func (i *accountsBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ type QAccounts interface {
GetAccountsByIDs(ctx context.Context, ids []string) ([]AccountEntry, error)
UpsertAccounts(ctx context.Context, accounts []AccountEntry) error
RemoveAccounts(ctx context.Context, accountIDs []string) (int64, error)
NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder
}

// AccountSigner is a row of data from the `accounts_signers` table
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockAccountsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountsBatchInsertBuilder) Add(account AccountEntry) error {
a := m.Called(account)
return a.Error(0)
}

func (m *MockAccountsBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ func (m *MockQAccounts) RemoveAccounts(ctx context.Context, accountIDs []string)
a := m.Called(ctx, accountIDs)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQAccounts) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountsBatchInsertBuilder)
}
48 changes: 27 additions & 21 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

q := &mockDBQ{}

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -49,6 +38,16 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
Sponsor: null.String{},
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -94,16 +93,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
nil,
).Once()

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -114,6 +103,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
Weight: 1,
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -494,6 +492,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockAccountsBatchInsertBuilder := &history.MockAccountsBatchInsertBuilder{}
if mockExec {
mockAccountsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQAccounts.On("NewAccountsBatchInsertBuilder").
Return(mockAccountsBatchInsertBuilder).Twice()

mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{}
if mockExec {
mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx).
Expand Down Expand Up @@ -531,6 +536,7 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
Return(mockTrustLinesBatchInsertBuilder)

return []interface{}{mockAccountSignersBatchInsertBuilder,
mockAccountsBatchInsertBuilder,
mockClaimableBalanceBatchInsertBuilder,
mockClaimableBalanceClaimantBatchInsertBuilder,
mockLiquidityPoolBatchInsertBuilder,
Expand Down
23 changes: 19 additions & 4 deletions services/horizon/internal/ingest/processors/accounts_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
type AccountsProcessor struct {
accountsQ history.QAccounts

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
batchInsertBuilder history.AccountsBatchInsertBuilder
}

func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {
Expand All @@ -24,6 +25,7 @@ func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {

func (p *AccountsProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder()
}

func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -41,13 +43,14 @@ func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Cha
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
}

func (p *AccountsProcessor) Commit(ctx context.Context) error {
defer p.reset()

batchUpsertAccounts := []history.AccountEntry{}
removeBatch := []string{}

Expand All @@ -63,8 +66,15 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error {
}

switch {
case change.Post != nil:
// Created and updated
case change.Pre == nil && change.Post != nil:
// Created
row := p.ledgerEntryToRow(*change.Post)
err := p.batchInsertBuilder.Add(row)
if err != nil {
return errors.Wrap(err, "Error adding to AccountsBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
row := p.ledgerEntryToRow(*change.Post)
batchUpsertAccounts = append(batchUpsertAccounts, row)
case change.Pre != nil && change.Post == nil:
Expand All @@ -77,6 +87,11 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "Error executing AccountsBatchInsertBuilder")
}

// Upsert accounts
if len(batchUpsertAccounts) > 0 {
err := p.accountsQ.UpsertAccounts(ctx, batchUpsertAccounts)
Expand Down
Loading
Loading