From 0a5a19b77fdb1b9a951e63c341d2d5098f4852bf Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Tue, 16 Jun 2020 19:16:43 -0500 Subject: [PATCH 1/2] (VDB-1363) Add command to backfill events --- cmd/backfillEvents.go | 58 +++++++ libraries/shared/logs/extractor.go | 80 +++++++--- libraries/shared/logs/extractor_test.go | 149 ++++++++++++++++++ libraries/shared/mocks/log_extractor.go | 4 + .../backfill/storage_value_loader_test.go | 2 +- pkg/fakes/mock_header_repository.go | 17 +- 6 files changed, 277 insertions(+), 33 deletions(-) create mode 100644 cmd/backfillEvents.go diff --git a/cmd/backfillEvents.go b/cmd/backfillEvents.go new file mode 100644 index 000000000..bc5eb945d --- /dev/null +++ b/cmd/backfillEvents.go @@ -0,0 +1,58 @@ +package cmd + +import ( + "fmt" + + "github.com/makerdao/vulcanizedb/libraries/shared/logs" + "github.com/makerdao/vulcanizedb/utils" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +// backfillEventsCmd represents the backfillEvents command +var backfillEventsCmd = &cobra.Command{ + Use: "backfillEvents", + Short: "BackFill events from already-checked headers", + Long: `Fetch and persist events from configured transformers across a range +of headers that may have already been checked for logs. Useful when adding a +new event transformer to an instance that has already been running and marking +headers checked as it queried for the previous (now incomplete) set of logs.`, + Run: func(cmd *cobra.Command, args []string) { + err := backFillEvents() + if err != nil { + logrus.Fatalf("error back-filling events: %s", err.Error()) + } + logrus.Info("completed back-filling events") + }, +} + +func init() { + rootCmd.AddCommand(backfillEventsCmd) +} + +func backFillEvents() error { + ethEventInitializers, _, _, exportTransformersErr := exportTransformers() + if exportTransformersErr != nil { + LogWithCommand.Fatalf("SubCommand %v: exporting transformers failed: %v", SubCommand, exportTransformersErr) + } + + blockChain := getBlockChain() + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + + extractor := logs.NewLogExtractor(&db, blockChain) + + for _, initializer := range ethEventInitializers { + transformer := initializer(&db) + err := extractor.AddTransformerConfig(transformer.GetConfig()) + if err != nil { + return fmt.Errorf("error adding transformer: %w", err) + } + } + + err := extractor.BackFillLogs() + if err != nil { + return fmt.Errorf("error backfilling logs: %w", err) + } + + return nil +} diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index 2b11f797f..1d0598937 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -18,6 +18,7 @@ package logs import ( "errors" + "fmt" "github.com/ethereum/go-ethereum/common" "github.com/makerdao/vulcanizedb/libraries/shared/constants" @@ -38,6 +39,7 @@ var ( type ILogExtractor interface { AddTransformerConfig(config event.TransformerConfig) error + BackFillLogs() error ExtractLogs(recheckHeaders constants.TransformerExecution) error } @@ -46,6 +48,7 @@ type LogExtractor struct { CheckedHeadersRepository datastore.CheckedHeadersRepository CheckedLogsRepository datastore.CheckedLogsRepository Fetcher fetcher.ILogFetcher + HeaderRepository datastore.HeaderRepository LogRepository datastore.EventLogRepository StartingBlock *int64 EndingBlock *int64 @@ -65,7 +68,7 @@ func NewLogExtractor(db *postgres.DB, bc core.BlockChain) *LogExtractor { } } -// Add additional logs to extract +// AddTransformerConfig adds additional logs to extract func (extractor *LogExtractor) AddTransformerConfig(config event.TransformerConfig) error { checkedHeadersErr := extractor.updateCheckedHeaders(config) if checkedHeadersErr != nil { @@ -110,17 +113,17 @@ func shouldResetEndingBlockToLaterTransformerBlock(currentTransformerBlock int64 return isCurrentBlockNegativeOne && isTransformerBlockGreater } -// Fetch and persist watched logs +// ExtractLogs fetches and persists watched logs from unchecked headers func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { if len(extractor.Addresses) < 1 { logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) - return ErrNoWatchedAddresses + return fmt.Errorf("error extracting logs: %w", ErrNoWatchedAddresses) } uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, *extractor.EndingBlock, extractor.getCheckCount(recheckHeaders)) if uncheckedHeadersErr != nil { logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr) - return uncheckedHeadersErr + return fmt.Errorf("error getting unchecked headers to check for logs: %w", uncheckedHeadersErr) } if len(uncheckedHeaders) < 1 { @@ -128,24 +131,9 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx } for _, header := range uncheckedHeaders { - logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) - if fetchLogsErr != nil { - logError("error fetching logs for header: %s", fetchLogsErr, header) - return fetchLogsErr - } - - if len(logs) > 0 { - transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) - if transactionsSyncErr != nil { - logError("error syncing transactions: %s", transactionsSyncErr, header) - return transactionsSyncErr - } - - createLogsErr := extractor.LogRepository.CreateEventLogs(header.Id, logs) - if createLogsErr != nil { - logError("error persisting logs: %s", createLogsErr, header) - return createLogsErr - } + err := extractor.fetchAndPersistLogsForHeader(header) + if err != nil { + return fmt.Errorf("error fetching and persisting logs for header with id %d: %w", header.Id, err) } markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) @@ -157,6 +145,28 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx return nil } +// BackFillLogs fetches and persists watched logs from provided range of headers +func (extractor LogExtractor) BackFillLogs() error { + if len(extractor.Addresses) < 1 { + logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) + return fmt.Errorf("error extracting logs: %w", ErrNoWatchedAddresses) + } + + headers, headersErr := extractor.HeaderRepository.GetHeadersInRange(*extractor.StartingBlock, *extractor.EndingBlock) + if headersErr != nil { + logrus.Errorf("error fetching missing headers: %s", headersErr) + return fmt.Errorf("error getting unchecked headers to check for logs: %w", headersErr) + } + + for _, header := range headers { + err := extractor.fetchAndPersistLogsForHeader(header) + if err != nil { + return fmt.Errorf("error fetching and persisting logs for header with id %d: %w", header.Id, err) + } + } + return nil +} + func logError(description string, err error, header core.Header) { logrus.WithFields(logrus.Fields{ "headerId": header.Id, @@ -168,9 +178,8 @@ func logError(description string, err error, header core.Header) { func (extractor *LogExtractor) getCheckCount(recheckHeaders constants.TransformerExecution) int64 { if recheckHeaders == constants.HeaderUnchecked { return 1 - } else { - return extractor.RecheckHeaderCap } + return extractor.RecheckHeaderCap } func (extractor *LogExtractor) updateCheckedHeaders(config event.TransformerConfig) error { @@ -187,3 +196,26 @@ func (extractor *LogExtractor) updateCheckedHeaders(config event.TransformerConf } return nil } + +func (extractor *LogExtractor) fetchAndPersistLogsForHeader(header core.Header) error { + logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) + if fetchLogsErr != nil { + logError("error fetching logs for header: %s", fetchLogsErr, header) + return fmt.Errorf("error fetching logs for block %d: %w", header.BlockNumber, fetchLogsErr) + } + + if len(logs) > 0 { + transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) + if transactionsSyncErr != nil { + logError("error syncing transactions: %s", transactionsSyncErr, header) + return fmt.Errorf("error syncing transactions for block %d: %w", header.BlockNumber, transactionsSyncErr) + } + + createLogsErr := extractor.LogRepository.CreateEventLogs(header.Id, logs) + if createLogsErr != nil { + logError("error persisting logs: %s", createLogsErr, header) + return fmt.Errorf("error persisting logs for block %d: %w", header.BlockNumber, createLogsErr) + } + } + return nil +} diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index e0cea6e64..12d6a436e 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -383,6 +383,149 @@ var _ = Describe("Log extractor", func() { }) }) }) + + Describe("BackFillLogs", func() { + It("returns error if no watched addresses configured", func() { + err := extractor.BackFillLogs() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) + }) + + It("returns error if getting headers in range returns error", func() { + mockHeaderRepository := &fakes.MockHeaderRepository{} + mockHeaderRepository.GetHeadersInRangeError = fakes.FakeError + extractor.HeaderRepository = mockHeaderRepository + addTransformerConfig(extractor) + + err := extractor.BackFillLogs() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("does nothing if no headers found", func() { + mockHeaderRepository := &fakes.MockHeaderRepository{} + extractor.HeaderRepository = mockHeaderRepository + addTransformerConfig(extractor) + mockLogFetcher := &mocks.MockLogFetcher{} + extractor.Fetcher = mockLogFetcher + + _ = extractor.BackFillLogs() + + Expect(mockLogFetcher.FetchCalled).To(BeFalse()) + }) + + It("fetches logs for headers in range", func() { + addHeaderInRange(extractor) + config := event.TransformerConfig{ + ContractAddresses: []string{fakes.FakeAddress.Hex()}, + Topic: fakes.FakeHash.Hex(), + StartingBlockNumber: rand.Int63(), + } + addTransformerErr := extractor.AddTransformerConfig(config) + Expect(addTransformerErr).NotTo(HaveOccurred()) + mockLogFetcher := &mocks.MockLogFetcher{} + extractor.Fetcher = mockLogFetcher + + err := extractor.BackFillLogs() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogFetcher.FetchCalled).To(BeTrue()) + expectedTopics := []common.Hash{common.HexToHash(config.Topic)} + Expect(mockLogFetcher.Topics).To(Equal(expectedTopics)) + expectedAddresses := event.HexStringsToAddresses(config.ContractAddresses) + Expect(mockLogFetcher.ContractAddresses).To(Equal(expectedAddresses)) + }) + + It("returns error if fetching logs fails", func() { + addHeaderInRange(extractor) + addTransformerConfig(extractor) + mockLogFetcher := &mocks.MockLogFetcher{} + mockLogFetcher.ReturnError = fakes.FakeError + extractor.Fetcher = mockLogFetcher + + err := extractor.BackFillLogs() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("does not sync transactions when no logs", func() { + addHeaderInRange(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + extractor.Syncer = mockTransactionSyncer + + err := extractor.BackFillLogs() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) + }) + + Describe("when there are fetched logs", func() { + It("syncs transactions", func() { + addHeaderInRange(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + extractor.Syncer = mockTransactionSyncer + + err := extractor.BackFillLogs() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) + }) + + It("returns error if syncing transactions fails", func() { + addHeaderInRange(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + mockTransactionSyncer.SyncTransactionsError = fakes.FakeError + extractor.Syncer = mockTransactionSyncer + + err := extractor.BackFillLogs() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("persists fetched logs", func() { + addHeaderInRange(extractor) + addTransformerConfig(extractor) + fakeLogs := []types.Log{{ + Address: common.HexToAddress("0xA"), + Topics: []common.Hash{common.HexToHash("0xA")}, + Data: []byte{}, + Index: 0, + }} + mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs} + extractor.Fetcher = mockLogFetcher + mockLogRepository := &fakes.MockEventLogRepository{} + extractor.LogRepository = mockLogRepository + + err := extractor.BackFillLogs() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) + }) + + It("returns error if persisting logs fails", func() { + addHeaderInRange(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockLogRepository := &fakes.MockEventLogRepository{} + mockLogRepository.CreateError = fakes.FakeError + extractor.LogRepository = mockLogRepository + + err := extractor.BackFillLogs() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + }) + }) }) func addTransformerConfig(extractor *logs.LogExtractor) { @@ -400,6 +543,12 @@ func addUncheckedHeader(extractor *logs.LogExtractor) { extractor.CheckedHeadersRepository = mockCheckedHeadersRepository } +func addHeaderInRange(extractor *logs.LogExtractor) { + mockHeadersRepository := &fakes.MockHeaderRepository{} + mockHeadersRepository.AllHeaders = []core.Header{{}} + extractor.HeaderRepository = mockHeadersRepository +} + func addFetchedLog(extractor *logs.LogExtractor) { mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher.ReturnLogs = []types.Log{{}} diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index eb7014cb6..66661f2f9 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -48,3 +48,7 @@ func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.Transfor // return no unchecked headers error so that extractor hits retry interval when delegator under test return logs.ErrNoUncheckedHeaders } + +func (extractor *MockLogExtractor) BackFillLogs() error { + panic("implement me") +} diff --git a/libraries/shared/storage/backfill/storage_value_loader_test.go b/libraries/shared/storage/backfill/storage_value_loader_test.go index cf074aa3a..c9692f240 100644 --- a/libraries/shared/storage/backfill/storage_value_loader_test.go +++ b/libraries/shared/storage/backfill/storage_value_loader_test.go @@ -128,7 +128,7 @@ var _ = Describe("StorageValueLoader", func() { }) It("returns an error if a header for the given block cannot be retrieved", func() { - headerRepo.GetHeaderByBlockNumberError = fakes.FakeError + headerRepo.GetHeadersInRangeError = fakes.FakeError runnerErr := runner.Run() Expect(runnerErr).To(HaveOccurred()) Expect(runnerErr).To(Equal(fakes.FakeError)) diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index 1786d188c..50f9e6443 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -23,10 +23,6 @@ import ( ) type MockHeaderRepository struct { - createOrUpdateHeaderCallCount int - createOrUpdateHeaderErr error - createOrUpdateHeaderPassedBlockNumbers []int64 - createOrUpdateHeaderReturnID int64 AllHeaders []core.Header CreateTransactionsCalled bool CreateTransactionsError error @@ -35,13 +31,18 @@ type MockHeaderRepository struct { GetHeaderByBlockNumberReturnID int64 GetHeaderByIDError error GetHeaderByIDHeaderToReturn core.Header - missingBlockNumbers []int64 - headerExists bool GetHeaderPassedBlockNumber int64 - GetHeadersInRangeStartingBlock int64 GetHeadersInRangeEndingBlock int64 + GetHeadersInRangeError error + GetHeadersInRangeStartingBlock int64 MostRecentHeaderBlockNumber int64 MostRecentHeaderBlockNumberErr error + createOrUpdateHeaderCallCount int + createOrUpdateHeaderErr error + createOrUpdateHeaderPassedBlockNumbers []int64 + createOrUpdateHeaderReturnID int64 + headerExists bool + missingBlockNumbers []int64 } func NewMockHeaderRepository() *MockHeaderRepository { @@ -91,7 +92,7 @@ func (mock *MockHeaderRepository) GetHeaderByID(id int64) (core.Header, error) { func (mock *MockHeaderRepository) GetHeadersInRange(startingBlock, endingBlock int64) ([]core.Header, error) { mock.GetHeadersInRangeStartingBlock = startingBlock mock.GetHeadersInRangeEndingBlock = endingBlock - return mock.AllHeaders, mock.GetHeaderByBlockNumberError + return mock.AllHeaders, mock.GetHeadersInRangeError } func (mock *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64) ([]int64, error) { From 01502e44e3eb606ab32da34f684145e0146dfdb3 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 17 Jun 2020 10:50:23 -0500 Subject: [PATCH 2/2] Add ending block number for backfilling events --- cmd/backfillEvents.go | 8 +++++- libraries/shared/logs/extractor.go | 6 ++-- libraries/shared/logs/extractor_test.go | 37 ++++++++++++++++++------- libraries/shared/mocks/log_extractor.go | 2 +- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/cmd/backfillEvents.go b/cmd/backfillEvents.go index bc5eb945d..2bd418787 100644 --- a/cmd/backfillEvents.go +++ b/cmd/backfillEvents.go @@ -9,6 +9,8 @@ import ( "github.com/spf13/cobra" ) +var endingBlockNumber int64 + // backfillEventsCmd represents the backfillEvents command var backfillEventsCmd = &cobra.Command{ Use: "backfillEvents", @@ -18,6 +20,8 @@ of headers that may have already been checked for logs. Useful when adding a new event transformer to an instance that has already been running and marking headers checked as it queried for the previous (now incomplete) set of logs.`, Run: func(cmd *cobra.Command, args []string) { + SubCommand = cmd.CalledAs() + LogWithCommand = *logrus.WithField("SubCommand", SubCommand) err := backFillEvents() if err != nil { logrus.Fatalf("error back-filling events: %s", err.Error()) @@ -28,6 +32,8 @@ headers checked as it queried for the previous (now incomplete) set of logs.`, func init() { rootCmd.AddCommand(backfillEventsCmd) + backfillEventsCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", -1, "last block from which to back-fill events") + backfillEventsCmd.MarkFlagRequired("ending-block-number") } func backFillEvents() error { @@ -49,7 +55,7 @@ func backFillEvents() error { } } - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(endingBlockNumber) if err != nil { return fmt.Errorf("error backfilling logs: %w", err) } diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index 1d0598937..9c8e75ca9 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -39,7 +39,7 @@ var ( type ILogExtractor interface { AddTransformerConfig(config event.TransformerConfig) error - BackFillLogs() error + BackFillLogs(endingBlock int64) error ExtractLogs(recheckHeaders constants.TransformerExecution) error } @@ -146,13 +146,13 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx } // BackFillLogs fetches and persists watched logs from provided range of headers -func (extractor LogExtractor) BackFillLogs() error { +func (extractor LogExtractor) BackFillLogs(endingBlock int64) error { if len(extractor.Addresses) < 1 { logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) return fmt.Errorf("error extracting logs: %w", ErrNoWatchedAddresses) } - headers, headersErr := extractor.HeaderRepository.GetHeadersInRange(*extractor.StartingBlock, *extractor.EndingBlock) + headers, headersErr := extractor.HeaderRepository.GetHeadersInRange(*extractor.StartingBlock, endingBlock) if headersErr != nil { logrus.Errorf("error fetching missing headers: %s", headersErr) return fmt.Errorf("error getting unchecked headers to check for logs: %w", headersErr) diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index 12d6a436e..9bfb6ad5d 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -386,19 +386,36 @@ var _ = Describe("Log extractor", func() { Describe("BackFillLogs", func() { It("returns error if no watched addresses configured", func() { - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) }) + It("gets headers from transformer's starting block through passed ending block", func() { + fakeConfig := event.TransformerConfig{ + ContractAddresses: []string{fakes.FakeAddress.Hex()}, + Topic: fakes.FakeHash.Hex(), + StartingBlockNumber: rand.Int63(), + } + extractor.AddTransformerConfig(fakeConfig) + mockHeaderRepository := &fakes.MockHeaderRepository{} + extractor.HeaderRepository = mockHeaderRepository + endingBlock := rand.Int63() + + _ = extractor.BackFillLogs(endingBlock) + + Expect(mockHeaderRepository.GetHeadersInRangeStartingBlock).To(Equal(fakeConfig.StartingBlockNumber)) + Expect(mockHeaderRepository.GetHeadersInRangeEndingBlock).To(Equal(endingBlock)) + }) + It("returns error if getting headers in range returns error", func() { mockHeaderRepository := &fakes.MockHeaderRepository{} mockHeaderRepository.GetHeadersInRangeError = fakes.FakeError extractor.HeaderRepository = mockHeaderRepository addTransformerConfig(extractor) - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -411,7 +428,7 @@ var _ = Describe("Log extractor", func() { mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - _ = extractor.BackFillLogs() + _ = extractor.BackFillLogs(0) Expect(mockLogFetcher.FetchCalled).To(BeFalse()) }) @@ -428,7 +445,7 @@ var _ = Describe("Log extractor", func() { mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).NotTo(HaveOccurred()) Expect(mockLogFetcher.FetchCalled).To(BeTrue()) @@ -445,7 +462,7 @@ var _ = Describe("Log extractor", func() { mockLogFetcher.ReturnError = fakes.FakeError extractor.Fetcher = mockLogFetcher - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -457,7 +474,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).NotTo(HaveOccurred()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) @@ -471,7 +488,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).NotTo(HaveOccurred()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) @@ -485,7 +502,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer.SyncTransactionsError = fakes.FakeError extractor.Syncer = mockTransactionSyncer - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -505,7 +522,7 @@ var _ = Describe("Log extractor", func() { mockLogRepository := &fakes.MockEventLogRepository{} extractor.LogRepository = mockLogRepository - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).NotTo(HaveOccurred()) Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) @@ -519,7 +536,7 @@ var _ = Describe("Log extractor", func() { mockLogRepository.CreateError = fakes.FakeError extractor.LogRepository = mockLogRepository - err := extractor.BackFillLogs() + err := extractor.BackFillLogs(0) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index 66661f2f9..89b8a0ce9 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -49,6 +49,6 @@ func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.Transfor return logs.ErrNoUncheckedHeaders } -func (extractor *MockLogExtractor) BackFillLogs() error { +func (extractor *MockLogExtractor) BackFillLogs(endingBlock int64) error { panic("implement me") }