diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e7e7e90854..9e15920609 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -1,6 +1,7 @@ package history import ( + "cmp" "context" "database/sql/driver" "fmt" @@ -12,37 +13,29 @@ import ( "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) +var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") + +// LoaderStats describes the result of executing a history lookup id Loader +type LoaderStats struct { + // Total is the number of elements registered to the Loader + Total int + // Inserted is the number of elements inserted into the lookup table + Inserted int +} + // FutureAccountID represents a future history account. // A FutureAccountID is created by an AccountLoader and // the account id is available after calling Exec() on // the AccountLoader. -type FutureAccountID struct { - address string - loader *AccountLoader -} - -const loaderLookupBatchSize = 50000 - -// Value implements the database/sql/driver Valuer interface. -func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.GetNow(a.address) -} +type FutureAccountID = future[string, Account] // AccountLoader will map account addresses to their history // account ids. If there is no existing mapping for a given address, // the AccountLoader will insert into the history_accounts table to // establish a mapping. -type AccountLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} - -var errSealed = errors.New("cannot register more entries to loader after calling Exec()") +type AccountLoader = loader[string, Account] // NewAccountLoader will construct a new AccountLoader instance. func NewAccountLoader() *AccountLoader { @@ -51,141 +44,222 @@ func NewAccountLoader() *AccountLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, + name: "AccountLoader", + table: "history_accounts", + columnsForKeys: func(addresses []string) []columnValues { + return []columnValues{ + { + name: "address", + dbType: "character varying(64)", + objects: addresses, + }, + } + }, + mappingFromRow: func(account Account) (string, int64) { + return account.Address, account.ID + }, + less: cmp.Less[string], } } -// GetFuture registers the given account address into the loader and -// returns a FutureAccountID which will hold the history account id for -// the address after Exec() is called. -func (a *AccountLoader) GetFuture(address string) FutureAccountID { - if a.sealed { +type loader[K comparable, T any] struct { + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool +} + +type future[K comparable, T any] struct { + key K + loader *loader[K, T] +} + +// Value implements the database/sql/driver Valuer interface. +func (f future[K, T]) Value() (driver.Value, error) { + return f.loader.GetNow(f.key) +} + +// GetFuture registers the given key into the Loader and +// returns a future which will hold the history id for +// the key after Exec() is called. +func (l *loader[K, T]) GetFuture(key K) future[K, T] { + if l.sealed { panic(errSealed) } - a.set.Add(address) - return FutureAccountID{ - address: address, - loader: a, + l.set.Add(key) + return future[K, T]{ + key: key, + loader: l, } } -// GetNow returns the history account id for the given address. +// GetNow returns the history id for the given key. // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) GetNow(address string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid account loader state, - Exec was not called yet to properly seal and resolve %v id`, address) +func (l *loader[K, T]) GetNow(key K) (int64, error) { + if !l.sealed { + return 0, fmt.Errorf(`invalid loader state, + Exec was not called yet to properly seal and resolve %v id`, key) } - if internalID, ok := a.ids[address]; !ok { - return 0, fmt.Errorf(`account loader address %q was not found`, address) + if internalID, ok := l.ids[key]; !ok { + return 0, fmt.Errorf(`loader key %v was not found`, key) } else { return internalID, nil } } -func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error { - for i := 0; i < len(addresses); i += loaderLookupBatchSize { - end := ordered.Min(len(addresses), i+loaderLookupBatchSize) +// Exec will look up all the history ids for the keys registered in the Loader. +// If there are no history ids for a given set of keys, Exec will insert rows +// into the corresponding history table to establish a mapping between each key and its history id. +func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { + l.sealed = true + if len(l.set) == 0 { + return nil + } + q := &Q{session} + keys := make([]K, 0, len(l.set)) + for key := range l.set { + keys = append(keys, key) + } + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return l.less(keys[i], keys[j]) + }) - var accounts []Account - if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil { - return errors.Wrap(err, "could not select accounts") - } + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } - for _, account := range accounts { - a.ids[account.Address] = account.ID - } + if count, err := l.query(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count } + return nil } -// LoaderStats describes the result of executing a history lookup id loader -type LoaderStats struct { - // Total is the number of elements registered to the loader - Total int - // Inserted is the number of elements inserted into the lookup table - Inserted int +// Stats returns the number of addresses registered in the Loader and the number of rows +// inserted into the history table. +func (l *loader[K, T]) Stats() LoaderStats { + return l.stats } -// Exec will look up all the history account ids for the addresses registered in the loader. -// If there are no history account ids for a given set of addresses, Exec will insert rows -// into the history_accounts table to establish a mapping between address and history account id. -func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - addresses := make([]string, 0, len(a.set)) - for address := range a.set { - addresses = append(addresses, address) - } +func (l *loader[K, T]) Name() string { + return l.name +} - if err := a.lookupKeys(ctx, q, addresses); err != nil { - return err +func (l *loader[K, T]) filter(keys []K) []K { + if len(l.ids) == 0 { + return keys } - a.stats.Total += len(addresses) - insert := 0 - for _, address := range addresses { - if _, ok := a.ids[address]; ok { + remaining := make([]K, 0, len(keys)) + for _, key := range keys { + if _, ok := l.ids[key]; ok { continue } - addresses[insert] = address - insert++ + remaining = append(remaining, key) } - if insert == 0 { - return nil + return remaining +} + +func (l *loader[K, T]) updateMap(rows []T) { + for _, row := range rows { + key, id := l.mappingFromRow(row) + l.ids[key] = id + } +} + +func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil } - addresses = addresses[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(addresses) + var rows []T err := bulkInsert( ctx, q, - "history_accounts", - []string{"address"}, - []bulkInsertField{ - { - name: "address", - dbType: "character varying(64)", - objects: addresses, - }, - }, + l.table, + l.columnsForKeys(keys), + &rows, ) if err != nil { - return err + return 0, err } - a.stats.Inserted += insert - return a.lookupKeys(ctx, q, addresses) + l.updateMap(rows) + return len(rows), nil } -// Stats returns the number of addresses registered in the loader and the number of addresses -// inserted into the history_accounts table. -func (a *AccountLoader) Stats() LoaderStats { - return a.stats -} +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } -func (a *AccountLoader) Name() string { - return "AccountLoader" + var rows []T + err := bulkGet( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil } -type bulkInsertField struct { +type columnValues struct { name string dbType string objects []string } -func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error { +func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { unnestPart := make([]string, 0, len(fields)) insertFieldsPart := make([]string, 0, len(fields)) pqArrays := make([]interface{}, 0, len(fields)) + // In the code below we are building the bulk insert query which looks like: + // + // WITH rows AS + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // INSERT INTO table ( + // /* insertFieldsPart */ + // field1, + // field2, + // ... + // ) + // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple insert statements to insert + // all the rows for large datasets. for _, field := range fields { unnestPart = append( unnestPart, @@ -200,21 +274,69 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string pq.Array(field.objects), ) } + columns := strings.Join(insertFieldsPart, ",") sql := ` - WITH r AS + WITH rows AS (SELECT ` + strings.Join(unnestPart, ",") + `) INSERT INTO ` + table + ` - (` + strings.Join(insertFieldsPart, ",") + `) - SELECT * from r - ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING` + (` + columns + `) + SELECT * FROM rows + ON CONFLICT (` + columns + `) DO NOTHING + RETURNING *` + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} + +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { + unnestPart := make([]string, 0, len(fields)) + columns := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk get query which looks like: + // + // SELECT * FROM table WHERE (field1, field2, ...) IN + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple select statements to obtain + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + columns = append( + columns, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN + (SELECT ` + strings.Join(unnestPart, ",") + `)` - _, err := q.ExecRaw( - context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + return q.SelectRaw( + ctx, + response, sql, pqArrays..., ) - return err } // AccountLoaderStub is a stub wrapper around AccountLoader which allows diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index ed30b43bd9..9a9fb30445 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -26,7 +26,7 @@ func TestAccountLoader(t *testing.T) { future := loader.GetFuture(address) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid account loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(address) assert.Equal(t, future, duplicateFuture) } @@ -55,4 +55,35 @@ func TestAccountLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewAccountLoader() + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + for _, address := range addresses { + future := loader.GetFuture(address) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, address := range addresses { + var internalId int64 + internalId, err = loader.GetNow(address) + assert.NoError(t, err) + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, internalId) + assert.Equal(t, account.Address, address) + } + } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index fe17dc17be..cdd2a0d714 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -1,16 +1,9 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" "strings" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" ) @@ -40,26 +33,13 @@ func AssetKeyFromXDR(asset xdr.Asset) AssetKey { // A FutureAssetID is created by an AssetLoader and // the asset id is available after calling Exec() on // the AssetLoader. -type FutureAssetID struct { - asset AssetKey - loader *AssetLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.GetNow(a.asset) -} +type FutureAssetID = future[AssetKey, Asset] // AssetLoader will map assets to their history // asset ids. If there is no existing mapping for a given sset, // the AssetLoader will insert into the history_assets table to // establish a mapping. -type AssetLoader struct { - sealed bool - set set.Set[AssetKey] - ids map[AssetKey]int64 - stats LoaderStats -} +type AssetLoader = loader[AssetKey, Asset] // NewAssetLoader will construct a new AssetLoader instance. func NewAssetLoader() *AssetLoader { @@ -68,152 +48,47 @@ func NewAssetLoader() *AssetLoader { set: set.Set[AssetKey]{}, ids: map[AssetKey]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given asset into the loader and -// returns a FutureAssetID which will hold the history asset id for -// the asset after Exec() is called. -func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { - if a.sealed { - panic(errSealed) - } - a.set.Add(asset) - return FutureAssetID{ - asset: asset, - loader: a, - } -} - -// GetNow returns the history asset id for the given asset. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid asset loader state, - Exec was not called yet to properly seal and resolve %v id`, asset) - } - if internalID, ok := a.ids[asset]; !ok { - return 0, fmt.Errorf(`asset loader id %v was not found`, asset) - } else { - return internalID, nil - } -} - -func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error { - var rows []Asset - for i := 0; i < len(keys); i += loaderLookupBatchSize { - end := ordered.Min(len(keys), i+loaderLookupBatchSize) - subset := keys[i:end] - args := make([]interface{}, 0, 3*len(subset)) - placeHolders := make([]string, 0, len(subset)) - for _, key := range subset { - args = append(args, key.Code, key.Type, key.Issuer) - placeHolders = append(placeHolders, "(?, ?, ?)") - } - rawSQL := fmt.Sprintf( - "SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s)", - strings.Join(placeHolders, ", "), - ) - err := q.SelectRaw(ctx, &rows, rawSQL, args...) - if err != nil { - return errors.Wrap(err, "could not select assets") - } - - for _, row := range rows { - a.ids[AssetKey{ - Type: row.Type, - Code: row.Code, - Issuer: row.Issuer, - }] = row.ID - } - } - return nil -} - -// Exec will look up all the history asset ids for the assets registered in the loader. -// If there are no history asset ids for a given set of assets, Exec will insert rows -// into the history_assets table. -func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - keys := make([]AssetKey, 0, len(a.set)) - for key := range a.set { - keys = append(keys, key) - } - - if err := a.lookupKeys(ctx, q, keys); err != nil { - return err - } - a.stats.Total += len(keys) - - assetTypes := make([]string, 0, len(a.set)-len(a.ids)) - assetCodes := make([]string, 0, len(a.set)-len(a.ids)) - assetIssuers := make([]string, 0, len(a.set)-len(a.ids)) - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - return keys[i].String() < keys[j].String() - }) - insert := 0 - for _, key := range keys { - if _, ok := a.ids[key]; ok { - continue - } - assetTypes = append(assetTypes, key.Type) - assetCodes = append(assetCodes, key.Code) - assetIssuers = append(assetIssuers, key.Issuer) - keys[insert] = key - insert++ - } - if insert == 0 { - return nil - } - keys = keys[:insert] - - err := bulkInsert( - ctx, - q, - "history_assets", - []string{"asset_code", "asset_type", "asset_issuer"}, - []bulkInsertField{ - { - name: "asset_code", - dbType: "character varying(12)", - objects: assetCodes, - }, - { - name: "asset_issuer", - dbType: "character varying(56)", - objects: assetIssuers, - }, - { - name: "asset_type", - dbType: "character varying(64)", - objects: assetTypes, - }, + name: "AssetLoader", + table: "history_assets", + columnsForKeys: func(keys []AssetKey) []columnValues { + assetTypes := make([]string, 0, len(keys)) + assetCodes := make([]string, 0, len(keys)) + assetIssuers := make([]string, 0, len(keys)) + for _, key := range keys { + assetTypes = append(assetTypes, key.Type) + assetCodes = append(assetCodes, key.Code) + assetIssuers = append(assetIssuers, key.Issuer) + } + + return []columnValues{ + { + name: "asset_code", + dbType: "character varying(12)", + objects: assetCodes, + }, + { + name: "asset_type", + dbType: "character varying(64)", + objects: assetTypes, + }, + { + name: "asset_issuer", + dbType: "character varying(56)", + objects: assetIssuers, + }, + } + }, + mappingFromRow: func(asset Asset) (AssetKey, int64) { + return AssetKey{ + Type: asset.Type, + Code: asset.Code, + Issuer: asset.Issuer, + }, asset.ID + }, + less: func(a AssetKey, b AssetKey) bool { + return a.String() < b.String() }, - ) - if err != nil { - return err } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, keys) -} - -// Stats returns the number of assets registered in the loader and the number of assets -// inserted into the history_assets table. -func (a *AssetLoader) Stats() LoaderStats { - return a.stats -} - -func (a *AssetLoader) Name() string { - return "AssetLoader" } // AssetLoaderStub is a stub wrapper around AssetLoader which allows diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index f097561e4a..ca65cebb7e 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -71,7 +71,7 @@ func TestAssetLoader(t *testing.T) { future := loader.GetFuture(key) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid asset loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(key) assert.Equal(t, future, duplicateFuture) } @@ -106,4 +106,58 @@ func TestAssetLoader(t *testing.T) { _, err = loader.GetNow(AssetKey{}) assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewAssetLoader() + for i := 0; i < 10; i++ { + var key AssetKey + if i%2 == 0 { + code := [4]byte{0, 0, 0, 0} + copy(code[:], fmt.Sprintf("ab%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum4, + AlphaNum4: &xdr.AlphaNum4{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + } else { + code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + copy(code[:], fmt.Sprintf("abcdef%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum12, + AlphaNum12: &xdr.AlphaNum12{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + + } + keys = append(keys, key) + } + + for _, key := range keys { + future := loader.GetFuture(key) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, key := range keys { + var internalID int64 + internalID, err = loader.GetNow(key) + assert.NoError(t, err) + var assetXDR xdr.Asset + if key.Type == "native" { + assetXDR = xdr.MustNewNativeAsset() + } else { + assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer) + } + var assetID int64 + assetID, err = q.GetAssetID(context.Background(), assetXDR) + assert.NoError(t, err) + assert.Equal(t, assetID, internalID) + } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index ef18683cb6..f775ea4b24 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -1,41 +1,22 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" + "cmp" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) // FutureClaimableBalanceID represents a future history claimable balance. // A FutureClaimableBalanceID is created by a ClaimableBalanceLoader and // the claimable balance id is available after calling Exec() on // the ClaimableBalanceLoader. -type FutureClaimableBalanceID struct { - id string - loader *ClaimableBalanceLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureClaimableBalanceID) Value() (driver.Value, error) { - return a.loader.getNow(a.id) -} +type FutureClaimableBalanceID = future[string, HistoryClaimableBalance] // ClaimableBalanceLoader will map claimable balance ids to their internal // history ids. If there is no existing mapping for a given claimable balance id, // the ClaimableBalanceLoader will insert into the history_claimable_balances table to // establish a mapping. -type ClaimableBalanceLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} +type ClaimableBalanceLoader = loader[string, HistoryClaimableBalance] // NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance. func NewClaimableBalanceLoader() *ClaimableBalanceLoader { @@ -44,118 +25,20 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given claimable balance into the loader and -// returns a FutureClaimableBalanceID which will hold the internal history id for -// the claimable balance after Exec() is called. -func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID { - if a.sealed { - panic(errSealed) - } - - a.set.Add(id) - return FutureClaimableBalanceID{ - id: id, - loader: a, - } -} - -// getNow returns the internal history id for the given claimable balance. -// getNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any getNow -// call can succeed. -func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid claimable balance loader state, - Exec was not called yet to properly seal and resolve %v id`, id) - } - if internalID, ok := a.ids[id]; !ok { - return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id) - } else { - return internalID, nil - } -} - -func (a *ClaimableBalanceLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error { - for i := 0; i < len(ids); i += loaderLookupBatchSize { - end := ordered.Min(len(ids), i+loaderLookupBatchSize) - - cbs, err := q.ClaimableBalancesByIDs(ctx, ids[i:end]) - if err != nil { - return errors.Wrap(err, "could not select claimable balances") - } - - for _, cb := range cbs { - a.ids[cb.BalanceID] = cb.InternalID - } - } - return nil -} - -// Exec will look up all the internal history ids for the claimable balances registered in the loader. -// If there are no internal ids for a given set of claimable balances, Exec will insert rows -// into the history_claimable_balances table. -func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - ids := make([]string, 0, len(a.set)) - for id := range a.set { - ids = append(ids, id) - } - - if err := a.lookupKeys(ctx, q, ids); err != nil { - return err - } - a.stats.Total += len(ids) - - insert := 0 - for _, id := range ids { - if _, ok := a.ids[id]; ok { - continue - } - ids[insert] = id - insert++ - } - if insert == 0 { - return nil - } - ids = ids[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(ids) - - err := bulkInsert( - ctx, - q, - "history_claimable_balances", - []string{"claimable_balance_id"}, - []bulkInsertField{ - { - name: "claimable_balance_id", - dbType: "text", - objects: ids, - }, + name: "ClaimableBalanceLoader", + table: "history_claimable_balances", + columnsForKeys: func(keys []string) []columnValues { + return []columnValues{ + { + name: "claimable_balance_id", + dbType: "text", + objects: keys, + }, + } }, - ) - if err != nil { - return err + mappingFromRow: func(row HistoryClaimableBalance) (string, int64) { + return row.BalanceID, row.InternalID + }, + less: cmp.Less[string], } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, ids) -} - -// Stats returns the number of claimable balances registered in the loader and the number of claimable balances -// inserted into the history_claimable_balances table. -func (a *ClaimableBalanceLoader) Stats() LoaderStats { - return a.stats -} - -func (a *ClaimableBalanceLoader) Name() string { - return "ClaimableBalanceLoader" } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index aaf91ccdcc..f5759015c7 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -35,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) { futures = append(futures, future) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -63,8 +63,45 @@ func TestClaimableBalanceLoader(t *testing.T) { assert.Equal(t, cb.InternalID, internalID) } - futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader} + futureCb := &FutureClaimableBalanceID{key: "not-present", loader: loader} _, err = futureCb.Value() assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewClaimableBalanceLoader() + for i := 100; i < 110; i++ { + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{byte(i)}, + } + var id string + id, err = xdr.MarshalHex(balanceID) + tt.Assert.NoError(err) + ids = append(ids, id) + } + + for _, id := range ids { + future := loader.GetFuture(id) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, id := range ids { + internalID, err := loader.GetNow(id) + assert.NoError(t, err) + var cb HistoryClaimableBalance + cb, err = q.ClaimableBalanceByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, cb.BalanceID, id) + assert.Equal(t, cb.InternalID, internalID) + } } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index d619fa3bb4..a03caaa988 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -1,41 +1,22 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" + "cmp" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) // FutureLiquidityPoolID represents a future history liquidity pool. // A FutureLiquidityPoolID is created by an LiquidityPoolLoader and // the liquidity pool id is available after calling Exec() on // the LiquidityPoolLoader. -type FutureLiquidityPoolID struct { - id string - loader *LiquidityPoolLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.GetNow(a.id) -} +type FutureLiquidityPoolID = future[string, HistoryLiquidityPool] // LiquidityPoolLoader will map liquidity pools to their internal // history ids. If there is no existing mapping for a given liquidity pool, // the LiquidityPoolLoader will insert into the history_liquidity_pools table to // establish a mapping. -type LiquidityPoolLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} +type LiquidityPoolLoader = loader[string, HistoryLiquidityPool] // NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance. func NewLiquidityPoolLoader() *LiquidityPoolLoader { @@ -44,120 +25,22 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given liquidity pool into the loader and -// returns a FutureLiquidityPoolID which will hold the internal history id for -// the liquidity pool after Exec() is called. -func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { - if a.sealed { - panic(errSealed) - } - - a.set.Add(id) - return FutureLiquidityPoolID{ - id: id, - loader: a, - } -} - -// GetNow returns the internal history id for the given liquidity pool. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid liquidity pool loader state, - Exec was not called yet to properly seal and resolve %v id`, id) - } - if internalID, ok := a.ids[id]; !ok { - return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id) - } else { - return internalID, nil - } -} - -func (a *LiquidityPoolLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error { - for i := 0; i < len(ids); i += loaderLookupBatchSize { - end := ordered.Min(len(ids), i+loaderLookupBatchSize) - - lps, err := q.LiquidityPoolsByIDs(ctx, ids[i:end]) - if err != nil { - return errors.Wrap(err, "could not select accounts") - } - - for _, lp := range lps { - a.ids[lp.PoolID] = lp.InternalID - } - } - return nil -} - -// Exec will look up all the internal history ids for the liquidity pools registered in the loader. -// If there are no internal history ids for a given set of liquidity pools, Exec will insert rows -// into the history_liquidity_pools table. -func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - ids := make([]string, 0, len(a.set)) - for id := range a.set { - ids = append(ids, id) - } - - if err := a.lookupKeys(ctx, q, ids); err != nil { - return err - } - a.stats.Total += len(ids) - - insert := 0 - for _, id := range ids { - if _, ok := a.ids[id]; ok { - continue - } - ids[insert] = id - insert++ - } - if insert == 0 { - return nil - } - ids = ids[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(ids) - - err := bulkInsert( - ctx, - q, - "history_liquidity_pools", - []string{"liquidity_pool_id"}, - []bulkInsertField{ - { - name: "liquidity_pool_id", - dbType: "text", - objects: ids, - }, + name: "LiquidityPoolLoader", + table: "history_liquidity_pools", + columnsForKeys: func(keys []string) []columnValues { + return []columnValues{ + { + name: "liquidity_pool_id", + dbType: "text", + objects: keys, + }, + } }, - ) - if err != nil { - return err + mappingFromRow: func(row HistoryLiquidityPool) (string, int64) { + return row.PoolID, row.InternalID + }, + less: cmp.Less[string], } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, ids) -} - -// Stats returns the number of liquidity pools registered in the loader and the number of liquidity pools -// inserted into the history_liquidity_pools table. -func (a *LiquidityPoolLoader) Stats() LoaderStats { - return a.stats -} - -func (a *LiquidityPoolLoader) Name() string { - return "LiquidityPoolLoader" } // LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index 25ca80826c..aec2fcd886 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -29,7 +29,7 @@ func TestLiquidityPoolLoader(t *testing.T) { future := loader.GetFuture(id) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -59,4 +59,39 @@ func TestLiquidityPoolLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewLiquidityPoolLoader() + for i := 100; i < 110; i++ { + poolID := xdr.PoolId{byte(i)} + var id string + id, err = xdr.MarshalHex(poolID) + tt.Assert.NoError(err) + ids = append(ids, id) + } + + for _, id := range ids { + future := loader.GetFuture(id) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, id := range ids { + var internalID int64 + internalID, err = loader.GetNow(id) + assert.NoError(t, err) + var lp HistoryLiquidityPool + lp, err = q.LiquidityPoolByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, lp.PoolID, id) + assert.Equal(t, lp.InternalID, internalID) + } }