From 5083ee58374c4a5799ef8aa0ccf9923c71419e0f Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Mon, 25 Jul 2022 18:10:09 +0530 Subject: [PATCH] Use watched addresses from direct indexing params by default while serving statediff APIs (#262) * Use watched addresses from direct indexing params in statediff APIs by default * Avoid using indexer object when direct indexing is off * Add nil check before accessing watched addresses from direct indexing params --- statediff/service.go | 87 +++++++++++++++++++++++++++++---------- statediff/service_test.go | 7 ++-- 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 517ab3466c06..30d227f175f5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params var db sql.Database var err error quitCh := make(chan bool) - if params.IndexerConfig != nil { + indexerConfigAvailable := params.IndexerConfig != nil + if indexerConfigAvailable { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(cfg.NetworkId, 10), @@ -201,11 +202,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediffMetrics: statediffMetrics, sqlFileWaitingForWrite: false, } - if params.IndexerConfig.Type() == shared.POSTGRES { - knownGaps.checkForGaps = true - } else { - log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") - knownGaps.checkForGaps = false + if indexerConfigAvailable { + if params.IndexerConfig.Type() == shared.POSTGRES { + knownGaps.checkForGaps = true + } else { + log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") + knownGaps.checkForGaps = false + } } sds := &Service{ Mutex: sync.Mutex{}, @@ -226,9 +229,11 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) - err = loadWatchedAddresses(indexer) - if err != nil { - return err + if indexerConfigAvailable { + err = loadWatchedAddresses(indexer) + if err != nil { + return err + } } return nil @@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) log.Info("sending state diff", "block height", blockNumber) + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload currentBlock := sds.BlockChain.GetBlockByHash(blockHash) log.Info("sending state diff", "block hash", blockHash) + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { + log.Info("writing state diff at", "block height", blockNumber) + + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { + log.Info("writing state diff for", "block hash", blockHash) + + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -895,9 +932,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) + if err != nil { + return err + } } // update in-memory params @@ -917,9 +956,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.RemoveWatchedAddresses(args) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.RemoveWatchedAddresses(args) + if err != nil { + return err + } } // update in-memory params @@ -933,9 +974,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) + if err != nil { + return err + } } // update in-memory params @@ -943,9 +986,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W writeLoopParams.ComputeWatchedAddressesLeafPaths() case types2.Clear: // update the db - err := sds.indexer.ClearWatchedAddresses() - if err != nil { - return err + if sds.indexer != nil { + err := sds.indexer.ClearWatchedAddresses() + if err != nil { + return err + } } // update in-memory params diff --git a/statediff/service_test.go b/statediff/service_test.go index 3f80f1d2efab..ecf8bdadbeb6 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -78,9 +78,10 @@ var ( event3 = core.ChainEvent{Block: testBlock3} defaultParams = statediff.Params{ - IncludeBlock: true, - IncludeReceipts: true, - IncludeTD: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + WatchedAddresses: []common.Address{}, } )