
Merge pull request #108 from makerdao/vdb-1363-backfill-events
(VDB-1363) backfill events
rmulhol authored Jul 14, 2020
2 parents 2d82f28 + 01502e4 commit 419fc76
Showing 6 changed files with 300 additions and 33 deletions.
64 changes: 64 additions & 0 deletions cmd/backfillEvents.go
@@ -0,0 +1,64 @@
package cmd

import (
    "fmt"

    "github.com/makerdao/vulcanizedb/libraries/shared/logs"
    "github.com/makerdao/vulcanizedb/utils"
    "github.com/sirupsen/logrus"
    "github.com/spf13/cobra"
)

var endingBlockNumber int64

// backfillEventsCmd represents the backfillEvents command
var backfillEventsCmd = &cobra.Command{
    Use: "backfillEvents",
    Short: "BackFill events from already-checked headers",
    Long: `Fetch and persist events from configured transformers across a range
of headers that may have already been checked for logs. Useful when adding a
new event transformer to an instance that has already been running and marking
headers checked as it queried for the previous (now incomplete) set of logs.`,
    Run: func(cmd *cobra.Command, args []string) {
        SubCommand = cmd.CalledAs()
        LogWithCommand = *logrus.WithField("SubCommand", SubCommand)
        err := backFillEvents()
        if err != nil {
            logrus.Fatalf("error back-filling events: %s", err.Error())
        }
        logrus.Info("completed back-filling events")
    },
}

func init() {
    rootCmd.AddCommand(backfillEventsCmd)
    backfillEventsCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", -1, "last block from which to back-fill events")
    backfillEventsCmd.MarkFlagRequired("ending-block-number")
}

func backFillEvents() error {
    ethEventInitializers, _, _, exportTransformersErr := exportTransformers()
    if exportTransformersErr != nil {
        LogWithCommand.Fatalf("SubCommand %v: exporting transformers failed: %v", SubCommand, exportTransformersErr)
    }

    blockChain := getBlockChain()
    db := utils.LoadPostgres(databaseConfig, blockChain.Node())

    extractor := logs.NewLogExtractor(&db, blockChain)

    for _, initializer := range ethEventInitializers {
        transformer := initializer(&db)
        err := extractor.AddTransformerConfig(transformer.GetConfig())
        if err != nil {
            return fmt.Errorf("error adding transformer: %w", err)
        }
    }

    err := extractor.BackFillLogs(endingBlockNumber)
    if err != nil {
        return fmt.Errorf("error backfilling logs: %w", err)
    }

    return nil
}
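
As a usage note: once a plugin exporting the new event transformer is compiled in, the back-fill runs as a one-off subcommand, along the lines of ./vulcanizedb backfillEvents --config <config.toml> --ending-block-number <block>. The --ending-block-number (-e) flag is defined above; the binary name and --config flag are assumptions here, following the pattern of the repository's other subcommands.
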
80 changes: 56 additions & 24 deletions libraries/shared/logs/extractor.go
@@ -18,6 +18,7 @@ package logs

import (
    "errors"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/makerdao/vulcanizedb/libraries/shared/constants"
@@ -38,6 +39,7 @@

type ILogExtractor interface {
    AddTransformerConfig(config event.TransformerConfig) error
    BackFillLogs(endingBlock int64) error
    ExtractLogs(recheckHeaders constants.TransformerExecution) error
}

@@ -46,6 +48,7 @@ type LogExtractor struct {
    CheckedHeadersRepository datastore.CheckedHeadersRepository
    CheckedLogsRepository datastore.CheckedLogsRepository
    Fetcher fetcher.ILogFetcher
    HeaderRepository datastore.HeaderRepository
    LogRepository datastore.EventLogRepository
    StartingBlock *int64
    EndingBlock *int64
@@ -65,7 +68,7 @@ func NewLogExtractor(db *postgres.DB, bc core.BlockChain) *LogExtractor {
    }
}

// Add additional logs to extract
// AddTransformerConfig adds additional logs to extract
func (extractor *LogExtractor) AddTransformerConfig(config event.TransformerConfig) error {
    checkedHeadersErr := extractor.updateCheckedHeaders(config)
    if checkedHeadersErr != nil {
@@ -110,42 +113,27 @@ func shouldResetEndingBlockToLaterTransformerBlock(currentTransformerBlock int64
    return isCurrentBlockNegativeOne && isTransformerBlockGreater
}

// Fetch and persist watched logs
// ExtractLogs fetches and persists watched logs from unchecked headers
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error {
    if len(extractor.Addresses) < 1 {
        logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
        return ErrNoWatchedAddresses
        return fmt.Errorf("error extracting logs: %w", ErrNoWatchedAddresses)
    }

    uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, *extractor.EndingBlock, extractor.getCheckCount(recheckHeaders))
    if uncheckedHeadersErr != nil {
        logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr)
        return uncheckedHeadersErr
        return fmt.Errorf("error getting unchecked headers to check for logs: %w", uncheckedHeadersErr)
    }

    if len(uncheckedHeaders) < 1 {
        return ErrNoUncheckedHeaders
    }

    for _, header := range uncheckedHeaders {
        logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
        if fetchLogsErr != nil {
            logError("error fetching logs for header: %s", fetchLogsErr, header)
            return fetchLogsErr
        }

        if len(logs) > 0 {
            transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
            if transactionsSyncErr != nil {
                logError("error syncing transactions: %s", transactionsSyncErr, header)
                return transactionsSyncErr
            }

            createLogsErr := extractor.LogRepository.CreateEventLogs(header.Id, logs)
            if createLogsErr != nil {
                logError("error persisting logs: %s", createLogsErr, header)
                return createLogsErr
            }
        err := extractor.fetchAndPersistLogsForHeader(header)
        if err != nil {
            return fmt.Errorf("error fetching and persisting logs for header with id %d: %w", header.Id, err)
        }

        markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
@@ -157,6 +145,28 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx
    return nil
}

// BackFillLogs fetches and persists watched logs from provided range of headers
func (extractor LogExtractor) BackFillLogs(endingBlock int64) error {
    if len(extractor.Addresses) < 1 {
        logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
        return fmt.Errorf("error extracting logs: %w", ErrNoWatchedAddresses)
    }

    headers, headersErr := extractor.HeaderRepository.GetHeadersInRange(*extractor.StartingBlock, endingBlock)
    if headersErr != nil {
        logrus.Errorf("error fetching missing headers: %s", headersErr)
        return fmt.Errorf("error getting unchecked headers to check for logs: %w", headersErr)
    }

    for _, header := range headers {
        err := extractor.fetchAndPersistLogsForHeader(header)
        if err != nil {
            return fmt.Errorf("error fetching and persisting logs for header with id %d: %w", header.Id, err)
        }
    }
    return nil
}

func logError(description string, err error, header core.Header) {
    logrus.WithFields(logrus.Fields{
        "headerId": header.Id,
@@ -168,9 +178,8 @@ func logError(description string, err error, header core.Header) {
func (extractor *LogExtractor) getCheckCount(recheckHeaders constants.TransformerExecution) int64 {
    if recheckHeaders == constants.HeaderUnchecked {
        return 1
    } else {
        return extractor.RecheckHeaderCap
    }
    return extractor.RecheckHeaderCap
}

func (extractor *LogExtractor) updateCheckedHeaders(config event.TransformerConfig) error {
@@ -187,3 +196,26 @@ func (extractor *LogExtractor) updateCheckedHeaders(config event.TransformerConf
    }
    return nil
}

func (extractor *LogExtractor) fetchAndPersistLogsForHeader(header core.Header) error {
    logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
    if fetchLogsErr != nil {
        logError("error fetching logs for header: %s", fetchLogsErr, header)
        return fmt.Errorf("error fetching logs for block %d: %w", header.BlockNumber, fetchLogsErr)
    }

    if len(logs) > 0 {
        transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
        if transactionsSyncErr != nil {
            logError("error syncing transactions: %s", transactionsSyncErr, header)
            return fmt.Errorf("error syncing transactions for block %d: %w", header.BlockNumber, transactionsSyncErr)
        }

        createLogsErr := extractor.LogRepository.CreateEventLogs(header.Id, logs)
        if createLogsErr != nil {
            logError("error persisting logs: %s", createLogsErr, header)
            return fmt.Errorf("error persisting logs for block %d: %w", header.BlockNumber, createLogsErr)
        }
    }
    return nil
}
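
Note how BackFillLogs differs from ExtractLogs above: it walks every header returned by HeaderRepository.GetHeadersInRange rather than filtering through CheckedHeadersRepository.UncheckedHeaders, and it never marks headers checked, so headers already processed for other transformers are revisited. A minimal sketch of the repository method this relies on, with the signature inferred from the call site in BackFillLogs (the concrete datastore interface is not part of this diff, and the import path below is an assumption):

package datastore

import "github.com/makerdao/vulcanizedb/pkg/core" // location of core.Header assumed

// HeaderRepository sketch: only the method BackFillLogs depends on is shown here.
type HeaderRepository interface {
    // GetHeadersInRange returns all persisted headers with block numbers in
    // [startingBlock, endingBlock], whether or not they have already been
    // marked checked for logs.
    GetHeadersInRange(startingBlock, endingBlock int64) ([]core.Header, error)
}
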
166 changes: 166 additions & 0 deletions libraries/shared/logs/extractor_test.go
@@ -383,6 +383,166 @@ var _ = Describe("Log extractor", func() {
})
})
})

Describe("BackFillLogs", func() {
It("returns error if no watched addresses configured", func() {
err := extractor.BackFillLogs(0)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(logs.ErrNoWatchedAddresses))
})

It("gets headers from transformer's starting block through passed ending block", func() {
fakeConfig := event.TransformerConfig{
ContractAddresses: []string{fakes.FakeAddress.Hex()},
Topic: fakes.FakeHash.Hex(),
StartingBlockNumber: rand.Int63(),
}
extractor.AddTransformerConfig(fakeConfig)
mockHeaderRepository := &fakes.MockHeaderRepository{}
extractor.HeaderRepository = mockHeaderRepository
endingBlock := rand.Int63()

_ = extractor.BackFillLogs(endingBlock)

Expect(mockHeaderRepository.GetHeadersInRangeStartingBlock).To(Equal(fakeConfig.StartingBlockNumber))
Expect(mockHeaderRepository.GetHeadersInRangeEndingBlock).To(Equal(endingBlock))
})

It("returns error if getting headers in range returns error", func() {
mockHeaderRepository := &fakes.MockHeaderRepository{}
mockHeaderRepository.GetHeadersInRangeError = fakes.FakeError
extractor.HeaderRepository = mockHeaderRepository
addTransformerConfig(extractor)

err := extractor.BackFillLogs(0)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})

It("does nothing if no headers found", func() {
mockHeaderRepository := &fakes.MockHeaderRepository{}
extractor.HeaderRepository = mockHeaderRepository
addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher

_ = extractor.BackFillLogs(0)

Expect(mockLogFetcher.FetchCalled).To(BeFalse())
})

It("fetches logs for headers in range", func() {
addHeaderInRange(extractor)
config := event.TransformerConfig{
ContractAddresses: []string{fakes.FakeAddress.Hex()},
Topic: fakes.FakeHash.Hex(),
StartingBlockNumber: rand.Int63(),
}
addTransformerErr := extractor.AddTransformerConfig(config)
Expect(addTransformerErr).NotTo(HaveOccurred())
mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher

err := extractor.BackFillLogs(0)

Expect(err).NotTo(HaveOccurred())
Expect(mockLogFetcher.FetchCalled).To(BeTrue())
expectedTopics := []common.Hash{common.HexToHash(config.Topic)}
Expect(mockLogFetcher.Topics).To(Equal(expectedTopics))
expectedAddresses := event.HexStringsToAddresses(config.ContractAddresses)
Expect(mockLogFetcher.ContractAddresses).To(Equal(expectedAddresses))
})

It("returns error if fetching logs fails", func() {
addHeaderInRange(extractor)
addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{}
mockLogFetcher.ReturnError = fakes.FakeError
extractor.Fetcher = mockLogFetcher

err := extractor.BackFillLogs(0)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})

It("does not sync transactions when no logs", func() {
addHeaderInRange(extractor)
addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer

err := extractor.BackFillLogs(0)

Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse())
})

Describe("when there are fetched logs", func() {
It("syncs transactions", func() {
addHeaderInRange(extractor)
addFetchedLog(extractor)
addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer

err := extractor.BackFillLogs(0)

Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
})

It("returns error if syncing transactions fails", func() {
addHeaderInRange(extractor)
addFetchedLog(extractor)
addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
extractor.Syncer = mockTransactionSyncer

err := extractor.BackFillLogs(0)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})

It("persists fetched logs", func() {
addHeaderInRange(extractor)
addTransformerConfig(extractor)
fakeLogs := []types.Log{{
Address: common.HexToAddress("0xA"),
Topics: []common.Hash{common.HexToHash("0xA")},
Data: []byte{},
Index: 0,
}}
mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs}
extractor.Fetcher = mockLogFetcher
mockLogRepository := &fakes.MockEventLogRepository{}
extractor.LogRepository = mockLogRepository

err := extractor.BackFillLogs(0)

Expect(err).NotTo(HaveOccurred())
Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs))
})

It("returns error if persisting logs fails", func() {
addHeaderInRange(extractor)
addFetchedLog(extractor)
addTransformerConfig(extractor)
mockLogRepository := &fakes.MockEventLogRepository{}
mockLogRepository.CreateError = fakes.FakeError
extractor.LogRepository = mockLogRepository

err := extractor.BackFillLogs(0)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
})
})
})

func addTransformerConfig(extractor *logs.LogExtractor) {
@@ -400,6 +560,12 @@ func addUncheckedHeader(extractor *logs.LogExtractor) {
    extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
}

func addHeaderInRange(extractor *logs.LogExtractor) {
    mockHeadersRepository := &fakes.MockHeaderRepository{}
    mockHeadersRepository.AllHeaders = []core.Header{{}}
    extractor.HeaderRepository = mockHeadersRepository
}
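
The BackFillLogs specs above lean on fields of fakes.MockHeaderRepository that are not shown in this diff (AllHeaders, GetHeadersInRangeStartingBlock, GetHeadersInRangeEndingBlock, GetHeadersInRangeError). A rough sketch of the behavior those fields imply, assuming the GetHeadersInRange signature used in extractor.go; the real fake may define more fields and methods than shown here:

// Sketch only: field names mirror what the specs above reference.
type MockHeaderRepository struct {
    AllHeaders []core.Header // headers handed back from GetHeadersInRange
    GetHeadersInRangeStartingBlock int64 // records the starting block passed in
    GetHeadersInRangeEndingBlock int64 // records the ending block passed in
    GetHeadersInRangeError error // returned to exercise failure paths
}

func (repository *MockHeaderRepository) GetHeadersInRange(startingBlock, endingBlock int64) ([]core.Header, error) {
    repository.GetHeadersInRangeStartingBlock = startingBlock
    repository.GetHeadersInRangeEndingBlock = endingBlock
    return repository.AllHeaders, repository.GetHeadersInRangeError
}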

func addFetchedLog(extractor *logs.LogExtractor) {
    mockLogFetcher := &mocks.MockLogFetcher{}
    mockLogFetcher.ReturnLogs = []types.Log{{}}