diff --git a/.github/workflows/ledgerexporter.yml b/.github/workflows/ledgerexporter.yml index c80a367771..ac1e265582 100644 --- a/.github/workflows/ledgerexporter.yml +++ b/.github/workflows/ledgerexporter.yml @@ -13,9 +13,10 @@ jobs: CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core - # this pins to a version of quickstart:testing that has the same version as LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN + # this pins to a version of quickstart:testing that has the same version of core + # as specified by LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' - LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5c8186f53cc98571749054dd782dce33b0aca2d1a622a7610362f7c15b79b1bf LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" steps: - name: Install captive core diff --git a/exp/services/ledgerexporter/internal/integration_test.go b/exp/services/ledgerexporter/internal/integration_test.go index dab2e5b5f8..ccc5463908 100644 --- a/exp/services/ledgerexporter/internal/integration_test.go +++ b/exp/services/ledgerexporter/internal/integration_test.go @@ -34,6 +34,7 @@ const ( // tests then refer to ledger sequences only up to this, therefore // don't have to do complex waiting within test for a sequence to exist. waitForCoreLedgerSequence = 16 + configTemplate = "test/integration_config_template.toml" ) func TestLedgerExporterTestSuite(t *testing.T) { @@ -54,6 +55,7 @@ type LedgerExporterTestSuite struct { dockerCli *client.Client gcsServer *fakestorage.Server finishedSetup bool + config Config } func (s *LedgerExporterTestSuite) TestScanAndFill() { @@ -74,7 +76,7 @@ func (s *LedgerExporterTestSuite) TestScanAndFill() { s.T().Log(output) s.T().Log(errOutput) - datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone") + datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig) require.NoError(err) _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd") @@ -104,7 +106,7 @@ func (s *LedgerExporterTestSuite) TestAppend() { s.T().Log(output) s.T().Log(errOutput) - datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone") + datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig) require.NoError(err) _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd") @@ -134,7 +136,7 @@ func (s *LedgerExporterTestSuite) TestAppendUnbounded() { s.T().Log(errOutput) }() - datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone") + datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig) require.NoError(err) require.EventuallyWithT(func(c *assert.CollectT) { @@ -158,9 +160,9 @@ func (s *LedgerExporterTestSuite) SetupSuite() { }() testTempDir := t.TempDir() - ledgerExporterConfigTemplate, err := toml.LoadFile("test/integration_config_template.toml") + ledgerExporterConfigTemplate, err := toml.LoadFile(configTemplate) if err != nil { - t.Fatalf("unable to load config template file %v", err) + t.Fatalf("unable to load config template file %v, %v", configTemplate, err) } // if 
LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN not specified, @@ -172,7 +174,10 @@ func (s *LedgerExporterTestSuite) SetupSuite() { tomlBytes, err := toml.Marshal(ledgerExporterConfigTemplate) if err != nil { - t.Fatalf("unable to load config file %v", err) + t.Fatalf("unable to marshal config file toml %v, %v", configTemplate, err) + } + if err = toml.Unmarshal(tomlBytes, &s.config); err != nil { + t.Fatalf("unable to unmarshal config file toml into struct, %v", err) } tempSeedDataPath := filepath.Join(testTempDir, "data") diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index 4a353bfe22..aa70336295 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -18,12 +18,10 @@ import ( var _ LedgerBackend = (*BufferedStorageBackend)(nil) type BufferedStorageBackendConfig struct { - LedgerBatchConfig datastore.DataStoreSchema - DataStore datastore.DataStore - BufferSize uint32 - NumWorkers uint32 - RetryLimit uint32 - RetryWait time.Duration + BufferSize uint32 `toml:"buffer_size"` + NumWorkers uint32 `toml:"num_workers"` + RetryLimit uint32 `toml:"retry_limit"` + RetryWait time.Duration `toml:"retry_wait"` } // BufferedStorageBackend is a ledger backend that reads from a storage service. @@ -45,7 +43,7 @@ type BufferedStorageBackend struct { } // NewBufferedStorageBackend returns a new BufferedStorageBackend instance. -func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) { +func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) { if config.BufferSize == 0 { return nil, errors.New("buffer size must be > 0") } @@ -54,17 +52,13 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken return nil, errors.New("number of workers must be <= BufferSize") } - if config.DataStore == nil { - return nil, errors.New("no DataStore provided") - } - - if config.LedgerBatchConfig.LedgersPerFile <= 0 { + if dataStore.GetSchema().LedgersPerFile <= 0 { return nil, errors.New("ledgersPerFile must be > 0") } bsBackend := &BufferedStorageBackend{ config: config, - dataStore: config.DataStore, + dataStore: dataStore, } return bsBackend, nil diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index f18329fffa..ca2711c40d 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig param := make(map[string]string) param["destination_bucket_path"] = "testURL" - ledgerBatchConfig := datastore.DataStoreSchema{ - LedgersPerFile: 1, - FilesPerPartition: 64000, - } - - dataStore := new(datastore.MockDataStore) - return BufferedStorageBackendConfig{ - LedgerBatchConfig: ledgerBatchConfig, - DataStore: dataStore, - BufferSize: 100, - NumWorkers: 5, - RetryLimit: 3, - RetryWait: time.Microsecond, + BufferSize: 100, + NumWorkers: 5, + RetryLimit: 3, + RetryWait: time.Microsecond, } } func createBufferedStorageBackendForTesting() BufferedStorageBackend { config := createBufferedStorageBackendConfigForTesting() + dataStore := new(datastore.MockDataStore) return BufferedStorageBackend{ config: config, - dataStore: config.DataStore, + dataStore: dataStore, } } @@ -86,6 +78,10 @@ func createMockdataStore(t 
*testing.T, start, end, partitionSize, count uint32) } mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil) } + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: count, + FilesPerPartition: partitionSize, + }) t.Cleanup(func() { mockDataStore.AssertExpectations(t) @@ -126,15 +122,18 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser { } func TestNewBufferedStorageBackend(t *testing.T) { - ctx := context.Background() config := createBufferedStorageBackendConfigForTesting() - - bsb, err := NewBufferedStorageBackend(ctx, config) + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: uint32(1), + FilesPerPartition: partitionSize, + }) + bsb, err := NewBufferedStorageBackend(config, mockDataStore) assert.NoError(t, err) - assert.Equal(t, bsb.dataStore, config.DataStore) - assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile) - assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition) + assert.Equal(t, bsb.dataStore, mockDataStore) + assert.Equal(t, uint32(1), bsb.dataStore.GetSchema().LedgersPerFile) + assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema().FilesPerPartition) assert.Equal(t, uint32(100), bsb.config.BufferSize) assert.Equal(t, uint32(5), bsb.config.NumWorkers) assert.Equal(t, uint32(3), bsb.config.RetryLimit) @@ -210,12 +209,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) { lcmArray := createLCMForTesting(startLedger, endLedger) bsb := createBufferedStorageBackendForTesting() ctx := context.Background() - bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2) ledgerRange := BoundedRange(startLedger, endLedger) mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2) bsb.dataStore = mockDataStore - + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: uint32(2), + FilesPerPartition: partitionSize, + }) assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) @@ -451,6 +452,10 @@ func TestLedgerBufferClose(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: ledgerPerFileCount, + FilesPerPartition: partitionSize, + }) objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3) afterPrepareRange := make(chan struct{}) @@ -483,7 +488,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: ledgerPerFileCount, + FilesPerPartition: partitionSize, + }) objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3) mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once() t.Cleanup(func() { @@ -509,7 +517,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: ledgerPerFileCount, + FilesPerPartition: partitionSize, + }) objectName := 
fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3) iteration := &atomic.Int32{} cancelAfter := int32(bsb.config.RetryLimit) + 2 @@ -551,7 +562,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) { }) bsb.dataStore = mockDataStore - + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: ledgerPerFileCount, + FilesPerPartition: partitionSize, + }) assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange)) bsb.ledgerBuffer.wg.Wait() diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go index 5b2ec57ffc..6965461bba 100644 --- a/ingest/ledgerbackend/ledger_buffer.go +++ b/ingest/ledgerbackend/ledger_buffer.go @@ -95,7 +95,7 @@ func (lb *ledgerBuffer) pushTaskQueue() { return } lb.taskQueue <- lb.nextTaskLedger - lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile + lb.nextTaskLedger += lb.dataStore.GetSchema().LedgersPerFile } // sleepWithContext returns true upon sleeping without interruption from the context @@ -163,7 +163,7 @@ func (lb *ledgerBuffer) worker(ctx context.Context) { } func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) { - objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence) + objectKey := lb.dataStore.GetSchema().GetObjectKeyFromSequenceNumber(sequence) reader, err := lb.dataStore.GetFile(ctx, objectKey) if err != nil { @@ -198,7 +198,7 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) { for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) { item := lb.ledgerPriorityQueue.Pop() lb.ledgerQueue <- item.payload - lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile + lb.currentLedger += lb.dataStore.GetSchema().LedgersPerFile } } diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 501ad51847..cd5d8af57b 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,17 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Pending + +### Added + +- Reingest from pre-computed tx meta on remote cloud storage. ([4911](https://github.com/stellar/go/issues/4911)), ([5374](https://github.com/stellar/go/pull/5374)) + - Configure horizon reingestion to obtain ledger tx meta in pre-computed files from a Google Cloud Storage(GCS) location. + - Using this option will no longer require a captive core binary be present and it no longer runs a captive core sub-process, instead obtaining the tx meta from the GCS backend. + - Horizon supports this new feature with two new parameters `ledgerbackend` and `datastore-config` on the `reingest` command. Refer to [Reingestion README](./internal/ingest/README.md#reingestion). 
+ + + ## 2.31.0 ### Breaking Changes diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index e2589a7385..92a732e002 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" + "github.com/pelletier/go-toml" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -18,28 +19,43 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/db2/schema" "github.com/stellar/go/services/horizon/internal/ingest" + "github.com/stellar/go/support/config" support "github.com/stellar/go/support/config" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" hlog "github.com/stellar/go/support/log" ) -var dbCmd = &cobra.Command{ - Use: "db [command]", - Short: "commands to manage horizon's postgres db", -} - -var dbMigrateCmd = &cobra.Command{ - Use: "migrate [command]", - Short: "commands to run schema migrations on horizon's postgres db", -} +var ( + runDBReingestRangeFn = runDBReingestRange + dbCmd *cobra.Command + dbMigrateCmd *cobra.Command + dbInitCmd *cobra.Command + dbMigrateDownCmd *cobra.Command + dbMigrateRedoCmd *cobra.Command + dbMigrateStatusCmd *cobra.Command + dbMigrateUpCmd *cobra.Command + dbReapCmd *cobra.Command + dbReingestCmd *cobra.Command + dbReingestRangeCmd *cobra.Command + dbFillGapsCmd *cobra.Command + dbDetectGapsCmd *cobra.Command + reingestForce bool + parallelWorkers uint + parallelJobSize uint32 + retries uint + retryBackoffSeconds uint + ledgerBackendStr string + storageBackendConfigPath string + ledgerBackendType ingest.LedgerBackendType +) -func requireAndSetFlags(names ...string) error { +func requireAndSetFlags(horizonFlags config.ConfigOptions, names ...string) error { set := map[string]bool{} for _, name := range names { set[name] = true } - for _, flag := range globalFlags { + for _, flag := range horizonFlags { if set[flag.Name] { flag.Require() if err := flag.SetValue(); err != nil { @@ -58,44 +74,17 @@ func requireAndSetFlags(names ...string) error { return fmt.Errorf("could not find %s flags", strings.Join(missing, ",")) } -var dbInitCmd = &cobra.Command{ - Use: "init", - Short: "install schema", - Long: "init initializes the postgres database used by horizon.", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - db, err := sql.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return err - } - - numMigrationsRun, err := schema.Migrate(db, schema.MigrateUp, 0) - if err != nil { - return err - } - - if numMigrationsRun == 0 { - log.Println("No migrations applied.") - } else { - log.Printf("Successfully applied %d migrations.\n", numMigrationsRun) - } - return nil - }, -} - -func migrate(dir schema.MigrateDir, count int) error { - if !globalConfig.Ingest { +func migrate(dir schema.MigrateDir, count int, horizonConfig *horizon.Config) error { + if !horizonConfig.Ingest { log.Println("Skipping migrations because ingest flag is not enabled") return nil } - dbConn, err := db.Open("postgres", globalConfig.DatabaseURL) + dbConn, err := db.Open("postgres", horizonConfig.DatabaseURL) if err != nil { return err } + defer dbConn.Close() numMigrationsRun, err := schema.Migrate(dbConn.DB.DB, dir, count) if err != nil { @@ -110,160 +99,6 @@ func migrate(dir schema.MigrateDir, count int) error { return nil } -var dbMigrateDownCmd = &cobra.Command{ - Use: "down COUNT", - Short: "run 
downwards db schema migrations", - Long: "performs a downards schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 1 args. - if len(args) != 1 { - return ErrUsage{cmd} - } - - count, err := strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - - return migrate(schema.MigrateDown, count) - }, -} - -var dbMigrateRedoCmd = &cobra.Command{ - Use: "redo COUNT", - Short: "redo db schema migrations", - Long: "performs a redo schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 1 args. - if len(args) != 1 { - return ErrUsage{cmd} - } - - count, err := strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - - return migrate(schema.MigrateRedo, count) - }, -} - -var dbMigrateStatusCmd = &cobra.Command{ - Use: "status", - Short: "print current database migration status", - Long: "print current database migration status", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName); err != nil { - return err - } - - // Only allow invocations with 0 args. - if len(args) != 0 { - fmt.Println(args) - return ErrUsage{cmd} - } - - dbConn, err := db.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return err - } - - status, err := schema.Status(dbConn.DB.DB) - if err != nil { - return err - } - - fmt.Println(status) - return nil - }, -} - -var dbMigrateUpCmd = &cobra.Command{ - Use: "up [COUNT]", - Short: "run upwards db schema migrations", - Long: "performs an upwards schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 0-1 args. - if len(args) > 1 { - return ErrUsage{cmd} - } - - count := 0 - if len(args) == 1 { - var err error - count, err = strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - } - - return migrate(schema.MigrateUp, count) - }, -} - -var dbReapCmd = &cobra.Command{ - Use: "reap", - Short: "reaps (i.e. 
removes) any reapable history data", - Long: "reap removes any historical data that is earlier than the configured retention cutoff", - RunE: func(cmd *cobra.Command, args []string) error { - - err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false}) - if err != nil { - return err - } - - session, err := db.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return fmt.Errorf("cannot open Horizon DB: %v", err) - } - defer session.Close() - - reaper := ingest.NewReaper( - ingest.ReapConfig{ - RetentionCount: uint32(globalConfig.HistoryRetentionCount), - BatchSize: uint32(globalConfig.HistoryRetentionReapCount), - }, - session, - ) - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) - defer cancel() - return reaper.DeleteUnretainedHistory(ctx) - }, -} - -var dbReingestCmd = &cobra.Command{ - Use: "reingest", - Short: "reingest commands", - Long: "reingest ingests historical data for every ledger or ledgers specified by subcommand", - RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println("Use one of the subcomands...") - return ErrUsage{cmd} - }, -} - -var ( - reingestForce bool - parallelWorkers uint - parallelJobSize uint32 - retries uint - retryBackoffSeconds uint -) - func ingestRangeCmdOpts() support.ConfigOptions { return support.ConfigOptions{ { @@ -307,108 +142,43 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint(5), Usage: "[optional] backoff seconds between reingest retries", }, + { + Name: "ledgerbackend", + ConfigKey: &ledgerBackendStr, + OptType: types.String, + Required: false, + FlagDefault: ingest.CaptiveCoreBackend.String(), + Usage: fmt.Sprintf("[optional] Specify the ledger backend type: '%s' (default) or '%s'", + ingest.CaptiveCoreBackend.String(), + ingest.BufferedStorageBackend.String()), + CustomSetValue: func(co *support.ConfigOption) error { + val := viper.GetString(co.Name) + switch val { + case ingest.CaptiveCoreBackend.String(): + ledgerBackendType = ingest.CaptiveCoreBackend + case ingest.BufferedStorageBackend.String(): + ledgerBackendType = ingest.BufferedStorageBackend + default: + return fmt.Errorf("invalid ledger backend: %s, must be 'captive-core' or 'datastore'", val) + } + *co.ConfigKey.(*string) = val + return nil + }, + }, + { + Name: "datastore-config", + ConfigKey: &storageBackendConfigPath, + OptType: types.String, + Required: false, + Usage: "[optional] Specify the path to the datastore config file (required for datastore backend)", + }, } } var dbReingestRangeCmdOpts = ingestRangeCmdOpts() -var dbReingestRangeCmd = &cobra.Command{ - Use: "range [Start sequence number] [End sequence number]", - Short: "reingests ledgers within a range", - Long: "reingests ledgers between X and Y sequence number (closed intervals)", - RunE: func(cmd *cobra.Command, args []string) error { - if err := dbReingestRangeCmdOpts.RequireE(); err != nil { - return err - } - if err := dbReingestRangeCmdOpts.SetValues(); err != nil { - return err - } - - if len(args) != 2 { - return ErrUsage{cmd} - } - - argsUInt32 := make([]uint32, 2) - for i, arg := range args { - if seq, err := strconv.ParseUint(arg, 10, 32); err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, arg) - } else { - argsUInt32[i] = uint32(seq) - } - } - - err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}) - if err != nil { - return err - } - return 
runDBReingestRange( - []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, - reingestForce, - parallelWorkers, - *globalConfig, - ) - }, -} - var dbFillGapsCmdOpts = ingestRangeCmdOpts() -var dbFillGapsCmd = &cobra.Command{ - Use: "fill-gaps [Start sequence number] [End sequence number]", - Short: "Ingests any gaps found in the horizon db", - Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.", - RunE: func(cmd *cobra.Command, args []string) error { - if err := dbFillGapsCmdOpts.RequireE(); err != nil { - return err - } - if err := dbFillGapsCmdOpts.SetValues(); err != nil { - return err - } - - if len(args) != 0 && len(args) != 2 { - hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args)) - return ErrUsage{cmd} - } - var start, end uint64 - var withRange bool - if len(args) == 2 { - var err error - start, err = strconv.ParseUint(args[0], 10, 32) - if err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, args[0]) - } - end, err = strconv.ParseUint(args[1], 10, 32) - if err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, args[1]) - } - withRange = true - } - - err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}) - if err != nil { - return err - } - var gaps []history.LedgerRange - if withRange { - gaps, err = runDBDetectGapsInRange(*globalConfig, uint32(start), uint32(end)) - if err != nil { - return err - } - hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end) - } else { - gaps, err = runDBDetectGaps(*globalConfig) - if err != nil { - return err - } - hlog.Infof("found gaps %v", gaps) - } - - return runDBReingestRange(gaps, reingestForce, parallelWorkers, *globalConfig) - }, -} - -func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error { +func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { var err error if reingestForce && parallelWorkers > 1 { @@ -435,6 +205,8 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, RoundingSlippageFilter: config.RoundingSlippageFilter, MaxLedgerPerFlush: maxLedgersPerFlush, SkipTxmeta: config.SkipTxmeta, + LedgerBackendType: ledgerBackendType, + StorageBackendConfig: storageBackendConfig, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { @@ -476,35 +248,6 @@ the reingest command completes.`) return nil } -var dbDetectGapsCmd = &cobra.Command{ - Use: "detect-gaps", - Short: "detects ingestion gaps in Horizon's database", - Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName); err != nil { - return err - } - - if len(args) != 0 { - return ErrUsage{cmd} - } - gaps, err := runDBDetectGaps(*globalConfig) - if err != nil { - return err - } - if len(gaps) == 0 { - hlog.Info("No gaps found") - return nil - } - fmt.Println("Horizon commands to run in order to fill in the gaps:") - cmdname := os.Args[0] - for _, g := range gaps { - fmt.Printf("%s db reingest range %d %d\n", cmdname, g.StartSequence, g.EndSequence) - } - 
return nil - }, -} - func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) { horizonSession, err := db.Open("postgres", config.DatabaseURL) if err != nil { @@ -525,7 +268,340 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history return q.GetLedgerGapsInRange(context.Background(), start, end) } -func init() { +func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, horizonFlags config.ConfigOptions) { + dbCmd = &cobra.Command{ + Use: "db [command]", + Short: "commands to manage horizon's postgres db", + } + + dbMigrateCmd = &cobra.Command{ + Use: "migrate [command]", + Short: "commands to run schema migrations on horizon's postgres db", + } + + dbInitCmd = &cobra.Command{ + Use: "init", + Short: "install schema", + Long: "init initializes the postgres database used by horizon.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + db, err := sql.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return err + } + + numMigrationsRun, err := schema.Migrate(db, schema.MigrateUp, 0) + if err != nil { + return err + } + + if numMigrationsRun == 0 { + log.Println("No migrations applied.") + } else { + log.Printf("Successfully applied %d migrations.\n", numMigrationsRun) + } + return nil + }, + } + + dbMigrateDownCmd = &cobra.Command{ + Use: "down COUNT", + Short: "run downwards db schema migrations", + Long: "performs a downwards schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 1 args. + if len(args) != 1 { + return ErrUsage{cmd} + } + + count, err := strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + + return migrate(schema.MigrateDown, count, horizonConfig) + }, + } + + dbMigrateRedoCmd = &cobra.Command{ + Use: "redo COUNT", + Short: "redo db schema migrations", + Long: "performs a redo schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 1 args. + if len(args) != 1 { + return ErrUsage{cmd} + } + + count, err := strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + + return migrate(schema.MigrateRedo, count, horizonConfig) + }, + } + + dbMigrateStatusCmd = &cobra.Command{ + Use: "status", + Short: "print current database migration status", + Long: "print current database migration status", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { + return err + } + + // Only allow invocations with 0 args. 
+ if len(args) != 0 { + fmt.Println(args) + return ErrUsage{cmd} + } + + dbConn, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return err + } + + status, err := schema.Status(dbConn.DB.DB) + if err != nil { + return err + } + + fmt.Println(status) + return nil + }, + } + + dbMigrateUpCmd = &cobra.Command{ + Use: "up [COUNT]", + Short: "run upwards db schema migrations", + Long: "performs an upwards schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 0-1 args. + if len(args) > 1 { + return ErrUsage{cmd} + } + + count := 0 + if len(args) == 1 { + var err error + count, err = strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + } + + return migrate(schema.MigrateUp, count, horizonConfig) + }, + } + + dbReapCmd = &cobra.Command{ + Use: "reap", + Short: "reaps (i.e. removes) any reapable history data", + Long: "reap removes any historical data that is earlier than the configured retention cutoff", + RunE: func(cmd *cobra.Command, args []string) error { + + err := horizon.ApplyFlags(horizonConfig, horizonFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}) + if err != nil { + return err + } + + session, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + defer session.Close() + + reaper := ingest.NewReaper( + ingest.ReapConfig{ + RetentionCount: uint32(horizonConfig.HistoryRetentionCount), + BatchSize: uint32(horizonConfig.HistoryRetentionReapCount), + }, + session, + ) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + return reaper.DeleteUnretainedHistory(ctx) + }, + } + + dbReingestCmd = &cobra.Command{ + Use: "reingest", + Short: "reingest commands", + Long: "reingest ingests historical data for every ledger or ledgers specified by subcommand", + RunE: func(cmd *cobra.Command, args []string) error { + fmt.Println("Use one of the subcommands...") + return ErrUsage{cmd} + }, + } + + dbReingestRangeCmd = &cobra.Command{ + Use: "range [Start sequence number] [End sequence number]", + Short: "reingests ledgers within a range", + Long: "reingests ledgers between X and Y sequence number (closed intervals)", + RunE: func(cmd *cobra.Command, args []string) error { + if err := dbReingestRangeCmdOpts.RequireE(); err != nil { + return err + } + if err := dbReingestRangeCmdOpts.SetValues(); err != nil { + return err + } + + if len(args) != 2 { + return ErrUsage{cmd} + } + + argsUInt32 := make([]uint32, 2) + for i, arg := range args { + if seq, err := strconv.ParseUint(arg, 10, 32); err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, arg) + } else { + argsUInt32[i] = uint32(seq) + } + } + + var err error + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + if ledgerBackendType == ingest.BufferedStorageBackend { + if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { + return err + } + // when using buffered storage, performance observations have noted an optimal parallel batch size + // of 100; apply that as the default if the flag was absent. 
+ if !viper.IsSet("parallel-job-size") { + parallelJobSize = 100 + } + options.NoCaptiveCore = true + } + + if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { + return err + } + return runDBReingestRangeFn( + []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, + reingestForce, + parallelWorkers, + *horizonConfig, + storageBackendConfig, + ) + }, + } + + dbFillGapsCmd = &cobra.Command{ + Use: "fill-gaps [Start sequence number] [End sequence number]", + Short: "Ingests any gaps found in the horizon db", + Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := dbFillGapsCmdOpts.RequireE(); err != nil { + return err + } + if err := dbFillGapsCmdOpts.SetValues(); err != nil { + return err + } + + if len(args) != 0 && len(args) != 2 { + hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args)) + return ErrUsage{cmd} + } + + var start, end uint64 + var withRange bool + if len(args) == 2 { + var err error + start, err = strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, args[0]) + } + end, err = strconv.ParseUint(args[1], 10, 32) + if err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, args[1]) + } + withRange = true + } + + var err error + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + if ledgerBackendType == ingest.BufferedStorageBackend { + if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { + return err + } + options.NoCaptiveCore = true + } + + if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { + return err + } + var gaps []history.LedgerRange + if withRange { + gaps, err = runDBDetectGapsInRange(*horizonConfig, uint32(start), uint32(end)) + if err != nil { + return err + } + hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end) + } else { + gaps, err = runDBDetectGaps(*horizonConfig) + if err != nil { + return err + } + hlog.Infof("found gaps %v", gaps) + } + + return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig) + }, + } + + dbDetectGapsCmd = &cobra.Command{ + Use: "detect-gaps", + Short: "detects ingestion gaps in Horizon's database", + Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { + return err + } + + if len(args) != 0 { + return ErrUsage{cmd} + } + gaps, err := runDBDetectGaps(*horizonConfig) + if err != nil { + return err + } + if len(gaps) == 0 { + hlog.Info("No gaps found") + return nil + } + fmt.Println("Horizon commands to run in order to fill in the gaps:") + cmdname := os.Args[0] + for _, g := range gaps { + fmt.Printf("%s db reingest range %d %d\n", cmdname, g.StartSequence, g.EndSequence) + } + return nil + }, + } + if err := dbReingestRangeCmdOpts.Init(dbReingestRangeCmd); err != nil { log.Fatal(err.Error()) } @@ -536,7 +612,7 @@ func init() { viper.BindPFlags(dbReingestRangeCmd.PersistentFlags()) viper.BindPFlags(dbFillGapsCmd.PersistentFlags()) - RootCmd.AddCommand(dbCmd) + rootCmd.AddCommand(dbCmd) 
dbCmd.AddCommand( dbInitCmd, dbMigrateCmd, @@ -553,3 +629,23 @@ func init() { ) dbReingestCmd.AddCommand(dbReingestRangeCmd) } + +func loadStorageBackendConfig(storageBackendConfigPath string) (ingest.StorageBackendConfig, error) { + if storageBackendConfigPath == "" { + return ingest.StorageBackendConfig{}, errors.New("datastore config file is required for datastore ledgerbackend type") + } + cfg, err := toml.LoadFile(storageBackendConfigPath) + if err != nil { + return ingest.StorageBackendConfig{}, fmt.Errorf("failed to load datastore ledgerbackend config file %v: %w", storageBackendConfigPath, err) + } + var storageBackendConfig ingest.StorageBackendConfig + if err = cfg.Unmarshal(&storageBackendConfig); err != nil { + return ingest.StorageBackendConfig{}, fmt.Errorf("error unmarshalling datastore ledgerbackend TOML config: %w", err) + } + + return storageBackendConfig, nil +} + +func init() { + DefineDBCommands(RootCmd, globalConfig, globalFlags) +} diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go new file mode 100644 index 0000000000..6a00576bd3 --- /dev/null +++ b/services/horizon/cmd/db_test.go @@ -0,0 +1,266 @@ +package cmd + +import ( + "testing" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + horizon "github.com/stellar/go/services/horizon/internal" + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest" + "github.com/stellar/go/support/db/dbtest" +) + +func TestDBCommandsTestSuite(t *testing.T) { + dbCmdSuite := &DBCommandsTestSuite{} + suite.Run(t, dbCmdSuite) +} + +type DBCommandsTestSuite struct { + suite.Suite + db *dbtest.DB + rootCmd *cobra.Command +} + +func (s *DBCommandsTestSuite) SetupSuite() { + runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, + horizon.Config, ingest.StorageBackendConfig) error { + return nil + } + + s.db = dbtest.Postgres(s.T()) + + RootCmd.SetArgs([]string{ + "db", "migrate", "up", "--db-url", s.db.DSN}) + require.NoError(s.T(), RootCmd.Execute()) +} + +func (s *DBCommandsTestSuite) TearDownSuite() { + s.db.Close() +} + +func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { + s.rootCmd = NewRootCmd() +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { + s.rootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.db.DSN, + "--network", "testnet", + "--parallel-workers", "2", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + "2", + "10"}) + + require.NoError(s.T(), s.rootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100)) +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { + s.rootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.db.DSN, + "--network", "testnet", + "--stellar-core-binary-path", "/test/core/bin/path", + "--parallel-workers", "2", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), s.rootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100_000)) +} + +func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { + s.rootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.db.DSN, + "--network", "testnet", + "--stellar-core-binary-path", "/test/core/bin/path", + "--parallel-workers", "2", + "--parallel-job-size", "5", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), 
s.rootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(5)) +} + +func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { + s.rootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.db.DSN, + "--network", "testnet", + "--parallel-workers", "2", + "--parallel-job-size", "5", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + "2", + "10"}) + + require.NoError(s.T(), s.rootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(5)) +} + +func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { + tests := []struct { + name string + args []string + ledgerBackend ingest.LedgerBackendType + expectError bool + errorMessage string + }{ + { + name: "default; w/ individual network flags", + args: []string{ + "1", "100", + "--network-passphrase", "passphrase", + "--history-archive-urls", "[]", + }, + expectError: false, + }, + { + name: "default; w/o individual network flags", + args: []string{ + "1", "100", + }, + expectError: true, + errorMessage: "network-passphrase must be set", + }, + { + name: "default; no history-archive-urls flag", + args: []string{ + "1", "100", + "--network-passphrase", "passphrase", + }, + expectError: true, + errorMessage: "history-archive-urls must be set", + }, + { + name: "default; w/ network parameter", + args: []string{ + "1", "100", + "--network", "testnet", + }, + expectError: false, + }, + { + name: "datastore; w/ individual network flags", + args: []string{ + "1", "100", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + "--network-passphrase", "passphrase", + "--history-archive-urls", "[]", + }, + expectError: false, + }, + { + name: "datastore; w/o individual network flags", + args: []string{ + "1", "100", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + }, + expectError: true, + errorMessage: "network-passphrase must be set", + }, + { + name: "datastore; no history-archive-urls flag", + args: []string{ + "1", "100", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + "--network-passphrase", "passphrase", + }, + expectError: true, + errorMessage: "history-archive-urls must be set", + }, + { + name: "captive-core; valid", + args: []string{ + "1", "100", + "--network", "testnet", + "--ledgerbackend", "captive-core", + }, + expectError: false, + }, + { + name: "invalid datastore", + args: []string{ + "1", "100", + "--network", "testnet", + "--ledgerbackend", "unknown", + }, + expectError: true, + errorMessage: "invalid ledger backend: unknown, must be 'captive-core' or 'datastore'", + }, + { + name: "datastore; missing config file", + args: []string{ + "1", "100", + "--network", "testnet", + "--ledgerbackend", "datastore", + "--datastore-config", "invalid.config.toml", + }, + expectError: true, + errorMessage: "failed to load datastore ledgerbackend config file", + }, + { + name: "datastore; w/ config", + args: []string{ + "1", "100", + "--network", "testnet", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + }, + expectError: false, + }, + { + name: "datastore; w/o config", + args: []string{ + "1", "100", + "--network", "testnet", + "--ledgerbackend", "datastore", + }, + expectError: true, + errorMessage: "datastore config file is required for datastore ledgerbackend type", 
+ }, + } + + commands := []struct { + cmd []string + name string + }{ + {[]string{"db", "reingest", "range"}, "TestDbReingestRangeCmd"}, + {[]string{"db", "fill-gaps"}, "TestDbFillGapsCmd"}, + } + + for _, command := range commands { + for _, tt := range tests { + s.T().Run(tt.name+"_"+command.name, func(t *testing.T) { + + rootCmd := NewRootCmd() + var args []string + args = append(command.cmd, tt.args...) + rootCmd.SetArgs(append([]string{ + "--db-url", s.db.DSN, + "--stellar-core-binary-path", "/test/core/bin/path", + }, args...)) + + if tt.expectError { + err := rootCmd.Execute() + require.Error(t, err) + require.Contains(t, err.Error(), tt.errorMessage) + } else { + require.NoError(t, rootCmd.Execute()) + } + }) + } + } +} diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 864067da8f..f6b94a8f52 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/stellar/go/historyarchive" horizon "github.com/stellar/go/services/horizon/internal" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -94,7 +95,7 @@ var ingestVerifyRangeCmd = &cobra.Command{ co.SetValue() } - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil { + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } @@ -189,7 +190,7 @@ var ingestStressTestCmd = &cobra.Command{ co.SetValue() } - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil { + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } @@ -239,7 +240,7 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{ Short: "updates a database to trigger state rebuild, state will be rebuilt by a running Horizon instance, DO NOT RUN production DB, some endpoints will be unavailable until state is rebuilt", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil { + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } @@ -263,7 +264,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{ Short: "ingests genesis state (ledger 1)", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil { + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } @@ -320,7 +321,7 @@ var ingestBuildStateCmd = &cobra.Command{ co.SetValue() } - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil { + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index d2900496d4..099979c97b 100644 --- a/services/horizon/cmd/root.go +++ 
b/services/horizon/cmd/root.go @@ -12,7 +12,13 @@ import ( var ( globalConfig, globalFlags = horizon.Flags() - RootCmd = &cobra.Command{ + RootCmd = createRootCmd(globalConfig, globalFlags) + originalHelpFunc = RootCmd.HelpFunc() + originalUsageFunc = RootCmd.UsageFunc() +) + +func createRootCmd(horizonConfig *horizon.Config, configOptions config.ConfigOptions) *cobra.Command { + return &cobra.Command{ Use: "horizon", Short: "client-facing api server for the Stellar network", SilenceErrors: true, @@ -23,16 +29,44 @@ var ( "DEPRECATED - the use of command-line flags has been deprecated in favor of environment variables. Please" + "consult our Configuring section in the developer documentation on how to use them - https://developers.stellar.org/docs/run-api-server/configuring", RunE: func(cmd *cobra.Command, args []string) error { - app, err := horizon.NewAppFromFlags(globalConfig, globalFlags) + app, err := horizon.NewAppFromFlags(horizonConfig, configOptions) if err != nil { return err } return app.Serve() }, } - originalHelpFunc = RootCmd.HelpFunc() - originalUsageFunc = RootCmd.UsageFunc() -) +} + +func initRootCmd(cmd *cobra.Command, + originalHelpFn func(*cobra.Command, []string), + originalUsageFn func(*cobra.Command) error, + horizonGlobalFlags config.ConfigOptions) { + // override the default help output, apply further filtering on which global flags + // will be shown on the help output dependent on the command help was issued upon. + cmd.SetHelpFunc(func(c *cobra.Command, args []string) { + enableGlobalOptionsInHelp(c, horizonGlobalFlags) + originalHelpFn(c, args) + }) + + cmd.SetUsageFunc(func(c *cobra.Command) error { + enableGlobalOptionsInHelp(c, horizonGlobalFlags) + return originalUsageFn(c) + }) + + err := horizonGlobalFlags.Init(cmd) + if err != nil { + stdLog.Fatal(err.Error()) + } +} + +func NewRootCmd() *cobra.Command { + horizonGlobalConfig, horizonGlobalFlags := horizon.Flags() + cmd := createRootCmd(horizonGlobalConfig, horizonGlobalFlags) + initRootCmd(cmd, cmd.HelpFunc(), cmd.UsageFunc(), horizonGlobalFlags) + DefineDBCommands(cmd, horizonGlobalConfig, horizonGlobalFlags) + return cmd +} // ErrUsage indicates we should print the usage string and exit with code 1 type ErrUsage struct { @@ -51,23 +85,7 @@ func (e ErrExitCode) Error() string { } func init() { - - // override the default help output, apply further filtering on which global flags - // will be shown on the help outout dependent on the command help was issued upon. 
- RootCmd.SetHelpFunc(func(c *cobra.Command, args []string) { - enableGlobalOptionsInHelp(c, globalFlags) - originalHelpFunc(c, args) - }) - - RootCmd.SetUsageFunc(func(c *cobra.Command) error { - enableGlobalOptionsInHelp(c, globalFlags) - return originalUsageFunc(c) - }) - - err := globalFlags.Init(RootCmd) - if err != nil { - stdLog.Fatal(err.Error()) - } + initRootCmd(RootCmd, originalHelpFunc, originalUsageFunc, globalFlags) } func Execute() error { diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 9930fee69f..38fca67576 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -832,7 +832,7 @@ func Flags() (*Config, support.ConfigOptions) { // NewAppFromFlags constructs a new Horizon App from the given command line flags func NewAppFromFlags(config *Config, flags support.ConfigOptions) (*App, error) { - err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true, AlwaysIngest: false}) + err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true}) if err != nil { return nil, err } @@ -850,30 +850,10 @@ func NewAppFromFlags(config *Config, flags support.ConfigOptions) (*App, error) } type ApplyOptions struct { - AlwaysIngest bool RequireCaptiveCoreFullConfig bool + NoCaptiveCore bool } -type networkConfig struct { - defaultConfig []byte - HistoryArchiveURLs []string - NetworkPassphrase string -} - -var ( - PubnetConf = networkConfig{ - defaultConfig: ledgerbackend.PubnetDefaultConfig, - HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, - NetworkPassphrase: network.PublicNetworkPassphrase, - } - - TestnetConf = networkConfig{ - defaultConfig: ledgerbackend.TestnetDefaultConfig, - HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs, - NetworkPassphrase: network.TestNetworkPassphrase, - } -) - // getCaptiveCoreBinaryPath retrieves the path of the Captive Core binary // Returns the path or an error if the binary is not found func getCaptiveCoreBinaryPath() (string, error) { @@ -884,69 +864,32 @@ func getCaptiveCoreBinaryPath() (string, error) { return result, nil } -// getCaptiveCoreConfigFromNetworkParameter returns the default Captive Core configuration based on the network. -func getCaptiveCoreConfigFromNetworkParameter(config *Config) (networkConfig, error) { - var defaultNetworkConfig networkConfig - - if config.NetworkPassphrase != "" { - return defaultNetworkConfig, fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter", - NetworkPassphraseFlagName, NetworkFlagName) - } - - if len(config.HistoryArchiveURLs) > 0 { - return defaultNetworkConfig, fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter", - HistoryArchiveURLsFlagName, NetworkFlagName) - } - - switch config.Network { - case StellarPubnet: - defaultNetworkConfig = PubnetConf - case StellarTestnet: - defaultNetworkConfig = TestnetConf - default: - return defaultNetworkConfig, fmt.Errorf("no default configuration found for network %s", config.Network) - } - - return defaultNetworkConfig, nil -} - // setCaptiveCoreConfiguration prepares configuration for the Captive Core func setCaptiveCoreConfiguration(config *Config, options ApplyOptions) error { stdLog.Println("Preparing captive core...") + var err error // If the user didn't specify a Stellar Core binary, we can check the // $PATH and possibly fill it in for them. 
if config.CaptiveCoreBinaryPath == "" { - var err error if config.CaptiveCoreBinaryPath, err = getCaptiveCoreBinaryPath(); err != nil { return fmt.Errorf("captive core requires %s", StellarCoreBinaryPathName) } } - var defaultNetworkConfig networkConfig - if config.Network != "" { - var err error - defaultNetworkConfig, err = getCaptiveCoreConfigFromNetworkParameter(config) - if err != nil { - return err - } - config.NetworkPassphrase = defaultNetworkConfig.NetworkPassphrase - config.HistoryArchiveURLs = defaultNetworkConfig.HistoryArchiveURLs - } else { - if config.NetworkPassphrase == "" { - return fmt.Errorf("%s must be set", NetworkPassphraseFlagName) - } + var defaultCaptiveCoreConfig []byte + switch config.Network { + case StellarPubnet: + defaultCaptiveCoreConfig = ledgerbackend.PubnetDefaultConfig + case StellarTestnet: - if len(config.HistoryArchiveURLs) == 0 { - return fmt.Errorf("%s must be set", HistoryArchiveURLsFlagName) - } + defaultCaptiveCoreConfig = ledgerbackend.TestnetDefaultConfig } config.CaptiveCoreTomlParams.CoreBinaryPath = config.CaptiveCoreBinaryPath config.CaptiveCoreTomlParams.HistoryArchiveURLs = config.HistoryArchiveURLs config.CaptiveCoreTomlParams.NetworkPassphrase = config.NetworkPassphrase - var err error if config.CaptiveCoreConfigPath != "" { config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromFile(config.CaptiveCoreConfigPath, config.CaptiveCoreTomlParams) @@ -960,8 +903,8 @@ func setCaptiveCoreConfiguration(config *Config, options ApplyOptions) error { if err != nil { return errors.Wrap(err, "invalid captive core toml file") } - } else if len(defaultNetworkConfig.defaultConfig) != 0 { - config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromData(defaultNetworkConfig.defaultConfig, + } else if len(defaultCaptiveCoreConfig) != 0 { + config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromData(defaultCaptiveCoreConfig, config.CaptiveCoreTomlParams) if err != nil { return errors.Wrap(err, "invalid captive core toml file") @@ -1004,10 +947,6 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption return err } - if options.AlwaysIngest { - config.Ingest = true - } - if config.Ingest { // Migrations should be checked as early as possible. 
Apply and check // only on ingesting instances which are required to have write-access @@ -1023,9 +962,15 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption return err } - err := setCaptiveCoreConfiguration(config, options) - if err != nil { - return errors.Wrap(err, "error generating captive core configuration") + if err := setNetworkConfiguration(config); err != nil { + return err + } + + if !options.NoCaptiveCore { + err := setCaptiveCoreConfiguration(config, options) + if err != nil { + return errors.Wrap(err, "error generating captive core configuration") + } } } @@ -1061,3 +1006,37 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption return nil } + +func setNetworkConfiguration(config *Config) error { + if config.Network != "" { + if config.NetworkPassphrase != "" { + return fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter", + NetworkPassphraseFlagName, NetworkFlagName) + } + + if len(config.HistoryArchiveURLs) > 0 { + return fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter", + HistoryArchiveURLsFlagName, NetworkFlagName) + } + + switch config.Network { + case StellarPubnet: + config.NetworkPassphrase = network.PublicNetworkPassphrase + config.HistoryArchiveURLs = network.PublicNetworkhistoryArchiveURLs + case StellarTestnet: + config.NetworkPassphrase = network.TestNetworkPassphrase + config.HistoryArchiveURLs = network.TestNetworkhistoryArchiveURLs + default: + return fmt.Errorf("no default configuration found for network %s", config.Network) + } + } + + if config.NetworkPassphrase == "" { + return fmt.Errorf("%s must be set", NetworkPassphraseFlagName) + } + + if len(config.HistoryArchiveURLs) == 0 { + return fmt.Errorf("%s must be set", HistoryArchiveURLsFlagName) + } + return nil +} diff --git a/services/horizon/internal/flags_test.go b/services/horizon/internal/flags_test.go index 76ec1ffd8d..4d8d352080 100644 --- a/services/horizon/internal/flags_test.go +++ b/services/horizon/internal/flags_test.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/cobra" + "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/test" "github.com/stretchr/testify/assert" @@ -29,16 +30,16 @@ func Test_createCaptiveCoreDefaultConfig(t *testing.T) { config: Config{Network: StellarTestnet, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - networkPassphrase: TestnetConf.NetworkPassphrase, - historyArchiveURLs: TestnetConf.HistoryArchiveURLs, + networkPassphrase: network.TestNetworkPassphrase, + historyArchiveURLs: network.TestNetworkhistoryArchiveURLs, }, { name: "pubnet default config", config: Config{Network: StellarPubnet, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - networkPassphrase: PubnetConf.NetworkPassphrase, - historyArchiveURLs: PubnetConf.HistoryArchiveURLs, + networkPassphrase: network.PublicNetworkPassphrase, + historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs, }, { name: "testnet validation; history archive urls supplied", @@ -83,18 +84,41 @@ func Test_createCaptiveCoreDefaultConfig(t *testing.T) { }, errStr: "no default configuration found for network unknown", }, + { + name: "no network specified; passphrase not supplied", + config: Config{ + HistoryArchiveURLs: []string{"HistoryArchiveURLs"}, + CaptiveCoreBinaryPath: "/path/to/captive-core/binary", + }, + errStr: fmt.Sprintf("%s must be set", NetworkPassphraseFlagName), + }, + { + name: "no network specified; history archive urls not supplied", + config: Config{ + 
NetworkPassphrase: "NetworkPassphrase", + CaptiveCoreBinaryPath: "/path/to/captive-core/binary", + }, + errStr: fmt.Sprintf("%s must be set", HistoryArchiveURLsFlagName), + }, + + { + name: "unknown network specified", + config: Config{Network: "unknown", + NetworkPassphrase: "", + HistoryArchiveURLs: []string{}, + CaptiveCoreBinaryPath: "/path/to/captive-core/binary", + }, + errStr: "no default configuration found for network unknown", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.config.CaptiveCoreTomlParams.UseDB = true - e := setCaptiveCoreConfiguration(&tt.config, - ApplyOptions{RequireCaptiveCoreFullConfig: true}) + e := setNetworkConfiguration(&tt.config) if tt.errStr == "" { assert.NoError(t, e) assert.Equal(t, tt.networkPassphrase, tt.config.NetworkPassphrase) assert.Equal(t, tt.historyArchiveURLs, tt.config.HistoryArchiveURLs) - assert.Equal(t, tt.networkPassphrase, tt.config.CaptiveCoreTomlParams.NetworkPassphrase) - assert.Equal(t, tt.historyArchiveURLs, tt.config.CaptiveCoreTomlParams.HistoryArchiveURLs) } else { assert.Equal(t, tt.errStr, e.Error()) } @@ -102,53 +126,50 @@ func Test_createCaptiveCoreDefaultConfig(t *testing.T) { } } -func Test_createCaptiveCoreConfig(t *testing.T) { - - var errorMsgConfig = "%s must be set" +func TestSetCaptiveCoreConfig(t *testing.T) { tests := []struct { name string requireCaptiveCoreConfig bool config Config - networkPassphrase string - historyArchiveURLs []string errStr string }{ { - name: "no network specified; valid parameters", + name: "testnet default config", requireCaptiveCoreConfig: true, config: Config{ - NetworkPassphrase: PubnetConf.NetworkPassphrase, - HistoryArchiveURLs: PubnetConf.HistoryArchiveURLs, - CaptiveCoreConfigPath: "../../../ingest/ledgerbackend/configs/captive-core-pubnet.cfg", + Network: StellarTestnet, + NetworkPassphrase: network.TestNetworkPassphrase, + HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - networkPassphrase: PubnetConf.NetworkPassphrase, - historyArchiveURLs: PubnetConf.HistoryArchiveURLs, }, { - name: "no network specified; passphrase not supplied", + name: "pubnet default config", requireCaptiveCoreConfig: true, config: Config{ - HistoryArchiveURLs: []string{"HistoryArchiveURLs"}, + Network: StellarPubnet, + NetworkPassphrase: network.PublicNetworkPassphrase, + HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - errStr: fmt.Sprintf(errorMsgConfig, NetworkPassphraseFlagName), }, { - name: "no network specified; history archive urls not supplied", + name: "no network specified; valid parameters", requireCaptiveCoreConfig: true, config: Config{ - NetworkPassphrase: "NetworkPassphrase", + NetworkPassphrase: network.PublicNetworkPassphrase, + HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, + CaptiveCoreConfigPath: "../../../ingest/ledgerbackend/configs/captive-core-pubnet.cfg", CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - errStr: fmt.Sprintf(errorMsgConfig, HistoryArchiveURLsFlagName), }, + { name: "no network specified; captive-core-config-path not supplied", requireCaptiveCoreConfig: true, config: Config{ - NetworkPassphrase: PubnetConf.NetworkPassphrase, - HistoryArchiveURLs: PubnetConf.HistoryArchiveURLs, + NetworkPassphrase: network.PublicNetworkPassphrase, + HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, errStr: fmt.Sprintf("invalid 
config: captive core requires that --%s is set or "+ @@ -158,8 +179,8 @@ func Test_createCaptiveCoreConfig(t *testing.T) { name: "no network specified; captive-core-config-path invalid file", requireCaptiveCoreConfig: true, config: Config{ - NetworkPassphrase: PubnetConf.NetworkPassphrase, - HistoryArchiveURLs: PubnetConf.HistoryArchiveURLs, + NetworkPassphrase: network.PublicNetworkPassphrase, + HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, CaptiveCoreConfigPath: "xyz.cfg", CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, @@ -170,25 +191,21 @@ func Test_createCaptiveCoreConfig(t *testing.T) { name: "no network specified; captive-core-config-path incorrect config", requireCaptiveCoreConfig: true, config: Config{ - NetworkPassphrase: PubnetConf.NetworkPassphrase, - HistoryArchiveURLs: PubnetConf.HistoryArchiveURLs, + NetworkPassphrase: network.PublicNetworkPassphrase, + HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs, CaptiveCoreConfigPath: "../../../ingest/ledgerbackend/configs/captive-core-testnet.cfg", CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, errStr: fmt.Sprintf("invalid captive core toml file: invalid captive core toml: "+ "NETWORK_PASSPHRASE in captive core config file: %s does not match Horizon "+ - "network-passphrase flag: %s", TestnetConf.NetworkPassphrase, PubnetConf.NetworkPassphrase), + "network-passphrase flag: %s", network.TestNetworkPassphrase, network.PublicNetworkPassphrase), }, { - name: "no network specified; captive-core-config not required", + name: "no network specified; full captive-core-config not required", requireCaptiveCoreConfig: false, config: Config{ - NetworkPassphrase: PubnetConf.NetworkPassphrase, - HistoryArchiveURLs: PubnetConf.HistoryArchiveURLs, CaptiveCoreBinaryPath: "/path/to/captive-core/binary", }, - networkPassphrase: PubnetConf.NetworkPassphrase, - historyArchiveURLs: PubnetConf.HistoryArchiveURLs, }, } for _, tt := range tests { @@ -198,10 +215,6 @@ func Test_createCaptiveCoreConfig(t *testing.T) { ApplyOptions{RequireCaptiveCoreFullConfig: tt.requireCaptiveCoreConfig}) if tt.errStr == "" { assert.NoError(t, e) - assert.Equal(t, tt.networkPassphrase, tt.config.NetworkPassphrase) - assert.Equal(t, tt.historyArchiveURLs, tt.config.HistoryArchiveURLs) - assert.Equal(t, tt.networkPassphrase, tt.config.CaptiveCoreTomlParams.NetworkPassphrase) - assert.Equal(t, tt.historyArchiveURLs, tt.config.CaptiveCoreTomlParams.HistoryArchiveURLs) } else { require.Error(t, e) assert.Equal(t, tt.errStr, e.Error()) @@ -261,7 +274,7 @@ func TestClientQueryTimeoutFlag(t *testing.T) { if err := flags.Init(horizonCmd); err != nil { require.NoError(t, err) } - if err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true, AlwaysIngest: false}); err != nil { + if err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true}); err != nil { require.EqualError(t, err, testCase.err) } else { require.Empty(t, testCase.err) @@ -293,7 +306,7 @@ func TestEnvironmentVariables(t *testing.T) { if err := flags.Init(horizonCmd); err != nil { fmt.Println(err) } - if err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true, AlwaysIngest: false}); err != nil { + if err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true}); err != nil { fmt.Println(err) } assert.Equal(t, config.Ingest, false) diff --git a/services/horizon/internal/ingest/README.md b/services/horizon/internal/ingest/README.md index a0874a0b43..12982b5047 100644 --- 
a/services/horizon/internal/ingest/README.md +++ b/services/horizon/internal/ingest/README.md
@@ -140,8 +140,46 @@ This pauses the state machine for 10 seconds then tries again, in hopes that a n
 **Next state**: [`start`](#start-state)
-# Ingestion
-TODO
+# Reingestion
+Horizon supports reingestion through the `db reingest range` sub-command, which runs as a synchronous OS process and exits only once the complete reingestion range has finished or an error is encountered.
+
+By default, this sub-command uses captive core configuration, i.e. a stellar-core binary (`--stellar-core-binary-path`) and a stellar-core config file (`--captive-core-config-path`), to obtain the ledger tx meta to be ingested from a Stellar network.
+
+The `db reingest range` sub-command can optionally be configured to consume pre-computed ledger tx meta files from a Google Cloud Storage (GCS) location instead of running captive core on the host machine.
+Prerequisites:
+ - Have a GCS account.
+ - Run the [ledgerexporter] to publish ledger tx meta files to your GCS bucket location.
+Run the `db reingest range` sub-command, configured to import tx meta from your GCS bucket:
+ ```
+ $ DATABASE_URL= \
+ NETWORK=testnet \
+ stellar-horizon db reingest range \
+ --parallel-workers 2 \
+ --ledgerbackend "datastore" \
+ --datastore-config "config.storagebackend.toml" \
+ 100 200
+ ```
+Note that even though stellar-core related config (binary path or config file) is no longer needed, network related config still is: either the convenience parameter `NETWORK=testnet|pubnet`, or `NETWORK_PASSPHRASE` and `HISTORY_ARCHIVE_URLS` set directly.
+
+The `--datastore-config` flag must point to a TOML config file that provides the parameters needed for ingestion to work against remote GCS storage.
+
+Example config TOML:
+```
+# Datastore Configuration
+[datastore_config]
+# Specifies the type of datastore.
+# Currently, only Google Cloud Storage (GCS) is supported.
+type = "GCS"
+
+[datastore_config.params]
+# The Google Cloud Storage bucket path for storing data, with optional subpaths for organization.
+destination_bucket_path = "path/to/my/bucket"
+
+[datastore_config.schema]
+# Configuration for data organization of the remote files
+ledgers_per_file = 1 # Number of ledgers stored in each file.
+files_per_partition = 64000 # Number of files per partition/directory.
+
+```
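+A minimal sketch of how such a file maps onto the `StorageBackendConfig` struct added in this PR (illustrative only, not part of Horizon's CLI wiring; that struct lives in Horizon's internal `ingest` package, so it is mirrored locally here, and `github.com/pelletier/go-toml` is the TOML library already used elsewhere in this repo):
+```go
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/pelletier/go-toml"
+
+	"github.com/stellar/go/ingest/ledgerbackend"
+	"github.com/stellar/go/support/datastore"
+)
+
+// storageBackendConfig mirrors ingest.StorageBackendConfig from this PR.
+type storageBackendConfig struct {
+	DataStoreConfig              datastore.DataStoreConfig                  `toml:"datastore_config"`
+	BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
+}
+
+func main() {
+	// Read the file passed to --datastore-config.
+	raw, err := os.ReadFile("config.storagebackend.toml")
+	if err != nil {
+		panic(err)
+	}
+	// The toml struct tags above drive the mapping of the
+	// [datastore_config] and [buffered_storage_backend_config] tables.
+	var cfg storageBackendConfig
+	if err := toml.Unmarshal(raw, &cfg); err != nil {
+		panic(err)
+	}
+	fmt.Println(cfg.DataStoreConfig.Type) // "GCS"
+}
+```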
# Range Preparation TODO: See `maybePrepareRange` diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index dec3123f34..1a54e6843c 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -20,6 +20,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/filters" apkg "github.com/stellar/go/support/app" + "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" @@ -82,6 +83,28 @@ const ( var log = logpkg.DefaultLogger.WithField("service", "ingest") +type LedgerBackendType uint + +const ( + CaptiveCoreBackend LedgerBackendType = iota + BufferedStorageBackend +) + +func (s LedgerBackendType) String() string { + switch s { + case CaptiveCoreBackend: + return "captive-core" + case BufferedStorageBackend: + return "datastore" + } + return "" +} + +type StorageBackendConfig struct { + DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` + BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"` +} + type Config struct { StellarCoreURL string CaptiveCoreBinaryPath string @@ -115,6 +138,9 @@ type Config struct { CoreBuildVersionFn ledgerbackend.CoreBuildVersionFunc ReapConfig ReapConfig + + LedgerBackendType LedgerBackendType + StorageBackendConfig StorageBackendConfig } const ( @@ -261,29 +287,46 @@ func NewSystem(config Config) (System, error) { cancel() return nil, errors.Wrap(err, "error creating history archive") } + var ledgerBackend ledgerbackend.LedgerBackend - // the only ingest option is local captive core config - logger := log.WithField("subservice", "stellar-core") - ledgerBackend, err := ledgerbackend.NewCaptive( - ledgerbackend.CaptiveCoreConfig{ - BinaryPath: config.CaptiveCoreBinaryPath, - StoragePath: config.CaptiveCoreStoragePath, - UseDB: config.CaptiveCoreConfigUseDB, - Toml: config.CaptiveCoreToml, - NetworkPassphrase: config.NetworkPassphrase, - HistoryArchiveURLs: config.HistoryArchiveURLs, - CheckpointFrequency: config.CheckpointFrequency, - LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), - Log: logger, - Context: ctx, - UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), - CoreProtocolVersionFn: config.CoreProtocolVersionFn, - CoreBuildVersionFn: config.CoreBuildVersionFn, - }, - ) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating captive core backend") + if config.LedgerBackendType == BufferedStorageBackend { + // Ingest from datastore + var dataStore datastore.DataStore + dataStore, err = datastore.NewDataStore(context.Background(), config.StorageBackendConfig.DataStoreConfig) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create datastore: %w", err) + } + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(config.StorageBackendConfig.BufferedStorageBackendConfig, dataStore) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create buffered storage backend: %w", err) + } + } else { + // Ingest from local captive core + + logger := log.WithField("subservice", "stellar-core") + ledgerBackend, err = ledgerbackend.NewCaptive( + ledgerbackend.CaptiveCoreConfig{ + BinaryPath: config.CaptiveCoreBinaryPath, + StoragePath: config.CaptiveCoreStoragePath, + UseDB: config.CaptiveCoreConfigUseDB, + 
Toml: config.CaptiveCoreToml, + NetworkPassphrase: config.NetworkPassphrase, + HistoryArchiveURLs: config.HistoryArchiveURLs, + CheckpointFrequency: config.CheckpointFrequency, + LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), + Log: logger, + Context: ctx, + UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), + CoreProtocolVersionFn: config.CoreProtocolVersionFn, + CoreBuildVersionFn: config.CoreBuildVersionFn, + }, + ) + if err != nil { + cancel() + return nil, errors.Wrap(err, "error creating captive core backend") + } } historyQ := &history.Q{config.HistorySession.Clone()} diff --git a/services/horizon/internal/ingest/testdata/config.storagebackend.toml b/services/horizon/internal/ingest/testdata/config.storagebackend.toml new file mode 100644 index 0000000000..538b793b54 --- /dev/null +++ b/services/horizon/internal/ingest/testdata/config.storagebackend.toml @@ -0,0 +1,19 @@ +[buffered_storage_backend_config] +buffer_size = 5 # The size of the buffer +num_workers = 5 # Number of workers +retry_limit = 3 # Number of retries allowed +retry_wait = "30s" # Duration to wait before retrying in seconds + +# Datastore Configuration +[datastore_config] +# Specifies the type of datastore. Currently, only Google Cloud Storage (GCS) is supported. +type = "GCS" + +[datastore_config.params] +# The Google Cloud Storage bucket path for storing data, with optional subpaths for organization. +destination_bucket_path = "path/to/my/bucket" + +[datastore_config.schema] +# Configuration for data organization +ledgers_per_file = 1 # Number of ledgers stored in each file. +files_per_partition = 64000 # Number of files per partition/directory. diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 1f1d2277ec..5a2b03e48b 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -3,12 +3,16 @@ package integration import ( "context" "fmt" + "io/fs" + "os" "path/filepath" "strconv" "testing" "time" + "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/historyarchive" @@ -485,7 +489,8 @@ func TestReingestDB(t *testing.T) { horizonConfig := itest.GetHorizonIngestConfig() t.Run("validate parallel range", func(t *testing.T) { - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", "range", @@ -494,7 +499,7 @@ func TestReingestDB(t *testing.T) { "2", )) - assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to") + assert.EqualError(t, rootCmd.Execute(), "Invalid range: {10 2} from > to") }) t.Logf("reached ledger is %v", reachedLedger) @@ -537,7 +542,8 @@ func TestReingestDB(t *testing.T) { "captive-core-reingest-range-integration-tests.cfg", ) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", "range", "--parallel-workers=1", @@ -545,8 +551,84 @@ func TestReingestDB(t *testing.T) { fmt.Sprintf("%d", toLedger), )) - tt.NoError(horizoncmd.RootCmd.Execute()) - tt.NoError(horizoncmd.RootCmd.Execute(), "Repeat the same reingest range against db, should not have errors.") + tt.NoError(rootCmd.Execute()) + tt.NoError(rootCmd.Execute(), "Repeat the 
same reingest range against db, should not have errors.") +} + +func TestReingestDatastore(t *testing.T) { + test := integration.NewTest(t, integration.Config{ + SkipHorizonStart: true, + SkipCoreContainerCreation: true, + }) + err := test.StartHorizon(false) + assert.NoError(t, err) + test.WaitForHorizonWeb() + + testTempDir := t.TempDir() + fakeBucketFilesSource := "testdata/testbucket" + fakeBucketFiles := []fakestorage.Object{} + + if err = filepath.WalkDir(fakeBucketFilesSource, func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + + if entry.Type().IsRegular() { + contents, err := os.ReadFile(fmt.Sprintf("%s/%s", fakeBucketFilesSource, entry.Name())) + if err != nil { + return err + } + + fakeBucketFiles = append(fakeBucketFiles, fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "path", + Name: fmt.Sprintf("to/my/bucket/FFFFFFFF--0-63999/%s", entry.Name()), + }, + Content: contents, + }) + } + return nil + }); err != nil { + t.Fatalf("unable to setup fake bucket files: %v", err) + } + + testWriter := &testWriter{test: t} + opts := fakestorage.Options{ + Scheme: "http", + Host: "127.0.0.1", + Port: uint16(0), + Writer: testWriter, + StorageRoot: filepath.Join(testTempDir, "bucket"), + PublicHost: "127.0.0.1", + InitialObjects: fakeBucketFiles, + } + + gcsServer, err := fakestorage.NewServerWithOptions(opts) + + if err != nil { + t.Fatalf("couldn't start the fake gcs http server %v", err) + } + + defer gcsServer.Stop() + t.Logf("fake gcs server started at %v", gcsServer.URL()) + t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL()) + + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{"db", + "reingest", + "range", + "--db-url", test.GetTestDB().DSN, + "--network", "testnet", + "--parallel-workers", "1", + "--ledgerbackend", "datastore", + "--datastore-config", "../ingest/testdata/config.storagebackend.toml", + "997", + "999"}) + + require.NoError(t, rootCmd.Execute()) + + _, err = test.Client().LedgerDetail(998) + require.NoError(t, err) } func TestReingestDBWithFilterRules(t *testing.T) { @@ -648,22 +730,24 @@ func TestReingestDBWithFilterRules(t *testing.T) { itest.StopHorizon() // clear the db with reaping all ledgers - horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", "reap", "--history-retention-count=1", )) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) // repopulate the db with reingestion which should catchup using core reapply filter rules // correctly on reingestion ranged - horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", "reingest", "range", "1", fmt.Sprintf("%d", reachedLedger), )) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) // bring up horizon, just the api server no ingestion, to query // for tx's that should have been repopulated on db from reingestion per @@ -678,7 +762,7 @@ func TestReingestDBWithFilterRules(t *testing.T) { }() // wait until the web server is up before continuing to test requests - itest.WaitForHorizon() + itest.WaitForHorizonIngest() // Make sure that a tx from non-whitelisted account is not stored after reingestion _, err = itest.Client().TransactionDetail(nonWhiteListTxResp.Hash) @@ -733,12 +817,13 @@ func TestMigrateIngestIsTrueByDefault(t *testing.T) { newDB 
:= dbtest.Postgres(t) freshHorizonPostgresURL := newDB.DSN - horizoncmd.RootCmd.SetArgs([]string{ + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ // ingest is set to true by default "--db-url", freshHorizonPostgresURL, "db", "migrate", "up", }) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) dbConn, err := db.Open("postgres", freshHorizonPostgresURL) tt.NoError(err) @@ -754,12 +839,13 @@ func TestMigrateChecksIngestFlag(t *testing.T) { newDB := dbtest.Postgres(t) freshHorizonPostgresURL := newDB.DSN - horizoncmd.RootCmd.SetArgs([]string{ + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ "--ingest=false", "--db-url", freshHorizonPostgresURL, "db", "migrate", "up", }) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) dbConn, err := db.Open("postgres", freshHorizonPostgresURL) tt.NoError(err) @@ -802,7 +888,8 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) t.Run("validate parallel range", func(t *testing.T) { - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=2", @@ -810,7 +897,7 @@ func TestFillGaps(t *testing.T) { "2", )) - assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to") + assert.EqualError(t, rootCmd.Execute(), "Invalid range: {10 2} from > to") }) // make sure a full checkpoint has elapsed otherwise there will be nothing to reingest @@ -842,21 +929,25 @@ func TestFillGaps(t *testing.T) { filepath.Dir(horizonConfig.CaptiveCoreConfigPath), "captive-core-reingest-range-integration-tests.cfg", ) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1")) - tt.NoError(horizoncmd.RootCmd.Execute()) + + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.Equal(int64(0), latestLedger) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "3", "4")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "3", "4")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), oldestLedger) tt.Equal(int64(4), latestLedger) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "6", "7")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "6", "7")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), oldestLedger) @@ -866,8 +957,9 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) tt.Equal([]history.LedgerRange{{StartSequence: 5, EndSequence: 5}}, gaps) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), 
oldestLedger) @@ -876,8 +968,9 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) tt.Empty(gaps) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "2", "8")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "2", "8")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(2), oldestLedger) @@ -892,7 +985,7 @@ func TestResumeFromInitializedDB(t *testing.T) { tt := assert.New(t) // Stop the integration test, and restart it with the same database - err := itest.RestartHorizon() + err := itest.RestartHorizon(true) tt.NoError(err) successfullyResumed := func() bool { @@ -905,3 +998,12 @@ func TestResumeFromInitializedDB(t *testing.T) { tt.Eventually(successfullyResumed, 1*time.Minute, 1*time.Second) } + +type testWriter struct { + test *testing.T +} + +func (w *testWriter) Write(p []byte) (n int, err error) { + w.test.Log(string(p)) + return len(p), nil +} diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 133950d6f3..c7e0d0c75b 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -16,6 +16,7 @@ import ( "github.com/spf13/cobra" + "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/paths" "github.com/stellar/go/services/horizon/internal/simplepath" @@ -75,13 +76,10 @@ func TestBucketDirDisallowed(t *testing.T) { horizon.StellarCoreBinaryPathName: os.Getenv("CAPTIVE_CORE_BIN"), } test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.Equal(t, err.Error(), integration.HorizonInitErrStr+": error generating captive core configuration:"+ " invalid captive core toml file: could not unmarshal captive core toml: setting BUCKET_DIR_PATH is disallowed"+ " for Captive Core, use CAPTIVE_CORE_STORAGE_PATH instead") - time.Sleep(1 * time.Second) - test.StopHorizon() - test.Shutdown() } func TestEnvironmentPreserved(t *testing.T) { @@ -109,9 +107,9 @@ func TestEnvironmentPreserved(t *testing.T) { } test := integration.NewTest(t, *testConfig) - err = test.StartHorizon() + err = test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() envValue := os.Getenv("STELLAR_CORE_URL") assert.Equal(t, StellarCoreURL, envValue) @@ -126,8 +124,7 @@ func TestEnvironmentPreserved(t *testing.T) { // using NETWORK environment variables, history archive urls or network passphrase // parameters are also set. func TestInvalidNetworkParameters(t *testing.T) { - var captiveCoreConfigErrMsg = integration.HorizonInitErrStr + ": error generating captive " + - "core configuration: invalid config: %s parameter not allowed with the %s parameter" + var captiveCoreConfigErrMsg = integration.HorizonInitErrStr + ": invalid config: %s parameter not allowed with the %s parameter" testCases := []struct { name string errMsg string @@ -160,12 +157,11 @@ func TestInvalidNetworkParameters(t *testing.T) { testConfig.SkipCoreContainerCreation = true testConfig.HorizonIngestParameters = localParams test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) // Adding sleep as a workaround for the race condition in the ingestion system. 
// https://github.com/stellar/go/issues/5005 time.Sleep(2 * time.Second) assert.Equal(t, testCase.errMsg, err.Error()) - test.Shutdown() }) } } @@ -186,13 +182,13 @@ func TestNetworkParameter(t *testing.T) { }{ { networkValue: horizon.StellarTestnet, - networkPassphrase: horizon.TestnetConf.NetworkPassphrase, - historyArchiveURLs: horizon.TestnetConf.HistoryArchiveURLs, + networkPassphrase: network.TestNetworkPassphrase, + historyArchiveURLs: network.TestNetworkhistoryArchiveURLs, }, { networkValue: horizon.StellarPubnet, - networkPassphrase: horizon.PubnetConf.NetworkPassphrase, - historyArchiveURLs: horizon.PubnetConf.HistoryArchiveURLs, + networkPassphrase: network.PublicNetworkPassphrase, + historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs, }, } for _, tt := range testCases { @@ -204,15 +200,13 @@ func TestNetworkParameter(t *testing.T) { testConfig.SkipCoreContainerCreation = true testConfig.HorizonIngestParameters = localParams test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) // Adding sleep as a workaround for the race condition in the ingestion system. // https://github.com/stellar/go/issues/5005 time.Sleep(2 * time.Second) assert.NoError(t, err) assert.Equal(t, test.GetHorizonIngestConfig().HistoryArchiveURLs, tt.historyArchiveURLs) assert.Equal(t, test.GetHorizonIngestConfig().NetworkPassphrase, tt.networkPassphrase) - - test.Shutdown() }) } } @@ -247,12 +241,11 @@ func TestNetworkEnvironmentVariable(t *testing.T) { testConfig.HorizonIngestParameters = networkParamArgs testConfig.HorizonEnvironment = map[string]string{"NETWORK": networkValue} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() - // Adding sleep here as a workaround for the race condition in the ingestion system. - // More details can be found at https://github.com/stellar/go/issues/5005 + err := test.StartHorizon(true) + // Adding sleep as a workaround for the race condition in the ingestion system. 
+ // https://github.com/stellar/go/issues/5005 time.Sleep(2 * time.Second) assert.NoError(t, err) - test.Shutdown() }) } } @@ -270,9 +263,9 @@ func TestCaptiveCoreConfigFilesystemState(t *testing.T) { testConfig.HorizonIngestParameters = localParams test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() t.Run("disk state", func(t *testing.T) { validateCaptiveCoreDiskState(test, storagePath) @@ -286,9 +279,9 @@ func TestCaptiveCoreConfigFilesystemState(t *testing.T) { func TestMaxAssetsForPathRequests(t *testing.T) { t.Run("default", func(t *testing.T) { test := integration.NewTest(t, *integration.GetTestConfig()) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Equal(t, test.HorizonIngest().Config().MaxAssetsPerPathRequest, 15) test.Shutdown() }) @@ -296,20 +289,19 @@ func TestMaxAssetsForPathRequests(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{"max-assets-per-path-request": "2"} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Equal(t, test.HorizonIngest().Config().MaxAssetsPerPathRequest, 2) - test.Shutdown() }) } func TestMaxPathFindingRequests(t *testing.T) { t.Run("default", func(t *testing.T) { test := integration.NewTest(t, *integration.GetTestConfig()) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Equal(t, test.HorizonIngest().Config().MaxPathFindingRequests, uint(0)) _, ok := test.HorizonIngest().Paths().(simplepath.InMemoryFinder) assert.True(t, ok) @@ -319,37 +311,34 @@ func TestMaxPathFindingRequests(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{"max-path-finding-requests": "5"} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Equal(t, test.HorizonIngest().Config().MaxPathFindingRequests, uint(5)) finder, ok := test.HorizonIngest().Paths().(*paths.RateLimitedFinder) assert.True(t, ok) assert.Equal(t, finder.Limit(), 5) - test.Shutdown() }) } func TestDisablePathFinding(t *testing.T) { t.Run("default", func(t *testing.T) { test := integration.NewTest(t, *integration.GetTestConfig()) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Equal(t, test.HorizonIngest().Config().MaxPathFindingRequests, uint(0)) _, ok := test.HorizonIngest().Paths().(simplepath.InMemoryFinder) assert.True(t, ok) - test.Shutdown() }) t.Run("set to true", func(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{"disable-path-finding": "true"} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() assert.Nil(t, test.HorizonIngest().Paths()) - test.Shutdown() }) } @@ -364,9 +353,8 @@ func TestDisableTxSub(t *testing.T) { testConfig.HorizonIngestParameters = localParams 
testConfig.SkipCoreContainerCreation = true test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.ErrorContains(t, err, "cannot initialize Horizon: flag --stellar-core-url cannot be empty") - test.Shutdown() }) t.Run("horizon starts successfully when DISABLE_TX_SUB=false, INGEST=false and stellar-core-url is provided", func(t *testing.T) { localParams := integration.MergeMaps(networkParamArgs, map[string]string{ @@ -379,9 +367,8 @@ func TestDisableTxSub(t *testing.T) { testConfig.HorizonIngestParameters = localParams testConfig.SkipCoreContainerCreation = true test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.Shutdown() }) t.Run("horizon starts successfully when DISABLE_TX_SUB=true and INGEST=true", func(t *testing.T) { testConfig := integration.GetTestConfig() @@ -390,10 +377,9 @@ func TestDisableTxSub(t *testing.T) { "ingest": "true", } test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() - test.Shutdown() + test.WaitForHorizonIngest() }) t.Run("do not require stellar-core-url when both DISABLE_TX_SUB=true and INGEST=false", func(t *testing.T) { localParams := integration.MergeMaps(networkParamArgs, map[string]string{ @@ -405,9 +391,8 @@ func TestDisableTxSub(t *testing.T) { testConfig.HorizonIngestParameters = localParams testConfig.SkipCoreContainerCreation = true test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.Shutdown() }) } @@ -421,9 +406,9 @@ func TestDeprecatedOutputs(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{"exp-enable-ingestion-filtering": "false"} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() // Use a wait group to wait for the goroutine to finish before proceeding var wg sync.WaitGroup @@ -507,9 +492,9 @@ func TestDeprecatedOutputs(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{"captive-core-use-db": "true"} test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() + err := test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizon() + test.WaitForHorizonIngest() // Use a wait group to wait for the goroutine to finish before proceeding var wg sync.WaitGroup diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd new file mode 100644 index 0000000000..b2627e7fc1 Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd differ diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd new file mode 100644 index 0000000000..01fb99ae1f Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd differ diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd new file mode 100644 index 0000000000..9a509579c7 
Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd differ diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 0402301a44..9884470d70 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -23,11 +23,13 @@ import ( "github.com/creachadair/jrpc2/jhttp" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" sdk "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/keypair" proto "github.com/stellar/go/protocols/horizon" + horizoncmd "github.com/stellar/go/services/horizon/cmd" horizon "github.com/stellar/go/services/horizon/internal" "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/support/config" @@ -91,6 +93,7 @@ type Test struct { coreConfig CaptiveConfig horizonIngestConfig horizon.Config horizonWebConfig horizon.Config + testDB *dbtest.DB environment *test.EnvironmentManager horizonClient *sdk.Client @@ -170,11 +173,11 @@ func NewTest(t *testing.T, config Config) *Test { } if !config.SkipHorizonStart { - if innerErr := i.StartHorizon(); innerErr != nil { + if innerErr := i.StartHorizon(true); innerErr != nil { t.Fatalf("Failed to start Horizon: %v", innerErr) } - i.WaitForHorizon() + i.WaitForHorizonIngest() } return i @@ -297,14 +300,15 @@ func (i *Test) prepareShutdownHandlers() { }() } -func (i *Test) RestartHorizon() error { +// RestartHorizon stops Horizon and starts it again; if restartIngestProcess=true, the ingest sub-process is restarted as well +func (i *Test) RestartHorizon(restartIngestProcess bool) error { i.StopHorizon() - if err := i.StartHorizon(); err != nil { + if err := i.StartHorizon(restartIngestProcess); err != nil { return err } - i.WaitForHorizon() + i.WaitForHorizonIngest() return nil } @@ -316,6 +320,10 @@ func (i *Test) GetHorizonWebConfig() horizon.Config { return i.horizonWebConfig } +func (i *Test) GetTestDB() *dbtest.DB { + return i.testDB +} + // Shutdown stops the integration tests and destroys all its associated // resources. It will be implicitly called when the calling test (i.e. the // `testing.Test` passed to `New()`) is finished if it hasn't been explicitly @@ -329,68 +337,91 @@ func (i *Test) Shutdown() { }) } -// StartHorizon initializes and starts the Horizon client-facing API server and the ingest server. -func (i *Test) StartHorizon() error { - postgres := dbtest.Postgres(i.t) +// StartHorizon initializes and starts the Horizon client-facing API server. 
+// When startIngestProcess=true, it also starts a second process for the ingest server +func (i *Test) StartHorizon(startIngestProcess bool) error { + i.testDB = dbtest.Postgres(i.t) i.shutdownCalls = append(i.shutdownCalls, func() { + if i.appStopped == nil { + // appStopped is nil when horizon cmd.Execute created an App but hit an intentional error, so StartHorizon + // never reached App.Serve(), which would otherwise eventually have closed the db conn + // via its listener on App.Close(); we must detect this edge case manually and + // close the app's db here to clean up + if i.webNode != nil { + i.webNode.CloseDB() + } + if i.ingestNode != nil { + i.ingestNode.CloseDB() + } + } i.StopHorizon() - postgres.Close() + i.testDB.Close() }) + var err error // To facilitate custom runs of Horizon, we merge a default set of // parameters with the tester-supplied ones (if any). - mergedWebArgs := MergeMaps(i.getDefaultWebArgs(postgres), i.config.HorizonWebParameters) - webArgs := mapToFlags(mergedWebArgs) - i.t.Log("Horizon command line webArgs:", webArgs) - - mergedIngestArgs := MergeMaps(i.getDefaultIngestArgs(postgres), i.config.HorizonIngestParameters) - ingestArgs := mapToFlags(mergedIngestArgs) - i.t.Log("Horizon command line ingestArgs:", ingestArgs) - - // setup Horizon web command - var err error - webConfig, webConfigOpts := horizon.Flags() - webCmd := i.createWebCommand(webConfig, webConfigOpts) - webCmd.SetArgs(webArgs) - if err = webConfigOpts.Init(webCmd); err != nil { - return errors.Wrap(err, "cannot initialize params") - } + mergedWebArgs := MergeMaps(i.getDefaultWebArgs(), i.config.HorizonWebParameters) + mergedIngestArgs := MergeMaps(i.getDefaultIngestArgs(), i.config.HorizonIngestParameters) - // setup Horizon ingest command - ingestConfig, ingestConfigOpts := horizon.Flags() - ingestCmd := i.createIngestCommand(ingestConfig, ingestConfigOpts) - ingestCmd.SetArgs(ingestArgs) - if err = ingestConfigOpts.Init(ingestCmd); err != nil { - return errors.Wrap(err, "cannot initialize params") + // Set up Horizon clients + i.setupHorizonClient(mergedWebArgs) + if err = i.setupHorizonAdminClient(mergedIngestArgs); err != nil { + return err } if err = i.initializeEnvironmentVariables(); err != nil { return err } - if err = ingestCmd.Execute(); err != nil { - return errors.Wrap(err, HorizonInitErrStr) + // setup Horizon web process + webArgs := mapToFlags(mergedWebArgs) + i.t.Log("Horizon command line webArgs:", webArgs) + webConfig, webConfigOpts := horizon.Flags() + webCmd := i.createWebCommand(webConfig, webConfigOpts) + webCmd.SetArgs(webArgs) + if err = webConfigOpts.Init(webCmd); err != nil { + return errors.Wrap(err, "cannot initialize params") } - if err = webCmd.Execute(); err != nil { return errors.Wrap(err, HorizonInitErrStr) } + i.horizonWebConfig = *webConfig - // Set up Horizon clients - i.setupHorizonClient(mergedWebArgs) - if err = i.setupHorizonAdminClient(mergedIngestArgs); err != nil { - return err + // setup horizon ingest process + if startIngestProcess { + ingestArgs := mapToFlags(mergedIngestArgs) + i.t.Log("Horizon command line ingestArgs:", ingestArgs) + // setup Horizon ingest command + ingestConfig, ingestConfigOpts := horizon.Flags() + ingestCmd := i.createIngestCommand(ingestConfig, ingestConfigOpts) + ingestCmd.SetArgs(ingestArgs) + if err = ingestConfigOpts.Init(ingestCmd); err != nil { + return errors.Wrap(err, "cannot initialize params") + } + if err = ingestCmd.Execute(); err != nil { + return errors.Wrap(err, HorizonInitErrStr) 
+ } + i.horizonIngestConfig = *ingestConfig + } else { + // not running ingestion; normally that process would run migrations via --apply-migrations, + // so migrate the empty db directly here in any case + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ + "db", "migrate", "up", "--db-url", i.testDB.DSN}) + require.NoError(i.t, rootCmd.Execute()) } - i.horizonIngestConfig = *ingestConfig - i.horizonWebConfig = *webConfig - i.appStopped = &sync.WaitGroup{} - i.appStopped.Add(2) - go func() { - _ = i.ingestNode.Serve() - i.appStopped.Done() - }() + if i.ingestNode != nil { + i.appStopped.Add(1) + go func() { + _ = i.ingestNode.Serve() + i.appStopped.Done() + }() + } + + i.appStopped.Add(1) go func() { _ = i.webNode.Serve() i.appStopped.Done() @@ -399,13 +430,13 @@ return nil } -func (i *Test) getDefaultArgs(postgres *dbtest.DB) map[string]string { +func (i *Test) getDefaultArgs() map[string]string { // TODO: Ideally, we'd be pulling host/port information from the Docker // Compose YAML file itself rather than hardcoding it. return map[string]string{ "ingest": "false", "history-archive-urls": fmt.Sprintf("http://%s:%d", "localhost", historyArchivePort), - "db-url": postgres.RO_DSN, + "db-url": i.testDB.RO_DSN, "stellar-core-url": i.coreClient.URL, "network-passphrase": i.passPhrase, "apply-migrations": "true", @@ -417,15 +448,15 @@ } } -func (i *Test) getDefaultWebArgs(postgres *dbtest.DB) map[string]string { - return MergeMaps(i.getDefaultArgs(postgres), map[string]string{"admin-port": "0"}) +func (i *Test) getDefaultWebArgs() map[string]string { + return MergeMaps(i.getDefaultArgs(), map[string]string{"admin-port": "0"}) } -func (i *Test) getDefaultIngestArgs(postgres *dbtest.DB) map[string]string { - return MergeMaps(i.getDefaultArgs(postgres), map[string]string{ +func (i *Test) getDefaultIngestArgs() map[string]string { + return MergeMaps(i.getDefaultArgs(), map[string]string{ "admin-port": strconv.Itoa(i.AdminPort()), "port": "8001", - "db-url": postgres.DSN, + "db-url": i.testDB.DSN, "stellar-core-binary-path": i.coreConfig.binaryPath, "captive-core-config-path": i.coreConfig.configPath, "captive-core-http-port": "21626", @@ -816,7 +847,17 @@ func (i *Test) UpgradeProtocol(version uint32) { i.t.Fatalf("could not upgrade protocol in 10s") } -func (i *Test) WaitForHorizon() { +func (i *Test) WaitForHorizonWeb() { + // wait until the web server is up before continuing to test requests + require.Eventually(i.t, func() bool { + if _, horizonErr := i.Client().Root(); horizonErr != nil { + return false + } + return true + }, time.Second*15, time.Millisecond*100) +} + +func (i *Test) WaitForHorizonIngest() { for t := 60; t >= 0; t -= 1 { time.Sleep(time.Second) diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go index e7e999345d..961ba99545 100644 --- a/support/datastore/datastore.go +++ b/support/datastore/datastore.go @@ -21,6 +21,7 @@ type DataStore interface { PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error) Exists(ctx context.Context, path string) (bool, error) Size(ctx context.Context, path string) (int64, error) + GetSchema() DataStoreSchema Close() error } @@ -32,7 +33,7 @@ func NewDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataSto if !ok { return nil, errors.Errorf("Invalid GCS config, no destination_bucket_path") } - return NewGCSDataStore(ctx, 
destinationBucketPath) + return NewGCSDataStore(ctx, destinationBucketPath, datastoreConfig.Schema) default: return nil, errors.Errorf("Invalid datastore type %v, not supported", datastoreConfig.Type) } diff --git a/support/datastore/gcs_datastore.go b/support/datastore/gcs_datastore.go index cdedea086d..ab1bc669b5 100644 --- a/support/datastore/gcs_datastore.go +++ b/support/datastore/gcs_datastore.go @@ -24,18 +24,19 @@ type GCSDataStore struct { client *storage.Client bucket *storage.BucketHandle prefix string + schema DataStoreSchema } -func NewGCSDataStore(ctx context.Context, bucketPath string) (DataStore, error) { +func NewGCSDataStore(ctx context.Context, bucketPath string, schema DataStoreSchema) (DataStore, error) { client, err := storage.NewClient(ctx) if err != nil { return nil, err } - return FromGCSClient(ctx, client, bucketPath) + return FromGCSClient(ctx, client, bucketPath, schema) } -func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath string) (DataStore, error) { +func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath string, schema DataStoreSchema) (DataStore, error) { // append the gcs:// scheme to enable usage of the url package reliably to // get parse bucket name which is first path segment as URL.Host gcsBucketURL := fmt.Sprintf("gcs://%s", bucketPath) @@ -55,7 +56,8 @@ func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath strin return nil, fmt.Errorf("failed to retrieve bucket attributes: %w", err) } - return &GCSDataStore{client: client, bucket: bucket, prefix: prefix}, nil + // TODO: Datastore schema to be fetched from the datastore https://stellarorg.atlassian.net/browse/HUBBLE-397 + return &GCSDataStore{client: client, bucket: bucket, prefix: prefix, schema: schema}, nil } // GetFileMetadata retrieves the metadata for the specified file in the GCS bucket. @@ -177,3 +179,9 @@ func (b GCSDataStore) putFile(ctx context.Context, filePath string, in io.Writer } return w.Close() } + +// GetSchema returns the schema information which defines the structure +// and organization of data in the datastore. 
+func (b GCSDataStore) GetSchema() DataStoreSchema { + return b.schema +} diff --git a/support/datastore/gcs_test.go b/support/datastore/gcs_test.go index 8838e8dadb..618b5d602a 100644 --- a/support/datastore/gcs_test.go +++ b/support/datastore/gcs_test.go @@ -24,7 +24,7 @@ func TestGCSExists(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -52,7 +52,7 @@ func TestGCSSize(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -86,7 +86,7 @@ func TestGCSPutFile(t *testing.T) { DefaultEventBasedHold: false, }) - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -138,7 +138,7 @@ func TestGCSPutFileIfNotExists(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -187,7 +187,7 @@ func TestGCSPutFileWithMetadata(t *testing.T) { DefaultEventBasedHold: false, }) - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -255,7 +255,7 @@ func TestGCSPutFileIfNotExistsWithMetadata(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -323,7 +323,7 @@ func TestGCSGetNonExistentFile(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) @@ -365,7 +365,7 @@ func TestGCSGetFileValidatesCRC32C(t *testing.T) { }) defer server.Stop() - store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet") + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) diff --git a/support/datastore/mocks.go b/support/datastore/mocks.go index 96c15c1371..2fa39a4712 100644 --- a/support/datastore/mocks.go +++ b/support/datastore/mocks.go @@ -47,6 +47,11 @@ func (m *MockDataStore) 
Close() error { return args.Error(0) } +func (m *MockDataStore) GetSchema() DataStoreSchema { + args := m.Called() + return args.Get(0).(DataStoreSchema) +} + type MockResumableManager struct { mock.Mock }
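To tie these pieces together, here is a minimal usage sketch showing how the new `GetSchema` mock expectation feeds the revised `NewBufferedStorageBackend(config, dataStore)` constructor (the test function itself is hypothetical; types and signatures are taken from the hunks above):

```go
package ledgerbackend_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
)

func TestBackendReadsSchemaFromDataStore(t *testing.T) {
	// The backend now obtains the schema from the datastore itself,
	// so a test must stub GetSchema on the mock.
	mockStore := new(datastore.MockDataStore)
	mockStore.On("GetSchema").Return(datastore.DataStoreSchema{
		LedgersPerFile:    1,
		FilesPerPartition: 64000,
	})

	config := ledgerbackend.BufferedStorageBackendConfig{
		BufferSize: 100,
		NumWorkers: 5,
		RetryLimit: 3,
		RetryWait:  time.Microsecond,
	}

	// The constructor validates LedgersPerFile via dataStore.GetSchema().
	backend, err := ledgerbackend.NewBufferedStorageBackend(config, mockStore)
	require.NoError(t, err)
	require.NotNil(t, backend)
}
```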