diff --git a/pkg/contract_watcher/full/transformer/transformer.go b/pkg/contract_watcher/full/transformer/transformer.go index 42a95d210..dbc1dcedb 100644 --- a/pkg/contract_watcher/full/transformer/transformer.go +++ b/pkg/contract_watcher/full/transformer/transformer.go @@ -19,6 +19,8 @@ package transformer import ( "errors" + "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever" @@ -106,7 +108,11 @@ func (tr *Transformer) Init() error { // Get contract name if it has one var name = new(string) - tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock) + pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock) + if pollingErr != nil { + // can't return this error because "name" might not exist on the contract + logrus.Warnf("error fetching contract data: %s", pollingErr.Error()) + } // Remove any potential accidental duplicate inputs in arg filter values eventArgs := map[string]bool{} diff --git a/pkg/contract_watcher/header/converter/converter.go b/pkg/contract_watcher/header/converter/converter.go index 7815cf483..afc04212e 100644 --- a/pkg/contract_watcher/header/converter/converter.go +++ b/pkg/contract_watcher/header/converter/converter.go @@ -132,7 +132,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in // Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) { - contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) + boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) eventsToLogs := make(map[string][]types.Log) for _, event := range events { eventsToLogs[event.Name] = make([]types.Log, 0, len(logs)) @@ -141,7 +141,7 @@ func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E // If the log is of this event type, process it as such if event.Sig() == log.Topics[0] { values := make(map[string]interface{}) - err := contract.UnpackLogIntoMap(values, event.Name, log) + err := boundContract.UnpackLogIntoMap(values, event.Name, log) if err != nil { return nil, err } diff --git a/pkg/contract_watcher/header/repository/header_repository.go b/pkg/contract_watcher/header/repository/header_repository.go index 78d961581..4ad817f01 100644 --- a/pkg/contract_watcher/header/repository/header_repository.go +++ b/pkg/contract_watcher/header/repository/header_repository.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/golang-lru" "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -148,7 +149,10 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [ pgStr = pgStr[:len(pgStr)-2] _, err = tx.Exec(pgStr, header.Id) if err != nil { - tx.Rollback() + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error()) + } return err } } @@ -246,6 +250,7 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock // Returns a continuous set of headers func continuousHeaders(headers []core.Header) []core.Header { if len(headers) < 1 { + logrus.Trace("no headers to arrange continuously") return headers } previousHeader := headers[0].BlockNumber diff --git a/pkg/contract_watcher/header/repository/header_repository_test.go b/pkg/contract_watcher/header/repository/header_repository_test.go index a3136bdca..327afb593 100644 --- a/pkg/contract_watcher/header/repository/header_repository_test.go +++ b/pkg/contract_watcher/header/repository/header_repository_test.go @@ -137,9 +137,9 @@ var _ = Describe("Repository", func() { h1 := missingHeaders[0] h2 := missingHeaders[1] h3 := missingHeaders[2] - Expect(h1.BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) - Expect(h2.BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) - Expect(h3.BlockNumber).To(Equal(int64(mocks.MockHeader3.BlockNumber))) + Expect(h1.BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber)) + Expect(h2.BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber)) + Expect(h3.BlockNumber).To(Equal(mocks.MockHeader3.BlockNumber)) }) It("Returns only contiguous chunks of headers", func() { @@ -150,8 +150,8 @@ var _ = Describe("Repository", func() { missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) - Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) - Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) + Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber)) + Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber)) }) It("Fails if eventID does not yet exist in check_headers table", func() { @@ -199,8 +199,8 @@ var _ = Describe("Repository", func() { missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) - Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) - Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) + Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber)) + Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber)) }) It("returns headers after starting header if starting header not missing", func() { diff --git a/pkg/contract_watcher/header/retriever/block_retriever_test.go b/pkg/contract_watcher/header/retriever/block_retriever_test.go index 497dbdf82..0e0d49215 100644 --- a/pkg/contract_watcher/header/retriever/block_retriever_test.go +++ b/pkg/contract_watcher/header/retriever/block_retriever_test.go @@ -44,9 +44,12 @@ var _ = Describe("Block Retriever", func() { Describe("RetrieveFirstBlock", func() { It("Retrieves block number of earliest header in the database", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + _, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + Expect(err).ToNot(HaveOccurred()) i, err := r.RetrieveFirstBlock() Expect(err).NotTo(HaveOccurred()) @@ -61,9 +64,12 @@ var _ = Describe("Block Retriever", func() { Describe("RetrieveMostRecentBlock", func() { It("Retrieves the latest header's block number", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + _, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + Expect(err).ToNot(HaveOccurred()) i, err := r.RetrieveMostRecentBlock() Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go index e6b303443..1b8287142 100644 --- a/pkg/contract_watcher/header/transformer/transformer.go +++ b/pkg/contract_watcher/header/transformer/transformer.go @@ -18,10 +18,12 @@ package transformer import ( "errors" + "fmt" "strings" "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter" @@ -107,22 +109,22 @@ func (tr *Transformer) Init() error { // Configure Abi if tr.Config.Abis[contractAddr] == "" { // If no abi is given in the config, this method will try fetching from internal look-up table and etherscan - err := tr.Parser.Parse(contractAddr) - if err != nil { - return err + parseErr := tr.Parser.Parse(contractAddr) + if parseErr != nil { + return fmt.Errorf("error parsing contract by address: %s", parseErr.Error()) } } else { // If we have an abi from the config, load that into the parser - err := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr]) - if err != nil { - return err + parseErr := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr]) + if parseErr != nil { + return fmt.Errorf("error parsing contract abi: %s", parseErr.Error()) } } // Get first block and most recent block number in the header repo - firstBlock, err := tr.Retriever.RetrieveFirstBlock() - if err != nil { - return err + firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock() + if retrieveErr != nil { + return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error()) } // Set to specified range if it falls within the bounds @@ -132,7 +134,11 @@ func (tr *Transformer) Init() error { // Get contract name if it has one var name = new(string) - tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1) + pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1) + if pollingErr != nil { + // can't return this error because "name" might not exist on the contract + logrus.Warnf("error fetching contract data: %s", pollingErr.Error()) + } // Remove any potential accidental duplicate inputs eventArgs := map[string]bool{} @@ -165,9 +171,9 @@ func (tr *Transformer) Init() error { tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events)) for _, event := range con.Events { eventId := strings.ToLower(event.Name + "_" + con.Address) - err := tr.HeaderRepository.AddCheckColumn(eventId) - if err != nil { - return err + addColumnErr := tr.HeaderRepository.AddCheckColumn(eventId) + if addColumnErr != nil { + return fmt.Errorf("error adding check column: %s", addColumnErr.Error()) } // Keep track of this event id; sorted and unsorted tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId) @@ -180,9 +186,9 @@ func (tr *Transformer) Init() error { tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods)) for _, m := range con.Methods { methodId := strings.ToLower(m.Name + "_" + con.Address) - err := tr.HeaderRepository.AddCheckColumn(methodId) - if err != nil { - return err + addColumnErr := tr.HeaderRepository.AddCheckColumn(methodId) + if addColumnErr != nil { + return fmt.Errorf("error adding check column: %s", addColumnErr.Error()) } tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId) } @@ -202,9 +208,9 @@ func (tr *Transformer) Execute() error { } // Find unchecked headers for all events across all contracts; these are returned in asc order - missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds) - if err != nil { - return err + missingHeaders, missingHeadersErr := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds) + if missingHeadersErr != nil { + return fmt.Errorf("error getting missing headers: %s", missingHeadersErr.Error()) } // Iterate over headers @@ -216,23 +222,24 @@ func (tr *Transformer) Execute() error { // Map to sort batch fetched logs by which contract they belong to, for post fetch processing sortedLogs := make(map[string][]gethTypes.Log) // And fetch all event logs across contracts at this header - allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header) - if err != nil { - return err + allLogs, fetchErr := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header) + if fetchErr != nil { + return fmt.Errorf("error fetching logs: %s", fetchErr.Error()) } // If no logs are found mark the header checked for all of these eventIDs // and continue to method polling and onto the next iteration if len(allLogs) < 1 { - err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) - if err != nil { - return err + markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) } - err = tr.methodPolling(header, tr.sortedMethodIds) - if err != nil { - return err + pollingErr := tr.methodPolling(header, tr.sortedMethodIds) + if pollingErr != nil { + return fmt.Errorf("error polling methods: %s", pollingErr.Error()) } tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header + logrus.Tracef("no logs found for block %d, continuing", header.BlockNumber) continue } @@ -245,6 +252,7 @@ func (tr *Transformer) Execute() error { // Process logs for each contract for conAddr, logs := range sortedLogs { if logs == nil { + logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber) continue } // Configure converter with this contract @@ -252,34 +260,35 @@ func (tr *Transformer) Execute() error { tr.Converter.Update(con) // Convert logs into batches of log mappings (eventName => []types.Logs - convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id) - if err != nil { - return err + convertedLogs, convertErr := tr.Converter.ConvertBatch(logs, con.Events, header.Id) + if convertErr != nil { + return fmt.Errorf("error converting logs: %s", convertErr.Error()) } // Cycle through each type of event log and persist them for eventName, logs := range convertedLogs { // If logs for this event are empty, mark them checked at this header and continue if len(logs) < 1 { eventId := strings.ToLower(eventName + "_" + con.Address) - err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) - if err != nil { - return err + markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) } + logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber) continue } // If logs aren't empty, persist them // Header is marked checked in the transactions - err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) - if err != nil { - return err + persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) + if persistErr != nil { + return fmt.Errorf("error persisting logs: %s", persistErr.Error()) } } } // Poll contracts at this block height - err = tr.methodPolling(header, tr.sortedMethodIds) - if err != nil { - return err + pollingErr := tr.methodPolling(header, tr.sortedMethodIds) + if pollingErr != nil { + return fmt.Errorf("error polling methods: %s", pollingErr.Error()) } // Success; setup to start at the next header tr.Start = header.BlockNumber + 1 @@ -294,19 +303,20 @@ func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[str // Skip method polling processes if no methods are specified // Also don't try to poll methods below this contract's specified starting block if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock { + logrus.Tracef("not polling contract: %s", con.Address) continue } // Poll all methods for this contract at this header - err := tr.Poller.PollContractAt(*con, header.BlockNumber) - if err != nil { - return err + pollingErr := tr.Poller.PollContractAt(*con, header.BlockNumber) + if pollingErr != nil { + return fmt.Errorf("error polling contract %s: %s", con.Address, pollingErr.Error()) } // Mark this header checked for the methods - err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address]) - if err != nil { - return err + markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address]) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) } } diff --git a/pkg/contract_watcher/header/transformer/transformer_test.go b/pkg/contract_watcher/header/transformer/transformer_test.go index 04dff0b0a..c96c589e4 100644 --- a/pkg/contract_watcher/header/transformer/transformer_test.go +++ b/pkg/contract_watcher/header/transformer/transformer_test.go @@ -68,7 +68,7 @@ var _ = Describe("Transformer", func() { err := t.Init() Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error())) }) }) @@ -109,7 +109,7 @@ var _ = Describe("Transformer", func() { err := t.Init() Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error())) }) }) }) diff --git a/pkg/contract_watcher/shared/constants/interface.go b/pkg/contract_watcher/shared/constants/interface.go index f49ac96ac..5e383f289 100644 --- a/pkg/contract_watcher/shared/constants/interface.go +++ b/pkg/contract_watcher/shared/constants/interface.go @@ -24,7 +24,7 @@ import ( var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]` // Individual event interfaces for constructing ABI from -var SupportsInterace = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}` +var SupportsInterface = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}` var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}` var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}` var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}` diff --git a/pkg/contract_watcher/shared/fetcher/fetcher.go b/pkg/contract_watcher/shared/fetcher/fetcher.go index 459e96c36..7507628f7 100644 --- a/pkg/contract_watcher/shared/fetcher/fetcher.go +++ b/pkg/contract_watcher/shared/fetcher/fetcher.go @@ -47,7 +47,7 @@ func newFetcherError(err error, fetchMethod string) *fetcherError { // Fetcher struct type Fetcher struct { - BlockChain core.BlockChain // Underyling Blockchain + BlockChain core.BlockChain // Underlying Blockchain } // Fetcher error diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/database.go b/pkg/contract_watcher/shared/helpers/test_helpers/database.go index 5a380b249..a997c9df4 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/database.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/database.go @@ -115,9 +115,9 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) { rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC) ethClient := ethclient.NewClient(rawRpcClient) blockChainClient := client.NewEthClient(ethClient) - node := node.MakeNode(rpcClient) + madeNode := node.MakeNode(rpcClient) transactionConverter := rpc2.NewRpcTransactionConverter(ethClient) - blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter) + blockChain := geth.NewBlockChain(blockChainClient, rpcClient, madeNode, transactionConverter) db, err := postgres.NewDB(config.Database{ Hostname: "localhost", diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go b/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go index b68307103..27fa21733 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go @@ -324,7 +324,7 @@ var MockConfig = config.ContractConfig{ "0x1234567890abcdef": "fake_abi", }, Events: map[string][]string{ - "0x1234567890abcdef": []string{"Transfer"}, + "0x1234567890abcdef": {"Transfer"}, }, Methods: map[string][]string{ "0x1234567890abcdef": nil, diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/test_data.go b/pkg/contract_watcher/shared/helpers/test_helpers/test_data.go index c52fd20c6..5e35b607d 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/test_data.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/test_data.go @@ -35,7 +35,7 @@ var TusdConfig = config.ContractConfig{ tusd: "", }, Events: map[string][]string{ - tusd: []string{"Transfer"}, + tusd: {"Transfer"}, }, Methods: map[string][]string{ tusd: nil, @@ -60,7 +60,7 @@ var ENSConfig = config.ContractConfig{ ens: "", }, Events: map[string][]string{ - ens: []string{"NewOwner"}, + ens: {"NewOwner"}, }, Methods: map[string][]string{ ens: nil, @@ -87,8 +87,8 @@ var ENSandTusdConfig = config.ContractConfig{ tusd: "", }, Events: map[string][]string{ - ens: []string{"NewOwner"}, - tusd: []string{"Transfer"}, + ens: {"NewOwner"}, + tusd: {"Transfer"}, }, Methods: map[string][]string{ ens: nil, diff --git a/pkg/contract_watcher/shared/repository/event_repository.go b/pkg/contract_watcher/shared/repository/event_repository.go index a43d4766e..759c3889c 100644 --- a/pkg/contract_watcher/shared/repository/event_repository.go +++ b/pkg/contract_watcher/shared/repository/event_repository.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/hashicorp/golang-lru" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" @@ -31,7 +32,7 @@ import ( const ( // Number of contract address and method ids to keep in cache contractCacheSize = 100 - eventChacheSize = 1000 + eventCacheSize = 1000 ) // Event repository is used to persist event data into custom tables @@ -52,7 +53,7 @@ type eventRepository struct { func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository { ccs, _ := lru.New(contractCacheSize) - ecs, _ := lru.New(eventChacheSize) + ecs, _ := lru.New(eventCacheSize) return &eventRepository{ db: db, mode: mode, @@ -68,14 +69,14 @@ func (r *eventRepository) PersistLogs(logs []types.Log, eventInfo types.Event, c if len(logs) == 0 { return errors.New("event repository error: passed empty logs slice") } - _, err := r.CreateContractSchema(contractAddr) - if err != nil { - return err + _, schemaErr := r.CreateContractSchema(contractAddr) + if schemaErr != nil { + return fmt.Errorf("error creating schema for contract %s: %s", contractAddr, schemaErr.Error()) } - _, err = r.CreateEventTable(contractAddr, eventInfo) - if err != nil { - return err + _, tableErr := r.CreateEventTable(contractAddr, eventInfo) + if tableErr != nil { + return fmt.Errorf("error creating table for event %s on contract %s: %s", eventInfo.Name, contractAddr, tableErr.Error()) } return r.persistLogs(logs, eventInfo, contractAddr, contractName) @@ -97,9 +98,9 @@ func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, c // Creates a custom postgres command to persist logs for the given event (compatible with header synced vDB) func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Beginx() - if err != nil { - return err + tx, txErr := r.db.Beginx() + if txErr != nil { + return fmt.Errorf("error beginning db transaction: %s", txErr.Error()) } for _, event := range logs { @@ -130,20 +131,27 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type } pgStr = pgStr + ") ON CONFLICT DO NOTHING" + logrus.Tracef("query for inserting log: %s", pgStr) // Add this query to the transaction - _, err = tx.Exec(pgStr, data...) - if err != nil { - tx.Rollback() - return err + _, execErr := tx.Exec(pgStr, data...) + if execErr != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Warnf("error rolling back transactions while persisting logs: %s", rollbackErr.Error()) + } + return fmt.Errorf("error executing query: %s", execErr.Error()) } } // Mark header as checked for this eventId eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr) - err = repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].Id, tx, eventId) // This assumes all logs are from same block - if err != nil { - tx.Rollback() - return err + markCheckedErr := repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].Id, tx, eventId) // This assumes all logs are from same block + if markCheckedErr != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Warnf("error rolling back transaction while marking header checked: %s", rollbackErr.Error()) + } + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) } return tx.Commit() @@ -151,9 +159,9 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type // Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB) func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Beginx() - if err != nil { - return err + tx, txErr := r.db.Beginx() + if txErr != nil { + return fmt.Errorf("error beginning db transaction: %s", txErr.Error()) } for _, event := range logs { @@ -179,10 +187,14 @@ func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types. } pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" - _, err = tx.Exec(pgStr, data...) - if err != nil { - tx.Rollback() - return err + logrus.Tracef("query for inserting log: %s", pgStr) + _, execErr := tx.Exec(pgStr, data...) + if execErr != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Warnf("error rolling back transactions while persisting logs: %s", rollbackErr.Error()) + } + return fmt.Errorf("error executing query: %s", execErr.Error()) } } @@ -198,15 +210,15 @@ func (r *eventRepository) CreateEventTable(contractAddr string, event types.Even if ok { return false, nil } - tableExists, err := r.checkForTable(contractAddr, event.Name) - if err != nil { - return false, err + tableExists, checkTableErr := r.checkForTable(contractAddr, event.Name) + if checkTableErr != nil { + return false, fmt.Errorf("error checking for table: %s", checkTableErr) } if !tableExists { - err = r.newEventTable(tableID, event) - if err != nil { - return false, err + createTableErr := r.newEventTable(tableID, event) + if createTableErr != nil { + return false, fmt.Errorf("error creating table: %s", createTableErr.Error()) } } @@ -270,14 +282,14 @@ func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error if ok { return false, nil } - schemaExists, err := r.checkForSchema(contractAddr) - if err != nil { - return false, err + schemaExists, checkSchemaErr := r.checkForSchema(contractAddr) + if checkSchemaErr != nil { + return false, fmt.Errorf("error checking for schema: %s", checkSchemaErr.Error()) } if !schemaExists { - err = r.newContractSchema(contractAddr) - if err != nil { - return false, err + createSchemaErr := r.newContractSchema(contractAddr) + if createSchemaErr != nil { + return false, fmt.Errorf("error creating schema: %s", createSchemaErr.Error()) } } diff --git a/pkg/contract_watcher/shared/repository/event_repository_test.go b/pkg/contract_watcher/shared/repository/event_repository_test.go index 47f1448f2..40f1a783c 100644 --- a/pkg/contract_watcher/shared/repository/event_repository_test.go +++ b/pkg/contract_watcher/shared/repository/event_repository_test.go @@ -190,7 +190,7 @@ var _ = Describe("Repository", func() { } Expect(scanLog).To(Equal(expectedLog)) - // Attempt to persist the same log again in seperate call + // Attempt to persist the same log again in separate call err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/contract_watcher/shared/repository/method_repository.go b/pkg/contract_watcher/shared/repository/method_repository.go index 2616675e5..346de5a6c 100644 --- a/pkg/contract_watcher/shared/repository/method_repository.go +++ b/pkg/contract_watcher/shared/repository/method_repository.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/hashicorp/golang-lru" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -112,7 +113,10 @@ func (r *methodRepository) persistResults(results []types.Result, methodInfo typ // Add this query to the transaction _, err = tx.Exec(pgStr, data...) if err != nil { - tx.Rollback() + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error()) + } return err } } diff --git a/pkg/contract_watcher/shared/retriever/address_retriever_test.go b/pkg/contract_watcher/shared/retriever/address_retriever_test.go index c8aa6fff3..dc8f578ef 100644 --- a/pkg/contract_watcher/shared/retriever/address_retriever_test.go +++ b/pkg/contract_watcher/shared/retriever/address_retriever_test.go @@ -48,12 +48,10 @@ var mockEvent = core.WatchedEvent{ var _ = Describe("Address Retriever Test", func() { var db *postgres.DB var dataStore repository.EventRepository - var err error var info *contract.Contract var vulcanizeLogId int64 var log *types.Log var r retriever.AddressRetriever - var addresses map[common.Address]bool var wantedEvents = []string{"Transfer"} BeforeEach(func() { @@ -61,17 +59,18 @@ var _ = Describe("Address Retriever Test", func() { mockEvent.LogID = vulcanizeLogId event := info.Events["Transfer"] - err = info.GenerateFilters() - Expect(err).ToNot(HaveOccurred()) + filterErr := info.GenerateFilters() + Expect(filterErr).ToNot(HaveOccurred()) c := converter.Converter{} c.Update(info) - log, err = c.Convert(mockEvent, event) - Expect(err).ToNot(HaveOccurred()) + var convertErr error + log, convertErr = c.Convert(mockEvent, event) + Expect(convertErr).ToNot(HaveOccurred()) dataStore = repository.NewEventRepository(db, types.FullSync) - dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name) - Expect(err).ToNot(HaveOccurred()) + persistErr := dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name) + Expect(persistErr).ToNot(HaveOccurred()) r = retriever.NewAddressRetriever(db, types.FullSync) }) @@ -82,8 +81,8 @@ var _ = Describe("Address Retriever Test", func() { Describe("RetrieveTokenHolderAddresses", func() { It("Retrieves a list of token holder addresses from persisted event logs", func() { - addresses, err = r.RetrieveTokenHolderAddresses(*info) - Expect(err).ToNot(HaveOccurred()) + addresses, retrieveErr := r.RetrieveTokenHolderAddresses(*info) + Expect(retrieveErr).ToNot(HaveOccurred()) _, ok := addresses[common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")] Expect(ok).To(Equal(true)) @@ -100,8 +99,8 @@ var _ = Describe("Address Retriever Test", func() { }) It("Returns empty list when empty contract info is used", func() { - addresses, err = r.RetrieveTokenHolderAddresses(contract.Contract{}) - Expect(err).ToNot(HaveOccurred()) + addresses, retrieveErr := r.RetrieveTokenHolderAddresses(contract.Contract{}) + Expect(retrieveErr).ToNot(HaveOccurred()) Expect(len(addresses)).To(Equal(0)) }) })