Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #12 from vulcanize/VDB-392-fix-io-error-propagation
Browse files Browse the repository at this point in the history
VDB-392 Fix IO error propagation
  • Loading branch information
m0ar authored Feb 20, 2019
2 parents 2ecc524 + 1a2f46b commit 53789c8
Show file tree
Hide file tree
Showing 34 changed files with 258 additions and 162 deletions.
15 changes: 10 additions & 5 deletions cmd/lightSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func init() {
func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber)
if err != nil {
log.Fatal("Error populating headers: ", err)
// TODO Lots of possible errors in the call stack above. If errors occur, we still put
// 0 in the channel, triggering another round
log.Error("backfillAllHeaders: Error populating headers: ", err)
}
missingBlocksPopulated <- populated
}
Expand All @@ -84,7 +86,7 @@ func lightSync() {
case <-ticker.C:
window, err := validator.ValidateHeaders()
if err != nil {
log.Error("ValidateHeaders failed in lightSync: ", err)
log.Error("lightSync: ValidateHeaders failed: ", err)
}
log.Info(window.GetString())
case n := <-missingBlocksPopulated:
Expand All @@ -97,11 +99,14 @@ func lightSync() {
}

func validateArgs(blockChain *geth.BlockChain) {
lastBlock := blockChain.LastBlock().Int64()
if lastBlock == 0 {
lastBlock, err := blockChain.LastBlock()
if err != nil {
log.Error("validateArgs: Error getting last block: ", err)
}
if lastBlock.Int64() == 0 {
log.Fatal("geth initial: state sync not finished")
}
if startingBlockNumber > lastBlock {
if startingBlockNumber > lastBlock.Int64() {
log.Fatal("starting block number > current block number")
}
}
22 changes: 16 additions & 6 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,27 @@ func init() {
}

func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
if err != nil {
log.Error("backfillAllBlocks: error in populateMissingBlocks: ", err)
}
missingBlocksPopulated <- populated
}

func sync() {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()

blockChain := getBlockChain()
lastBlock := blockChain.LastBlock().Int64()
if lastBlock == 0 {
lastBlock, err := blockChain.LastBlock()
if err != nil {
log.Error("sync: Error getting last block: ", err)
}
if lastBlock.Int64() == 0 {
log.Fatal("geth initial: state sync not finished")
}
if startingBlockNumber > lastBlock {
log.Fatal("starting block number > current block number")
if startingBlockNumber > lastBlock.Int64() {
log.Fatal("sync: starting block number > current block number")
}

db := utils.LoadPostgres(databaseConfig, blockChain.Node())
Expand All @@ -85,7 +92,10 @@ func sync() {
for {
select {
case <-ticker.C:
window := validator.ValidateBlocks()
window, err := validator.ValidateBlocks()
if err != nil {
log.Error("sync: error in validateBlocks: ", err)
}
log.Info(window.GetString())
case <-missingBlocksPopulated:
go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber)
Expand Down
4 changes: 4 additions & 0 deletions environments/staging.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
[database]
name = "vulcanize_public"
hostname = "localhost"
user = "vulcanize"
password = "vulcanize"
port = 5432

[client]
Expand Down
11 changes: 8 additions & 3 deletions integration_test/geth_blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ var _ = Describe("Reading from the Geth blockchain", func() {

It("reads two blocks", func(done Done) {
blocks := fakes.NewMockBlockRepository()
lastBlock := blockChain.LastBlock()
lastBlock, err := blockChain.LastBlock()
Expect(err).NotTo(HaveOccurred())

queriedBlocks := []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6}
history.RetrieveAndUpdateBlocks(blockChain, blocks, queriedBlocks)
_, err = history.RetrieveAndUpdateBlocks(blockChain, blocks, queriedBlocks)
Expect(err).NotTo(HaveOccurred())

blocks.AssertCreateOrUpdateBlocksCallCountAndBlockNumbersEquals(2, []int64{lastBlock.Int64() - 5, lastBlock.Int64() - 6})
close(done)
}, 30)
Expand All @@ -60,8 +64,9 @@ var _ = Describe("Reading from the Geth blockchain", func() {
Expect(err).ToNot(HaveOccurred())
firstBlock, err := blockChain.GetBlockByNumber(int64(1))
Expect(err).ToNot(HaveOccurred())
lastBlockNumber := blockChain.LastBlock()
lastBlockNumber, err := blockChain.LastBlock()

Expect(err).NotTo(HaveOccurred())
Expect(genesisBlock.Number).To(Equal(int64(0)))
Expect(firstBlock.Number).To(Equal(int64(1)))
Expect(lastBlockNumber.Int64()).To(BeNumerically(">", 0))
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type BlockChain interface {
GetHeaderByNumbers(blockNumbers []int64) ([]Header, error)
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error)
LastBlock() *big.Int
LastBlock() (*big.Int, error)
Node() Node
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/datastore/ethereum/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ethereum

import (
"fmt"
"github.com/sirupsen/logrus"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
Expand All @@ -37,6 +38,7 @@ func CreateDatabase(config DatabaseConfig) (Database, error) {
case Level:
levelDBConnection, err := ethdb.NewLDBDatabase(config.Path, 128, 1024)
if err != nil {
logrus.Error("CreateDatabase: error connecting to new LDBD: ", err)
return nil, err
}
levelDBReader := level.NewLevelDatabaseReader(levelDBConnection)
Expand Down
3 changes: 2 additions & 1 deletion pkg/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ var _ = Describe("Postgres DB", func() {
err := logRepository.CreateLogs([]core.Log{badLog}, 123)

Expect(err).ToNot(BeNil())
savedBlock := logRepository.GetLogs("x123", 1)
savedBlock, err := logRepository.GetLogs("x123", 1)
Expect(savedBlock).To(BeNil())
Expect(err).To(Not(HaveOccurred()))
})

It("does not commit block or transactions if transaction is invalid", func() {
Expand Down
15 changes: 12 additions & 3 deletions pkg/datastore/postgres/repositories/block_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ func NewBlockRepository(database *postgres.DB) *BlockRepository {
return &BlockRepository{database: database}
}

func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) {
func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) error {
cutoff := chainHead - blocksFromHeadBeforeFinal
blockRepository.database.Exec(`
_, err := blockRepository.database.Exec(`
UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND number < $1`,
cutoff)

return err
}

func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) {
Expand All @@ -70,7 +72,7 @@ func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (in

func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 {
numbers := make([]int64, 0)
blockRepository.database.Select(&numbers,
err := blockRepository.database.Select(&numbers,
`SELECT all_block_numbers
FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
Expand All @@ -79,6 +81,9 @@ func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber i
) `,
startingBlockNumber,
highestBlockNumber, nodeId)
if err != nil {
log.Error("MissingBlockNumbers: error getting blocks: ", err)
}
return numbers
}

Expand Down Expand Up @@ -108,6 +113,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block,
case sql.ErrNoRows:
return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber)
default:
log.Error("GetBlock: error loading blocks: ", err)
return savedBlock, err
}
}
Expand Down Expand Up @@ -202,6 +208,7 @@ func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64,
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil {
log.Error("createReceipt: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, nil
Expand Down Expand Up @@ -256,6 +263,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
var block b
err := blockRows.StructScan(&block)
if err != nil {
log.Error("loadBlock: error loading block: ", err)
return core.Block{}, err
}
transactionRows, err := blockRepository.database.Queryx(`
Expand All @@ -271,6 +279,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
WHERE block_id = $1
ORDER BY hash`, block.ID)
if err != nil {
log.Error("loadBlock: error fetting transactions: ", err)
return core.Block{}, err
}
block.Transactions = blockRepository.LoadTransactions(transactionRows)
Expand Down
23 changes: 16 additions & 7 deletions pkg/datastore/postgres/repositories/contract_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ func (contractRepository ContractRepository) CreateContract(contract core.Contra
return nil
}

func (contractRepository ContractRepository) ContractExists(contractHash string) bool {
func (contractRepository ContractRepository) ContractExists(contractHash string) (bool, error) {
var exists bool
contractRepository.DB.QueryRow(
err := contractRepository.DB.QueryRow(
`SELECT exists(
SELECT 1
FROM watched_contracts
WHERE contract_hash = $1)`, contractHash).Scan(&exists)
return exists
if err != nil {
return false, err
}
return exists, nil
}

func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) {
Expand All @@ -66,12 +69,15 @@ func (contractRepository ContractRepository) GetContract(contractHash string) (c
if err == sql.ErrNoRows {
return core.Contract{}, datastore.ErrContractDoesNotExist(contractHash)
}
savedContract := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi})
savedContract, err := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi})
if err != nil {
return core.Contract{}, err
}
return savedContract, nil
}

func (contractRepository ContractRepository) addTransactions(contract core.Contract) core.Contract {
transactionRows, _ := contractRepository.DB.Queryx(`
func (contractRepository ContractRepository) addTransactions(contract core.Contract) (core.Contract, error) {
transactionRows, err := contractRepository.DB.Queryx(`
SELECT hash,
nonce,
tx_to,
Expand All @@ -83,8 +89,11 @@ func (contractRepository ContractRepository) addTransactions(contract core.Contr
FROM transactions
WHERE tx_to = $1
ORDER BY block_id DESC`, contract.Hash)
if err != nil {
return core.Contract{}, err
}
blockRepository := &BlockRepository{contractRepository.DB}
transactions := blockRepository.LoadTransactions(transactionRows)
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
return savedContract
return savedContract, nil
}
19 changes: 7 additions & 12 deletions pkg/datastore/postgres/repositories/header_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
if headerDoesNotExist(err) {
return repository.insertHeader(header)
}
log.Error("CreateOrUpdateHeader: error getting header hash: ", err)
return 0, err
}
if headerMustBeReplaced(hash, header) {
Expand All @@ -54,6 +55,7 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er
var header core.Header
err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
blockNumber, repository.database.Node.ID)
log.Error("GetHeader: error getting headers: ", err)
return header, err
}

Expand All @@ -74,18 +76,6 @@ func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endi
return numbers, nil
}

func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
_, err := repository.GetHeader(blockNumber)
if err != nil {
if headerDoesNotExist(err) {
return false, nil
}
return false, err
}

return true, nil
}

func headerMustBeReplaced(hash string, header core.Header) bool {
return hash != header.Hash
}
Expand All @@ -98,6 +88,7 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er
var hash string
err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
header.BlockNumber, repository.database.Node.ID)
log.Error("getHeaderHash: error getting headers: ", err)
return hash, err
}

Expand All @@ -106,13 +97,17 @@ func (repository HeaderRepository) insertHeader(header core.Header) (int64, erro
err := repository.database.QueryRowx(
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`,
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId)
if err != nil {
log.Error("insertHeader: error inserting header: ", err)
}
return headerId, err
}

func (repository HeaderRepository) replaceHeader(header core.Header) (int64, error) {
_, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
header.BlockNumber, repository.database.Node.ID)
if err != nil {
log.Error("replaceHeader: error deleting headers: ", err)
return 0, err
}
return repository.insertHeader(header)
Expand Down
17 changes: 0 additions & 17 deletions pkg/datastore/postgres/repositories/header_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,21 +273,4 @@ var _ = Describe("Block header repository", func() {
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
})
})

Describe("HeaderExists", func() {
It("returns true if the header record exists", func() {
_, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())

result, err := repo.HeaderExists(header.BlockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeTrue())
})

It("returns false if the header record doesn't exist", func() {
result, err := repo.HeaderExists(1)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeFalse())
})
})
})
Loading

0 comments on commit 53789c8

Please sign in to comment.