From f1d6e417fe76ccfdab76a036d686003373fa23cc Mon Sep 17 00:00:00 2001 From: Edvard Date: Thu, 14 Feb 2019 15:36:33 +0100 Subject: [PATCH 1/4] Add back accidentally removed DB config to staging.toml --- environments/staging.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/environments/staging.toml b/environments/staging.toml index ad320f279..8954a4839 100644 --- a/environments/staging.toml +++ b/environments/staging.toml @@ -1,4 +1,8 @@ [database] + name = "vulcanize_public" + hostname = "localhost" + user = "vulcanize" + password = "vulcanize" port = 5432 [client] From 6cd4e5ea95b6be79edff392f68153b2d267759d4 Mon Sep 17 00:00:00 2001 From: Edvard Date: Thu, 14 Feb 2019 16:03:57 +0100 Subject: [PATCH 2/4] Improve I/O error propagation --- cmd/lightSync.go | 15 +++++--- cmd/sync.go | 22 ++++++++--- pkg/core/blockchain.go | 2 +- pkg/datastore/ethereum/database.go | 2 + .../postgres/repositories/block_repository.go | 17 +++++++-- .../repositories/contract_repository.go | 23 ++++++++---- .../repositories/header_repository.go | 19 ++++------ .../postgres/repositories/logs_repository.go | 23 +++++++++--- .../repositories/receipt_repository.go | 5 +++ .../repositories/watched_events_repository.go | 4 ++ pkg/datastore/repository.go | 7 ++-- pkg/geth/blockchain.go | 15 ++++---- .../converters/common/header_converter.go | 4 +- pkg/history/block_validator.go | 32 +++++++++++++--- pkg/history/header_validator.go | 13 ++++--- pkg/history/populate_blocks.go | 37 +++++++++++++------ pkg/history/populate_headers.go | 15 ++++---- pkg/history/validation_window.go | 21 +++++------ 18 files changed, 179 insertions(+), 97 deletions(-) diff --git a/cmd/lightSync.go b/cmd/lightSync.go index 1b10163ba..f3d472201 100644 --- a/cmd/lightSync.go +++ b/cmd/lightSync.go @@ -62,7 +62,9 @@ func init() { func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber) if err != nil { - log.Fatal("Error populating headers: ", err) + // TODO Lots of possible errors in the call stack above. If errors occur, we still put + // 0 in the channel, triggering another round + log.Error("backfillAllHeaders: Error populating headers: ", err) } missingBlocksPopulated <- populated } @@ -84,7 +86,7 @@ func lightSync() { case <-ticker.C: window, err := validator.ValidateHeaders() if err != nil { - log.Error("ValidateHeaders failed in lightSync: ", err) + log.Error("lightSync: ValidateHeaders failed: ", err) } log.Info(window.GetString()) case n := <-missingBlocksPopulated: @@ -97,11 +99,14 @@ func lightSync() { } func validateArgs(blockChain *geth.BlockChain) { - lastBlock := blockChain.LastBlock().Int64() - if lastBlock == 0 { + lastBlock, err := blockChain.LastBlock() + if err != nil { + log.Error("validateArgs: Error getting last block: ", err) + } + if lastBlock.Int64() == 0 { log.Fatal("geth initial: state sync not finished") } - if startingBlockNumber > lastBlock { + if startingBlockNumber > lastBlock.Int64() { log.Fatal("starting block number > current block number") } } diff --git a/cmd/sync.go b/cmd/sync.go index 8e1840d38..82ac52b52 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -60,7 +60,11 @@ func init() { } func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { - missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) + populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) + if err != nil { + log.Error("backfillAllBlocks: error in populateMissingBlocks: ", err) + } + missingBlocksPopulated <- populated } func sync() { @@ -68,12 +72,15 @@ func sync() { defer ticker.Stop() blockChain := getBlockChain() - lastBlock := blockChain.LastBlock().Int64() - if lastBlock == 0 { + lastBlock, err := blockChain.LastBlock() + if err != nil { + log.Error("sync: Error getting last block: ", err) + } + if lastBlock.Int64() == 0 { log.Fatal("geth initial: state sync not finished") } - if startingBlockNumber > lastBlock { - log.Fatal("starting block number > current block number") + if startingBlockNumber > lastBlock.Int64() { + log.Fatal("sync: starting block number > current block number") } db := utils.LoadPostgres(databaseConfig, blockChain.Node()) @@ -85,7 +92,10 @@ func sync() { for { select { case <-ticker.C: - window := validator.ValidateBlocks() + window, err := validator.ValidateBlocks() + if err != nil { + log.Error("sync: error in validateBlocks: ", err) + } log.Info(window.GetString()) case <-missingBlocksPopulated: go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 829b144bc..fa2a846a5 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -30,7 +30,7 @@ type BlockChain interface { GetHeaderByNumbers(blockNumbers []int64) ([]Header, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) - LastBlock() *big.Int + LastBlock() (*big.Int, error) Node() Node } diff --git a/pkg/datastore/ethereum/database.go b/pkg/datastore/ethereum/database.go index 2f4bb5770..dc2724c56 100644 --- a/pkg/datastore/ethereum/database.go +++ b/pkg/datastore/ethereum/database.go @@ -18,6 +18,7 @@ package ethereum import ( "fmt" + "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -37,6 +38,7 @@ func CreateDatabase(config DatabaseConfig) (Database, error) { case Level: levelDBConnection, err := ethdb.NewLDBDatabase(config.Path, 128, 1024) if err != nil { + logrus.Error("CreateDatabase: error connecting to new LDBD: ", err) return nil, err } levelDBReader := level.NewLevelDatabaseReader(levelDBConnection) diff --git a/pkg/datastore/postgres/repositories/block_repository.go b/pkg/datastore/postgres/repositories/block_repository.go index 92f57144f..ff52fe5a0 100644 --- a/pkg/datastore/postgres/repositories/block_repository.go +++ b/pkg/datastore/postgres/repositories/block_repository.go @@ -43,12 +43,16 @@ func NewBlockRepository(database *postgres.DB) *BlockRepository { return &BlockRepository{database: database} } -func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) { +func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) error { cutoff := chainHead - blocksFromHeadBeforeFinal - blockRepository.database.Exec(` + _, err := blockRepository.database.Exec(` UPDATE blocks SET is_final = TRUE WHERE is_final = FALSE AND number < $1`, cutoff) + if err != nil { + return err + } + return nil } func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) { @@ -70,7 +74,7 @@ func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (in func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 { numbers := make([]int64, 0) - blockRepository.database.Select(&numbers, + err := blockRepository.database.Select(&numbers, `SELECT all_block_numbers FROM ( SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series @@ -79,6 +83,9 @@ func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber i ) `, startingBlockNumber, highestBlockNumber, nodeId) + if err != nil { + log.Error("MissingBlockNumbers: error getting blocks: ", err) + } return numbers } @@ -108,6 +115,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, case sql.ErrNoRows: return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber) default: + log.Error("GetBlock: error loading blocks: ", err) return savedBlock, err } } @@ -202,6 +210,7 @@ func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64, RETURNING id`, receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) if err != nil { + log.Error("createReceipt: error inserting receipt: ", err) return receiptId, err } return receiptId, nil @@ -256,6 +265,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc var block b err := blockRows.StructScan(&block) if err != nil { + log.Error("loadBlock: error loading block: ", err) return core.Block{}, err } transactionRows, err := blockRepository.database.Queryx(` @@ -271,6 +281,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc WHERE block_id = $1 ORDER BY hash`, block.ID) if err != nil { + log.Error("loadBlock: error fetting transactions: ", err) return core.Block{}, err } block.Transactions = blockRepository.LoadTransactions(transactionRows) diff --git a/pkg/datastore/postgres/repositories/contract_repository.go b/pkg/datastore/postgres/repositories/contract_repository.go index 0ff0afdd1..8f32c4352 100644 --- a/pkg/datastore/postgres/repositories/contract_repository.go +++ b/pkg/datastore/postgres/repositories/contract_repository.go @@ -47,14 +47,17 @@ func (contractRepository ContractRepository) CreateContract(contract core.Contra return nil } -func (contractRepository ContractRepository) ContractExists(contractHash string) bool { +func (contractRepository ContractRepository) ContractExists(contractHash string) (bool, error) { var exists bool - contractRepository.DB.QueryRow( + err := contractRepository.DB.QueryRow( `SELECT exists( SELECT 1 FROM watched_contracts WHERE contract_hash = $1)`, contractHash).Scan(&exists) - return exists + if err != nil { + return false, err + } + return exists, nil } func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) { @@ -66,12 +69,15 @@ func (contractRepository ContractRepository) GetContract(contractHash string) (c if err == sql.ErrNoRows { return core.Contract{}, datastore.ErrContractDoesNotExist(contractHash) } - savedContract := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi}) + savedContract, err := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi}) + if err != nil { + return core.Contract{}, err + } return savedContract, nil } -func (contractRepository ContractRepository) addTransactions(contract core.Contract) core.Contract { - transactionRows, _ := contractRepository.DB.Queryx(` +func (contractRepository ContractRepository) addTransactions(contract core.Contract) (core.Contract, error) { + transactionRows, err := contractRepository.DB.Queryx(` SELECT hash, nonce, tx_to, @@ -83,8 +89,11 @@ func (contractRepository ContractRepository) addTransactions(contract core.Contr FROM transactions WHERE tx_to = $1 ORDER BY block_id DESC`, contract.Hash) + if err != nil { + return core.Contract{}, err + } blockRepository := &BlockRepository{contractRepository.DB} transactions := blockRepository.LoadTransactions(transactionRows) savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi} - return savedContract + return savedContract, nil } diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index 30e4f6e4e..a9770fe73 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -42,6 +42,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int if headerDoesNotExist(err) { return repository.insertHeader(header) } + log.Error("CreateOrUpdateHeader: error getting header hash: ", err) return 0, err } if headerMustBeReplaced(hash, header) { @@ -54,6 +55,7 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er var header core.Header err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, blockNumber, repository.database.Node.ID) + log.Error("GetHeader: error getting headers: ", err) return header, err } @@ -74,18 +76,6 @@ func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endi return numbers, nil } -func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) { - _, err := repository.GetHeader(blockNumber) - if err != nil { - if headerDoesNotExist(err) { - return false, nil - } - return false, err - } - - return true, nil -} - func headerMustBeReplaced(hash string, header core.Header) bool { return hash != header.Hash } @@ -98,6 +88,7 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er var hash string err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, header.BlockNumber, repository.database.Node.ID) + log.Error("getHeaderHash: error getting headers: ", err) return hash, err } @@ -106,6 +97,9 @@ func (repository HeaderRepository) insertHeader(header core.Header) (int64, erro err := repository.database.QueryRowx( `INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`, header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId) + if err != nil { + log.Error("insertHeader: error inserting header: ", err) + } return headerId, err } @@ -113,6 +107,7 @@ func (repository HeaderRepository) replaceHeader(header core.Header) (int64, err _, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, header.BlockNumber, repository.database.Node.ID) if err != nil { + log.Error("replaceHeader: error deleting headers: ", err) return 0, err } return repository.insertHeader(header) diff --git a/pkg/datastore/postgres/repositories/logs_repository.go b/pkg/datastore/postgres/repositories/logs_repository.go index 4886e15f7..9bdaca849 100644 --- a/pkg/datastore/postgres/repositories/logs_repository.go +++ b/pkg/datastore/postgres/repositories/logs_repository.go @@ -18,6 +18,7 @@ package repositories import ( "context" + "github.com/sirupsen/logrus" "database/sql" @@ -43,12 +44,16 @@ func (logRepository LogRepository) CreateLogs(lgs []core.Log, receiptId int64) e return postgres.ErrDBInsertFailed } } - tx.Commit() + err := tx.Commit() + if err != nil { + tx.Rollback() + return postgres.ErrDBInsertFailed + } return nil } -func (logRepository LogRepository) GetLogs(address string, blockNumber int64) []core.Log { - logRows, _ := logRepository.DB.Query( +func (logRepository LogRepository) GetLogs(address string, blockNumber int64) ([]core.Log, error) { + logRows, err := logRepository.DB.Query( `SELECT block_number, address, tx_hash, @@ -61,10 +66,13 @@ func (logRepository LogRepository) GetLogs(address string, blockNumber int64) [] FROM logs WHERE address = $1 AND block_number = $2 ORDER BY block_number DESC`, address, blockNumber) + if err != nil { + return []core.Log{}, err + } return logRepository.loadLogs(logRows) } -func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log { +func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) ([]core.Log, error) { var lgs []core.Log for logsRows.Next() { var blockNumber int64 @@ -73,7 +81,10 @@ func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log { var index int64 var data string var topics core.Topics - logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) + err := logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) + if err != nil { + logrus.Warn("loadLogs: Error scanning a row in logRows") + } lg := core.Log{ BlockNumber: blockNumber, TxHash: txHash, @@ -86,5 +97,5 @@ func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log { } lgs = append(lgs, lg) } - return lgs + return lgs, nil } diff --git a/pkg/datastore/postgres/repositories/receipt_repository.go b/pkg/datastore/postgres/repositories/receipt_repository.go index 9981559cf..6554775be 100644 --- a/pkg/datastore/postgres/repositories/receipt_repository.go +++ b/pkg/datastore/postgres/repositories/receipt_repository.go @@ -19,6 +19,7 @@ package repositories import ( "context" "database/sql" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" @@ -61,6 +62,9 @@ func createReceipt(receipt core.Receipt, blockId int64, tx *sql.Tx) (int64, erro RETURNING id`, receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId, ).Scan(&receiptId) + if err != nil { + logrus.Error("createReceipt: Error inserting: ", err) + } return receiptId, err } @@ -90,6 +94,7 @@ func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) if err != nil { tx.Rollback() + logrus.Warning("CreateReceipt: error inserting receipt: ", err) return receiptId, err } tx.Commit() diff --git a/pkg/datastore/postgres/repositories/watched_events_repository.go b/pkg/datastore/postgres/repositories/watched_events_repository.go index b59316660..a08b6414f 100644 --- a/pkg/datastore/postgres/repositories/watched_events_repository.go +++ b/pkg/datastore/postgres/repositories/watched_events_repository.go @@ -17,6 +17,7 @@ package repositories import ( + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) @@ -28,6 +29,7 @@ type WatchedEventRepository struct { func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) { rows, err := watchedEventRepository.DB.Queryx(`SELECT id, name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name) if err != nil { + logrus.Error("GetWatchedEvents: error getting watched events: ", err) return nil, err } defer rows.Close() @@ -37,11 +39,13 @@ func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name strin lg := new(core.WatchedEvent) err = rows.StructScan(lg) if err != nil { + logrus.Warn("GetWatchedEvents: error scanning log: ", err) return nil, err } lgs = append(lgs, lg) } if err = rows.Err(); err != nil { + logrus.Warn("GetWatchedEvents: error scanning logs: ", err) return nil, err } return lgs, nil diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 32e6c0a8a..9ae4841ba 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -31,7 +31,7 @@ type BlockRepository interface { CreateOrUpdateBlock(block core.Block) (int64, error) GetBlock(blockNumber int64) (core.Block, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 - SetBlocksStatus(chainHead int64) + SetBlocksStatus(chainHead int64) error } var ErrContractDoesNotExist = func(contractHash string) error { @@ -41,7 +41,7 @@ var ErrContractDoesNotExist = func(contractHash string) error { type ContractRepository interface { CreateContract(contract core.Contract) error GetContract(contractHash string) (core.Contract, error) - ContractExists(contractHash string) bool + ContractExists(contractHash string) (bool, error) } var ErrFilterDoesNotExist = func(name string) error { @@ -57,12 +57,11 @@ type HeaderRepository interface { CreateOrUpdateHeader(header core.Header) (int64, error) GetHeader(blockNumber int64) (core.Header, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) - HeaderExists(blockNumber int64) (bool, error) } type LogRepository interface { CreateLogs(logs []core.Log, receiptId int64) error - GetLogs(address string, blockNumber int64) []core.Log + GetLogs(address string, blockNumber int64) ([]core.Log, error) } var ErrReceiptDoesNotExist = func(txHash string) error { diff --git a/pkg/geth/blockchain.go b/pkg/geth/blockchain.go index 347aa7c4a..819c9c031 100644 --- a/pkg/geth/blockchain.go +++ b/pkg/geth/blockchain.go @@ -81,7 +81,7 @@ func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Heade if err != nil { return header, err } - return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()) + return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()), nil } func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) { @@ -113,8 +113,7 @@ func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []cor for _, POWHeader := range POWHeaders { if POWHeader.Number != nil { - header, _ := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String()) - + header := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String()) headers = append(headers, header) } } @@ -147,7 +146,7 @@ func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Heade GasUsed: uint64(POAHeader.GasUsed), Time: POAHeader.Time.ToInt(), Extra: POAHeader.Extra, - }, POAHeader.Hash.String()) + }, POAHeader.Hash.String()), nil } func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []core.Header, err error) { @@ -182,7 +181,7 @@ func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []cor var header core.Header //Header.Number of the newest block will return nil. if _, err := strconv.ParseUint(POAHeader.Number.ToInt().String(), 16, 64); err == nil { - header, _ = blockChain.headerConverter.Convert(&types.Header{ + header = blockChain.headerConverter.Convert(&types.Header{ ParentHash: POAHeader.ParentHash, UncleHash: POAHeader.UncleHash, Coinbase: POAHeader.Coinbase, @@ -232,9 +231,9 @@ func (blockChain *BlockChain) GetEthLogsWithCustomQuery(query ethereum.FilterQue return gethLogs, nil } -func (blockChain *BlockChain) LastBlock() *big.Int { - block, _ := blockChain.ethClient.HeaderByNumber(context.Background(), nil) - return block.Number +func (blockChain *BlockChain) LastBlock() (*big.Int, error) { + block, err := blockChain.ethClient.HeaderByNumber(context.Background(), nil) + return block.Number, err } func (blockChain *BlockChain) Node() core.Node { diff --git a/pkg/geth/converters/common/header_converter.go b/pkg/geth/converters/common/header_converter.go index c160e4273..a890aa139 100644 --- a/pkg/geth/converters/common/header_converter.go +++ b/pkg/geth/converters/common/header_converter.go @@ -25,7 +25,7 @@ import ( type HeaderConverter struct{} -func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash string) (core.Header, error) { +func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash string) core.Header { rawHeader, err := json.Marshal(gethHeader) if err != nil { panic(err) @@ -36,5 +36,5 @@ func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash str Raw: rawHeader, Timestamp: gethHeader.Time.String(), } - return coreHeader, nil + return coreHeader } diff --git a/pkg/history/block_validator.go b/pkg/history/block_validator.go index 6f7791a25..88be8c29c 100644 --- a/pkg/history/block_validator.go +++ b/pkg/history/block_validator.go @@ -17,6 +17,7 @@ package history import ( + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" ) @@ -35,11 +36,30 @@ func NewBlockValidator(blockchain core.BlockChain, blockRepository datastore.Blo } } -func (bv BlockValidator) ValidateBlocks() ValidationWindow { - window := MakeValidationWindow(bv.blockchain, bv.windowSize) +func (bv BlockValidator) ValidateBlocks() (ValidationWindow, error) { + window, err := MakeValidationWindow(bv.blockchain, bv.windowSize) + if err != nil { + logrus.Error("ValidateBlocks: error creating validation window: ", err) + return ValidationWindow{}, err + } + blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) - lastBlock := bv.blockchain.LastBlock().Int64() - bv.blockRepository.SetBlocksStatus(lastBlock) - return window + _, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) + if err != nil { + logrus.Error("ValidateBlocks: error getting and updating blocks: ", err) + return ValidationWindow{}, err + } + + lastBlock, err := bv.blockchain.LastBlock() + if err != nil { + logrus.Error("ValidateBlocks: error getting last block: ", err) + return ValidationWindow{}, err + } + + err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64()) + if err != nil { + logrus.Error("ValidateBlocks: error setting block status: ", err) + return ValidationWindow{}, err + } + return window, nil } diff --git a/pkg/history/header_validator.go b/pkg/history/header_validator.go index c475473dd..d65d1a3d4 100644 --- a/pkg/history/header_validator.go +++ b/pkg/history/header_validator.go @@ -17,8 +17,7 @@ package history import ( - log "github.com/sirupsen/logrus" - + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" ) @@ -38,11 +37,15 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR } func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) { - window := MakeValidationWindow(validator.blockChain, validator.windowSize) + window, err := MakeValidationWindow(validator.blockChain, validator.windowSize) + if err != nil { + logrus.Error("ValidateHeaders: error creating validation window: ", err) + return ValidationWindow{}, err + } blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - _, err := RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) + _, err = RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) if err != nil { - log.Error("Error in ValidateHeaders: ", err) + logrus.Error("ValidateHeaders: error getting/updating headers: ", err) return ValidationWindow{}, err } return window, nil diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index 72f388408..6ea832377 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -23,28 +23,41 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore" ) -func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { - lastBlock := blockchain.LastBlock().Int64() - blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) +func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) (int, error) { + lastBlock, err := blockchain.LastBlock() + if err != nil { + log.Error("PopulateMissingBlocks: error getting last block: ", err) + return 0, err + } + blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockchain.Node().ID) if len(blockRange) == 0 { - return 0 + return 0, nil } log.Printf("Backfilling %d blocks\n\n", len(blockRange)) - RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) - return len(blockRange) + _, err = RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) + if err != nil { + log.Error("PopulateMissingBlocks: error gettings/updating blocks: ", err) + return 0, err + } + return len(blockRange), nil } -func RetrieveAndUpdateBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, blockNumbers []int64) int { +func RetrieveAndUpdateBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, blockNumbers []int64) (int, error) { for _, blockNumber := range blockNumbers { block, err := blockchain.GetBlockByNumber(blockNumber) if err != nil { - log.Printf("failed to retrieve block number: %d\n", blockNumber) - return 0 + log.Error("RetrieveAndUpdateBlocks: error getting block: ", err) + return 0, err + } + + _, err = blockRepository.CreateOrUpdateBlock(block) + if err != nil { + log.Error("RetrieveAndUpdateBlocks: error creating/updating block: ", err) + return 0, err } - // TODO: handle possible error here - blockRepository.CreateOrUpdateBlock(block) + } - return len(blockNumbers) + return len(blockNumbers), nil } diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 499dcb86e..b60b7eb43 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -25,25 +25,24 @@ import ( ) func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) { - lastBlock := blockchain.LastBlock().Int64() - headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock) - + lastBlock, err := blockchain.LastBlock() if err != nil { - log.Error("Error in checking header in PopulateMissingHeaders: ", err) + log.Error("PopulateMissingHeaders: Error getting last block: ", err) return 0, err - } else if headerAlreadyExists { - return 0, nil } - blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockchain.Node().ID) if err != nil { - log.Error("Error getting missing block numbers in PopulateMissingHeaders: ", err) + log.Error("PopulateMissingHeaders: Error getting missing block numbers: ", err) return 0, err + } else if len(blockNumbers) == 0 { + return 0, nil } log.Printf("Backfilling %d blocks\n\n", len(blockNumbers)) _, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers) if err != nil { + log.Error("PopulateMissingHeaders: Error getting/updating headers:", err) return 0, err } return len(blockNumbers), nil diff --git a/pkg/history/validation_window.go b/pkg/history/validation_window.go index f6ba1a18a..0ce41b786 100644 --- a/pkg/history/validation_window.go +++ b/pkg/history/validation_window.go @@ -18,17 +18,10 @@ package history import ( "fmt" + log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" - "text/template" ) -const WindowTemplate = `Validating Blocks -|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD) - -` - -var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate)) - type ValidationWindow struct { LowerBound int64 UpperBound int64 @@ -38,10 +31,14 @@ func (window ValidationWindow) Size() int { return int(window.UpperBound - window.LowerBound) } -func MakeValidationWindow(blockchain core.BlockChain, windowSize int) ValidationWindow { - upperBound := blockchain.LastBlock().Int64() - lowerBound := upperBound - int64(windowSize) - return ValidationWindow{lowerBound, upperBound} +func MakeValidationWindow(blockchain core.BlockChain, windowSize int) (ValidationWindow, error) { + upperBound, err := blockchain.LastBlock() + if err != nil { + log.Error("MakeValidationWindow: error getting LastBlock: ", err) + return ValidationWindow{}, err + } + lowerBound := upperBound.Int64() - int64(windowSize) + return ValidationWindow{lowerBound, upperBound.Int64()}, nil } func MakeRange(min, max int64) []int64 { From 13eacf2081b6af8b679d00b0f7175cd994701ec6 Mon Sep 17 00:00:00 2001 From: Edvard Date: Thu, 14 Feb 2019 16:04:12 +0100 Subject: [PATCH 3/4] Update tests for error propagation --- integration_test/geth_blockchain_test.go | 4 ++-- pkg/datastore/postgres/postgres_test.go | 3 ++- .../repositories/header_repository_test.go | 17 ----------------- .../repositories/logs_repository_test.go | 19 ++++++++++++------- .../repositories/receipts_repository_test.go | 9 +++++++-- pkg/fakes/mock_block_repository.go | 3 ++- pkg/fakes/mock_blockchain.go | 4 ++-- pkg/fakes/mock_header_repository.go | 8 -------- pkg/geth/blockchain_test.go | 2 +- pkg/geth/cold_import/importer_test.go | 15 ++++++++------- .../common/header_converter_test.go | 6 ++---- pkg/history/block_validator_test.go | 5 +++-- pkg/history/populate_blocks_test.go | 17 +++++++++++------ pkg/history/populate_headers_test.go | 1 - pkg/history/validation_window_test.go | 3 ++- 15 files changed, 54 insertions(+), 62 deletions(-) diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index 547197741..2cd85cb3b 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -48,7 +48,7 @@ var _ = Describe("Reading from the Geth blockchain", func() { It("reads two blocks", func(done Done) { blocks := fakes.NewMockBlockRepository() - lastBlock := blockChain.LastBlock() + lastBlock, _ := blockChain.LastBlock() queriedBlocks := []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6} history.RetrieveAndUpdateBlocks(blockChain, blocks, queriedBlocks) blocks.AssertCreateOrUpdateBlocksCallCountAndBlockNumbersEquals(2, []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6}) @@ -60,7 +60,7 @@ var _ = Describe("Reading from the Geth blockchain", func() { Expect(err).ToNot(HaveOccurred()) firstBlock, err := blockChain.GetBlockByNumber(int64(1)) Expect(err).ToNot(HaveOccurred()) - lastBlockNumber := blockChain.LastBlock() + lastBlockNumber, _ := blockChain.LastBlock() Expect(genesisBlock.Number).To(Equal(int64(0))) Expect(firstBlock.Number).To(Equal(int64(1))) diff --git a/pkg/datastore/postgres/postgres_test.go b/pkg/datastore/postgres/postgres_test.go index 447665e3a..ea68c49a4 100644 --- a/pkg/datastore/postgres/postgres_test.go +++ b/pkg/datastore/postgres/postgres_test.go @@ -134,8 +134,9 @@ var _ = Describe("Postgres DB", func() { err := logRepository.CreateLogs([]core.Log{badLog}, 123) Expect(err).ToNot(BeNil()) - savedBlock := logRepository.GetLogs("x123", 1) + savedBlock, err := logRepository.GetLogs("x123", 1) Expect(savedBlock).To(BeNil()) + Expect(err).To(Not(HaveOccurred())) }) It("does not commit block or transactions if transaction is invalid", func() { diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index fbcd50bbd..ba97195ef 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -273,21 +273,4 @@ var _ = Describe("Block header repository", func() { Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5})) }) }) - - Describe("HeaderExists", func() { - It("returns true if the header record exists", func() { - _, err = repo.CreateOrUpdateHeader(header) - Expect(err).NotTo(HaveOccurred()) - - result, err := repo.HeaderExists(header.BlockNumber) - Expect(err).NotTo(HaveOccurred()) - Expect(result).To(BeTrue()) - }) - - It("returns false if the header record doesn't exist", func() { - result, err := repo.HeaderExists(1) - Expect(err).NotTo(HaveOccurred()) - Expect(result).To(BeFalse()) - }) - }) }) diff --git a/pkg/datastore/postgres/repositories/logs_repository_test.go b/pkg/datastore/postgres/repositories/logs_repository_test.go index 1f88cc6ca..1402d6724 100644 --- a/pkg/datastore/postgres/repositories/logs_repository_test.go +++ b/pkg/datastore/postgres/repositories/logs_repository_test.go @@ -65,8 +65,9 @@ var _ = Describe("Logs Repository", func() { Data: "xabc", }}, receiptId) - log := logsRepository.GetLogs("x123", blockNumber) + log, err := logsRepository.GetLogs("x123", blockNumber) + Expect(err).NotTo(HaveOccurred()) Expect(log).NotTo(BeNil()) Expect(log[0].BlockNumber).To(Equal(blockNumber)) Expect(log[0].Address).To(Equal("x123")) @@ -79,7 +80,8 @@ var _ = Describe("Logs Repository", func() { }) It("returns nil if log does not exist", func() { - log := logsRepository.GetLogs("x123", 1) + log, err := logsRepository.GetLogs("x123", 1) + Expect(err).NotTo(HaveOccurred()) Expect(log).To(BeNil()) }) @@ -89,7 +91,7 @@ var _ = Describe("Logs Repository", func() { Expect(err).NotTo(HaveOccurred()) receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) Expect(err).NotTo(HaveOccurred()) - logsRepository.CreateLogs([]core.Log{{ + _ = logsRepository.CreateLogs([]core.Log{{ BlockNumber: blockNumber, Index: 0, Address: "x123", @@ -97,7 +99,7 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x777", 1: "x888", 2: "x999"}, Data: "xabc", }}, receiptId) - logsRepository.CreateLogs([]core.Log{{ + _ = logsRepository.CreateLogs([]core.Log{{ BlockNumber: blockNumber, Index: 1, Address: "x123", @@ -105,7 +107,7 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x111", 1: "x222", 2: "x333"}, Data: "xdef", }}, receiptId) - logsRepository.CreateLogs([]core.Log{{ + _ = logsRepository.CreateLogs([]core.Log{{ BlockNumber: 2, Index: 0, Address: "x123", @@ -114,12 +116,14 @@ var _ = Describe("Logs Repository", func() { Data: "xabc", }}, receiptId) - log := logsRepository.GetLogs("x123", blockNumber) + log, err := logsRepository.GetLogs("x123", blockNumber) + Expect(err).NotTo(HaveOccurred()) type logIndex struct { blockNumber int64 Index int64 } + var uniqueBlockNumbers []logIndex for _, log := range log { uniqueBlockNumbers = append(uniqueBlockNumbers, @@ -199,8 +203,9 @@ var _ = Describe("Logs Repository", func() { block := core.Block{Transactions: []core.Transaction{transaction}} _, err := blockRepository.CreateOrUpdateBlock(block) Expect(err).To(Not(HaveOccurred())) - retrievedLogs := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407) + retrievedLogs, err := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407) + Expect(err).NotTo(HaveOccurred()) expected := logs[1:] Expect(retrievedLogs).To(Equal(expected)) }) diff --git a/pkg/datastore/postgres/repositories/receipts_repository_test.go b/pkg/datastore/postgres/repositories/receipts_repository_test.go index 4ba64db45..33572bb77 100644 --- a/pkg/datastore/postgres/repositories/receipts_repository_test.go +++ b/pkg/datastore/postgres/repositories/receipts_repository_test.go @@ -87,14 +87,19 @@ var _ = Describe("Receipts Repository", func() { Expect(err).NotTo(HaveOccurred()) Expect(persistedReceiptOne).NotTo(BeNil()) Expect(persistedReceiptOne.TxHash).To(Equal(txHashOne)) + persistedReceiptTwo, err := receiptRepository.GetReceipt(txHashTwo) Expect(err).NotTo(HaveOccurred()) Expect(persistedReceiptTwo).NotTo(BeNil()) Expect(persistedReceiptTwo.TxHash).To(Equal(txHashTwo)) - persistedAddressOneLogs := logRepository.GetLogs(addressOne, blockNumber) + + persistedAddressOneLogs, err := logRepository.GetLogs(addressOne, blockNumber) + Expect(err).NotTo(HaveOccurred()) Expect(persistedAddressOneLogs).NotTo(BeNil()) Expect(len(persistedAddressOneLogs)).To(Equal(2)) - persistedAddressTwoLogs := logRepository.GetLogs(addressTwo, blockNumber) + + persistedAddressTwoLogs, err := logRepository.GetLogs(addressTwo, blockNumber) + Expect(err).NotTo(HaveOccurred()) Expect(persistedAddressTwoLogs).NotTo(BeNil()) Expect(len(persistedAddressTwoLogs)).To(Equal(1)) }) diff --git a/pkg/fakes/mock_block_repository.go b/pkg/fakes/mock_block_repository.go index b39a034f6..4eca601ff 100644 --- a/pkg/fakes/mock_block_repository.go +++ b/pkg/fakes/mock_block_repository.go @@ -85,9 +85,10 @@ func (repository *MockBlockRepository) MissingBlockNumbers(startingBlockNumber i return repository.missingBlockNumbersReturnArray } -func (repository *MockBlockRepository) SetBlocksStatus(chainHead int64) { +func (repository *MockBlockRepository) SetBlocksStatus(chainHead int64) error { repository.setBlockStatusCalled = true repository.setBlockStatusPassedChainHead = chainHead + return nil } func (repository *MockBlockRepository) AssertCreateOrUpdateBlockCallCountEquals(times int) { diff --git a/pkg/fakes/mock_blockchain.go b/pkg/fakes/mock_blockchain.go index 1cedff568..80cdff2ff 100644 --- a/pkg/fakes/mock_blockchain.go +++ b/pkg/fakes/mock_blockchain.go @@ -108,8 +108,8 @@ func (chain *MockBlockChain) CallContract(contractHash string, input []byte, blo return []byte{}, nil } -func (chain *MockBlockChain) LastBlock() *big.Int { - return chain.lastBlock +func (chain *MockBlockChain) LastBlock() (*big.Int, error) { + return chain.lastBlock, nil } func (chain *MockBlockChain) Node() core.Node { diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index a8a4e6ba9..05f06bb3f 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -65,14 +65,6 @@ func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, return repository.missingBlockNumbers, nil } -func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) { - return repository.headerExists, nil -} - -func (repository *MockHeaderRepository) SetHeaderExists(headerExists bool) { - repository.headerExists = headerExists -} - func (repository *MockHeaderRepository) SetGetHeaderError(err error) { repository.getHeaderError = err } diff --git a/pkg/geth/blockchain_test.go b/pkg/geth/blockchain_test.go index 217e53266..370563e22 100644 --- a/pkg/geth/blockchain_test.go +++ b/pkg/geth/blockchain_test.go @@ -221,7 +221,7 @@ var _ = Describe("Geth blockchain", func() { blockNumber := int64(100) mockClient.SetHeaderByNumberReturnHeader(&types.Header{Number: big.NewInt(blockNumber)}) - result := blockChain.LastBlock() + result, _ := blockChain.LastBlock() mockClient.AssertHeaderByNumberCalledWith(context.Background(), nil) Expect(result).To(Equal(big.NewInt(blockNumber))) diff --git a/pkg/geth/cold_import/importer_test.go b/pkg/geth/cold_import/importer_test.go index 3e50b1103..9277fd15a 100644 --- a/pkg/geth/cold_import/importer_test.go +++ b/pkg/geth/cold_import/importer_test.go @@ -20,7 +20,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/geth/cold_import" @@ -55,7 +54,8 @@ var _ = Describe("Geth cold importer", func() { mockEthereumDatabase.SetReturnBlock(fakeGethBlock) importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - importer.Execute(startingBlockNumber, endingBlockNumber, nodeId) + err := importer.Execute(startingBlockNumber, endingBlockNumber, nodeId) + Expect(err).NotTo(HaveOccurred()) mockBlockRepository.AssertMissingBlockNumbersCalledWith(startingBlockNumber, endingBlockNumber, nodeId) mockEthereumDatabase.AssertGetBlockHashCalledWith(missingBlockNumber) @@ -76,7 +76,8 @@ var _ = Describe("Geth cold importer", func() { mockEthereumDatabase.SetReturnBlock(fakeGethBlock) importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - importer.Execute(blockNumber, blockNumber, "node_id") + err := importer.Execute(blockNumber, blockNumber, "node_id") + Expect(err).NotTo(HaveOccurred()) mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber) mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber) @@ -101,8 +102,8 @@ var _ = Describe("Geth cold importer", func() { mockEthereumDatabase.SetReturnBlock(fakeGethBlock) importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - importer.Execute(startingBlockNumber, endingBlockNumber, "node_id") - + err := importer.Execute(startingBlockNumber, endingBlockNumber, "node_id") + Expect(err).NotTo(HaveOccurred()) mockBlockRepository.AssertSetBlockStatusCalledWith(endingBlockNumber) }) @@ -122,8 +123,8 @@ var _ = Describe("Geth cold importer", func() { mockEthereumDatabase.SetReturnReceipts(fakeReceipts) importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - importer.Execute(blockNumber, blockNumber, "node_id") - + err := importer.Execute(blockNumber, blockNumber, "node_id") + Expect(err).NotTo(HaveOccurred()) expectedReceipts := vulcCommon.ToCoreReceipts(fakeReceipts) mockReceiptRepository.AssertCreateReceiptsAndLogsCalledWith(blockId, expectedReceipts) }) diff --git a/pkg/geth/converters/common/header_converter_test.go b/pkg/geth/converters/common/header_converter_test.go index 7eb12d836..c7d5c9734 100644 --- a/pkg/geth/converters/common/header_converter_test.go +++ b/pkg/geth/converters/common/header_converter_test.go @@ -44,9 +44,8 @@ var _ = Describe("Block header converter", func() { converter := common2.HeaderConverter{} hash := fakes.FakeHash.String() - coreHeader, err := converter.Convert(gethHeader, hash) + coreHeader := converter.Convert(gethHeader, hash) - Expect(err).NotTo(HaveOccurred()) Expect(coreHeader.BlockNumber).To(Equal(gethHeader.Number.Int64())) Expect(coreHeader.Hash).To(Equal(hash)) Expect(coreHeader.Timestamp).To(Equal(gethHeader.Time.String())) @@ -56,9 +55,8 @@ var _ = Describe("Block header converter", func() { gethHeader := types.Header{Number: big.NewInt(123)} converter := common2.HeaderConverter{} - coreHeader, err := converter.Convert(&gethHeader, fakes.FakeHash.String()) + coreHeader := converter.Convert(&gethHeader, fakes.FakeHash.String()) - Expect(err).NotTo(HaveOccurred()) expectedJSON, err := json.Marshal(gethHeader) Expect(err).NotTo(HaveOccurred()) Expect(coreHeader.Raw).To(Equal(expectedJSON)) diff --git a/pkg/history/block_validator_test.go b/pkg/history/block_validator_test.go index 3509e20e5..2022dc6de 100644 --- a/pkg/history/block_validator_test.go +++ b/pkg/history/block_validator_test.go @@ -34,7 +34,8 @@ var _ = Describe("Blocks validator", func() { blocksRepository := fakes.NewMockBlockRepository() validator := history.NewBlockValidator(blockChain, blocksRepository, 2) - window := validator.ValidateBlocks() + window, err := validator.ValidateBlocks() + Expect(err).NotTo(HaveOccurred()) Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7})) blocksRepository.AssertCreateOrUpdateBlockCallCountEquals(3) @@ -43,7 +44,7 @@ var _ = Describe("Blocks validator", func() { It("returns the number of largest block", func() { blockChain := fakes.NewMockBlockChain() blockChain.SetLastBlock(big.NewInt(3)) - maxBlockNumber := blockChain.LastBlock() + maxBlockNumber, _ := blockChain.LastBlock() Expect(maxBlockNumber.Int64()).To(Equal(int64(3))) }) diff --git a/pkg/history/populate_blocks_test.go b/pkg/history/populate_blocks_test.go index 5ff67a13a..d629da283 100644 --- a/pkg/history/populate_blocks_test.go +++ b/pkg/history/populate_blocks_test.go @@ -36,8 +36,9 @@ var _ = Describe("Populating blocks", func() { blockChain.SetLastBlock(big.NewInt(2)) blockRepository.SetMissingBlockNumbersReturnArray([]int64{2}) - blocksAdded := history.PopulateMissingBlocks(blockChain, blockRepository, 1) - _, err := blockRepository.GetBlock(1) + blocksAdded, err := history.PopulateMissingBlocks(blockChain, blockRepository, 1) + Expect(err).NotTo(HaveOccurred()) + _, err = blockRepository.GetBlock(1) Expect(blocksAdded).To(Equal(1)) Expect(err).ToNot(HaveOccurred()) @@ -48,8 +49,9 @@ var _ = Describe("Populating blocks", func() { blockChain.SetLastBlock(big.NewInt(13)) blockRepository.SetMissingBlockNumbersReturnArray([]int64{5, 8, 10}) - blocksAdded := history.PopulateMissingBlocks(blockChain, blockRepository, 5) + blocksAdded, err := history.PopulateMissingBlocks(blockChain, blockRepository, 5) + Expect(err).NotTo(HaveOccurred()) Expect(blocksAdded).To(Equal(3)) blockRepository.AssertCreateOrUpdateBlocksCallCountAndBlockNumbersEquals(3, []int64{5, 8, 10}) }) @@ -59,16 +61,18 @@ var _ = Describe("Populating blocks", func() { blockChain.SetLastBlock(big.NewInt(6)) blockRepository.SetMissingBlockNumbersReturnArray([]int64{4, 5}) - numberOfBlocksCreated := history.PopulateMissingBlocks(blockChain, blockRepository, 3) + numberOfBlocksCreated, err := history.PopulateMissingBlocks(blockChain, blockRepository, 3) + Expect(err).NotTo(HaveOccurred()) Expect(numberOfBlocksCreated).To(Equal(2)) }) It("updates the repository with a range of blocks w/in the range ", func() { blockChain := fakes.NewMockBlockChain() - history.RetrieveAndUpdateBlocks(blockChain, blockRepository, history.MakeRange(2, 5)) + _, err := history.RetrieveAndUpdateBlocks(blockChain, blockRepository, history.MakeRange(2, 5)) + Expect(err).NotTo(HaveOccurred()) blockRepository.AssertCreateOrUpdateBlocksCallCountAndBlockNumbersEquals(4, []int64{2, 3, 4, 5}) }) @@ -77,8 +81,9 @@ var _ = Describe("Populating blocks", func() { blockChain.SetGetBlockByNumberErr(fakes.FakeError) blocks := history.MakeRange(1, 10) - history.RetrieveAndUpdateBlocks(blockChain, blockRepository, blocks) + _, err := history.RetrieveAndUpdateBlocks(blockChain, blockRepository, blocks) + Expect(err).To(HaveOccurred()) blockRepository.AssertCreateOrUpdateBlockCallCountEquals(0) }) }) diff --git a/pkg/history/populate_headers_test.go b/pkg/history/populate_headers_test.go index 6b06fbb47..398744e8f 100644 --- a/pkg/history/populate_headers_test.go +++ b/pkg/history/populate_headers_test.go @@ -59,7 +59,6 @@ var _ = Describe("Populating headers", func() { It("returns early if the db is already synced up to the head of the chain", func() { blockChain := fakes.NewMockBlockChain() blockChain.SetLastBlock(big.NewInt(2)) - headerRepository.SetHeaderExists(true) headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/history/validation_window_test.go b/pkg/history/validation_window_test.go index 50683be46..0d3ee1fa2 100644 --- a/pkg/history/validation_window_test.go +++ b/pkg/history/validation_window_test.go @@ -30,8 +30,9 @@ var _ = Describe("Validation window", func() { blockChain := fakes.NewMockBlockChain() blockChain.SetLastBlock(big.NewInt(5)) - validationWindow := history.MakeValidationWindow(blockChain, 2) + validationWindow, err := history.MakeValidationWindow(blockChain, 2) + Expect(err).NotTo(HaveOccurred()) Expect(validationWindow.LowerBound).To(Equal(int64(3))) Expect(validationWindow.UpperBound).To(Equal(int64(5))) }) From 1a2f46b3088e701e537a5df5bb587d2978880cb7 Mon Sep 17 00:00:00 2001 From: Edvard Date: Wed, 20 Feb 2019 12:01:19 +0100 Subject: [PATCH 4/4] Fix small issues from review --- integration_test/geth_blockchain_test.go | 11 ++++++++--- .../postgres/repositories/block_repository.go | 6 ++---- .../postgres/repositories/logs_repository.go | 13 ++++++++++--- .../postgres/repositories/logs_repository_test.go | 15 +++++++++++---- pkg/geth/blockchain_test.go | 3 ++- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index 2cd85cb3b..bec8d39bb 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -48,9 +48,13 @@ var _ = Describe("Reading from the Geth blockchain", func() { It("reads two blocks", func(done Done) { blocks := fakes.NewMockBlockRepository() - lastBlock, _ := blockChain.LastBlock() + lastBlock, err := blockChain.LastBlock() + Expect(err).NotTo(HaveOccurred()) + queriedBlocks := []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6} - history.RetrieveAndUpdateBlocks(blockChain, blocks, queriedBlocks) + _, err = history.RetrieveAndUpdateBlocks(blockChain, blocks, queriedBlocks) + Expect(err).NotTo(HaveOccurred()) + blocks.AssertCreateOrUpdateBlocksCallCountAndBlockNumbersEquals(2, []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6}) close(done) }, 30) @@ -60,8 +64,9 @@ var _ = Describe("Reading from the Geth blockchain", func() { Expect(err).ToNot(HaveOccurred()) firstBlock, err := blockChain.GetBlockByNumber(int64(1)) Expect(err).ToNot(HaveOccurred()) - lastBlockNumber, _ := blockChain.LastBlock() + lastBlockNumber, err := blockChain.LastBlock() + Expect(err).NotTo(HaveOccurred()) Expect(genesisBlock.Number).To(Equal(int64(0))) Expect(firstBlock.Number).To(Equal(int64(1))) Expect(lastBlockNumber.Int64()).To(BeNumerically(">", 0)) diff --git a/pkg/datastore/postgres/repositories/block_repository.go b/pkg/datastore/postgres/repositories/block_repository.go index ff52fe5a0..4a5d19ffb 100644 --- a/pkg/datastore/postgres/repositories/block_repository.go +++ b/pkg/datastore/postgres/repositories/block_repository.go @@ -49,10 +49,8 @@ func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) error { UPDATE blocks SET is_final = TRUE WHERE is_final = FALSE AND number < $1`, cutoff) - if err != nil { - return err - } - return nil + + return err } func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) { diff --git a/pkg/datastore/postgres/repositories/logs_repository.go b/pkg/datastore/postgres/repositories/logs_repository.go index 9bdaca849..704ac3509 100644 --- a/pkg/datastore/postgres/repositories/logs_repository.go +++ b/pkg/datastore/postgres/repositories/logs_repository.go @@ -40,13 +40,19 @@ func (logRepository LogRepository) CreateLogs(lgs []core.Log, receiptId int64) e tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId, ) if err != nil { - tx.Rollback() + err = tx.Rollback() + if err != nil { + logrus.Error("CreateLogs: could not perform rollback: ", err) + } return postgres.ErrDBInsertFailed } } err := tx.Commit() if err != nil { - tx.Rollback() + err = tx.Rollback() + if err != nil { + logrus.Error("CreateLogs: could not perform rollback: ", err) + } return postgres.ErrDBInsertFailed } return nil @@ -83,7 +89,8 @@ func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) ([]core.Log, err var topics core.Topics err := logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) if err != nil { - logrus.Warn("loadLogs: Error scanning a row in logRows") + logrus.Error("loadLogs: Error scanning a row in logRows: ", err) + return []core.Log{}, err } lg := core.Log{ BlockNumber: blockNumber, diff --git a/pkg/datastore/postgres/repositories/logs_repository_test.go b/pkg/datastore/postgres/repositories/logs_repository_test.go index 1402d6724..e6923b855 100644 --- a/pkg/datastore/postgres/repositories/logs_repository_test.go +++ b/pkg/datastore/postgres/repositories/logs_repository_test.go @@ -56,7 +56,7 @@ var _ = Describe("Logs Repository", func() { Expect(err).NotTo(HaveOccurred()) receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) Expect(err).NotTo(HaveOccurred()) - logsRepository.CreateLogs([]core.Log{{ + err = logsRepository.CreateLogs([]core.Log{{ BlockNumber: blockNumber, Index: 0, Address: "x123", @@ -64,6 +64,7 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x777", 1: "x888", 2: "x999"}, Data: "xabc", }}, receiptId) + Expect(err).NotTo(HaveOccurred()) log, err := logsRepository.GetLogs("x123", blockNumber) @@ -91,7 +92,8 @@ var _ = Describe("Logs Repository", func() { Expect(err).NotTo(HaveOccurred()) receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) Expect(err).NotTo(HaveOccurred()) - _ = logsRepository.CreateLogs([]core.Log{{ + + err = logsRepository.CreateLogs([]core.Log{{ BlockNumber: blockNumber, Index: 0, Address: "x123", @@ -99,7 +101,9 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x777", 1: "x888", 2: "x999"}, Data: "xabc", }}, receiptId) - _ = logsRepository.CreateLogs([]core.Log{{ + Expect(err).NotTo(HaveOccurred()) + + err = logsRepository.CreateLogs([]core.Log{{ BlockNumber: blockNumber, Index: 1, Address: "x123", @@ -107,7 +111,9 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x111", 1: "x222", 2: "x333"}, Data: "xdef", }}, receiptId) - _ = logsRepository.CreateLogs([]core.Log{{ + Expect(err).NotTo(HaveOccurred()) + + err = logsRepository.CreateLogs([]core.Log{{ BlockNumber: 2, Index: 0, Address: "x123", @@ -115,6 +121,7 @@ var _ = Describe("Logs Repository", func() { Topics: core.Topics{0: "x777", 1: "x888", 2: "x999"}, Data: "xabc", }}, receiptId) + Expect(err).NotTo(HaveOccurred()) log, err := logsRepository.GetLogs("x123", blockNumber) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/geth/blockchain_test.go b/pkg/geth/blockchain_test.go index 370563e22..5d011f72a 100644 --- a/pkg/geth/blockchain_test.go +++ b/pkg/geth/blockchain_test.go @@ -221,7 +221,8 @@ var _ = Describe("Geth blockchain", func() { blockNumber := int64(100) mockClient.SetHeaderByNumberReturnHeader(&types.Header{Number: big.NewInt(blockNumber)}) - result, _ := blockChain.LastBlock() + result, err := blockChain.LastBlock() + Expect(err).NotTo(HaveOccurred()) mockClient.AssertHeaderByNumberCalledWith(context.Background(), nil) Expect(result).To(Equal(big.NewInt(blockNumber)))