diff --git a/core/txindexer_test.go b/core/txindexer_test.go index ca1c86a055..cfbec850ef 100644 --- a/core/txindexer_test.go +++ b/core/txindexer_test.go @@ -71,9 +71,11 @@ func TestTxIndexer(t *testing.T) { } verify := func(db ethdb.Database, expTail uint64, indexer *txIndexer) { tail := rawdb.ReadTxIndexTail(db) + //nolint: staticcheck if tail == nil { t.Fatal("Failed to write tx index tail") } + //nolint: staticcheck if *tail != expTail { t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail) } diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 67d247a46f..a7fca75302 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -41,10 +41,12 @@ var ( ErrNoCurrentTx = errors.New("no current tx found in interruptCtx") ) +type InterruptKeyType string + const ( // These are keys for the interruptCtx - InterruptCtxDelayKey = "delay" - InterruptCtxOpcodeDelayKey = "opcodeDelay" + InterruptCtxDelayKey InterruptKeyType = "delay" + InterruptCtxOpcodeDelayKey InterruptKeyType = "opcodeDelay" // InterruptedTxCacheSize is size of lru cache for interrupted txs InterruptedTxCacheSize = 90000 diff --git a/miner/test_backend.go b/miner/test_backend.go deleted file mode 100644 index a91422cde1..0000000000 --- a/miner/test_backend.go +++ /dev/null @@ -1,665 +0,0 @@ -package miner - -import ( - "context" - "errors" - "sync" - "sync/atomic" - "time" - - // nolint:typecheck - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/misc/eip4844" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/blockstm" - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/txpool" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" - "github.com/holiman/uint256" - - lru "github.com/hashicorp/golang-lru" -) - -// newWorkerWithDelay is newWorker() with extra params to induce artficial delays for tests such as commit-interrupt. -// nolint:staticcheck -func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, delay uint, opcodeDelay uint) *worker { - worker := &worker{ - config: config, - chainConfig: chainConfig, - engine: engine, - eth: eth, - chain: eth.BlockChain(), - mux: mux, - isLocalBlock: isLocalBlock, - coinbase: config.Etherbase, - extra: config.ExtraData, - pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *types.Block, resultQueueSize), - startCh: make(chan struct{}, 1), - exitCh: make(chan struct{}), - resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), - interruptCommitFlag: config.CommitInterruptFlag, - } - worker.noempty.Store(true) - worker.profileCount = new(int32) - // Subscribe for transaction insertion events (whether from network or resurrects) - worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) - // Subscribe events for blockchain - worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) - - interruptedTxCache, err := lru.New(vm.InterruptedTxCacheSize) - if err != nil { - log.Warn("Failed to create interrupted tx cache", "err", err) - } - - worker.interruptedTxCache = &vm.TxCache{ - Cache: interruptedTxCache, - } - - if !worker.interruptCommitFlag { - worker.noempty.Store(false) - } - - // Sanitize recommit interval if the user-specified one is too short. - recommit := worker.config.Recommit - if recommit < minRecommitInterval { - log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) - recommit = minRecommitInterval - } - - worker.recommit = recommit - - // Sanitize the timeout config for creating payload. - newpayloadTimeout := worker.config.NewPayloadTimeout - if newpayloadTimeout == 0 { - log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout) - newpayloadTimeout = DefaultConfig.NewPayloadTimeout - } - - if newpayloadTimeout < time.Millisecond*100 { - log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout) - } - - worker.newpayloadTimeout = newpayloadTimeout - - worker.wg.Add(4) - - go worker.mainLoopWithDelay(delay, opcodeDelay) - go worker.newWorkLoop(recommit) - go worker.resultLoop() - go worker.taskLoop() - - // Submit first work to initialize pending state. - if init { - worker.startCh <- struct{}{} - } - - return worker -} - -// mainLoopWithDelay is mainLoop() with extra params to induce artficial delays for tests such as commit-interrupt. -// nolint:gocognit -func (w *worker) mainLoopWithDelay(delay uint, opcodeDelay uint) { - defer w.wg.Done() - defer w.txsSub.Unsubscribe() - defer w.chainHeadSub.Unsubscribe() - defer func() { - if w.current != nil { - w.current.discard() - } - }() - - for { - select { - case req := <-w.newWorkCh: - if w.chainConfig.ChainID.Cmp(params.BorMainnetChainConfig.ChainID) == 0 || w.chainConfig.ChainID.Cmp(params.MumbaiChainConfig.ChainID) == 0 { - if w.eth.PeerCount() > 0 { - //nolint:contextcheck - w.commitWorkWithDelay(req.interrupt, req.noempty, req.timestamp, delay, opcodeDelay) - } - } else { - //nolint:contextcheck - w.commitWorkWithDelay(req.interrupt, req.noempty, req.timestamp, delay, opcodeDelay) - } - - case req := <-w.getWorkCh: - req.result <- w.generateWork(req.params) - - case ev := <-w.txsCh: - // Apply transactions to the pending state if we're not sealing - // - // Note all transactions received may not be continuous with transactions - // already included in the current sealing block. These transactions will - // be automatically eliminated. - // nolint : nestif - if !w.IsRunning() && w.current != nil { - // If block is already full, abort - if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { - continue - } - txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs)) - for _, tx := range ev.Txs { - acc, _ := types.Sender(w.current.signer, tx) - txs[acc] = append(txs[acc], &txpool.LazyTransaction{ - Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere - Hash: tx.Hash(), - Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in - Time: tx.Time(), - GasFeeCap: uint256.NewInt(tx.GasFeeCap().Uint64()), - GasTipCap: uint256.NewInt(tx.GasTipCap().Uint64()), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), - }) - } - txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) - tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil, nil, new(uint256.Int), context.Background()) - - // Only update the snapshot if any new transactons were added - // to the pending block - if tcount != w.current.tcount { - w.updateSnapshot(w.current) - } - } else { - // Special case, if the consensus engine is 0 period clique(dev mode), - // submit sealing work here since all empty submission will be rejected - // by clique. Of course the advance sealing(empty submission) is disabled. - if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitWork(nil, true, time.Now().Unix()) - } - } - - w.newTxs.Add(int32(len(ev.Txs))) - - // System stopped - case <-w.exitCh: - return - case <-w.txsSub.Err(): - return - case <-w.chainHeadSub.Err(): - return - } - } -} - -// commitWorkWithDelay is commitWork() with extra params to induce artficial delays for tests such as commit-interrupt. -func (w *worker) commitWorkWithDelay(interrupt *atomic.Int32, noempty bool, timestamp int64, delay uint, opcodeDelay uint) { - // Abort committing if node is still syncing - if w.syncing.Load() { - return - } - start := time.Now() - - var ( - work *environment - err error - ) - - // Set the coinbase if the worker is running or it's required - var coinbase common.Address - if w.IsRunning() { - coinbase = w.etherbase() - if coinbase == (common.Address{}) { - log.Error("Refusing to mine without etherbase") - return - } - } - - work, err = w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return - } - - // nolint:contextcheck - var interruptCtx = context.Background() - - stopFn := func() {} - defer func() { - stopFn() - }() - - if !noempty && w.interruptCommitFlag { - block := w.chain.GetBlockByHash(w.chain.CurrentBlock().Hash()) - interruptCtx, stopFn = getInterruptTimer(work, block) - // nolint : staticcheck - interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) - // nolint : staticcheck - interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxDelayKey, delay) - // nolint : staticcheck - interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxOpcodeDelayKey, opcodeDelay) - } - - // Create an empty block based on temporary copied state for - // sealing in advance without waiting block execution finished. - if !noempty && !w.noempty.Load() { - _ = w.commit(work.copy(), nil, false, start) - } - // Fill pending transactions from the txpool into the block. - err = w.fillTransactionsWithDelay(interrupt, work, interruptCtx) - - switch { - case err == nil: - // The entire block is filled, decrease resubmit interval in case - // of current interval is larger than the user-specified one. - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - - case errors.Is(err, errBlockInterruptedByRecommit): - // Notify resubmit loop to increase resubmitting interval if the - // interruption is due to frequent commits. - gaslimit := work.header.GasLimit - - ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - - case errors.Is(err, errBlockInterruptedByNewHead): - // If the block building is interrupted by newhead event, discard it - // totally. Committing the interrupted block introduces unnecessary - // delay, and possibly causes miner to mine on the previous head, - // which could result in higher uncle rate. - work.discard() - return - } - // Submit the generated block for consensus sealing. - _ = w.commit(work.copy(), w.fullTaskHook, true, start) - - // Swap out the old work with the new one, terminating any leftover - // prefetcher processes in the mean time and starting a new one. - if w.current != nil { - w.current.discard() - } - - w.current = work -} - -// fillTransactionsWithDelay is fillTransactions() with extra params to induce artficial delays for tests such as commit-interrupt. -// nolint:gocognit -func (w *worker) fillTransactionsWithDelay(interrupt *atomic.Int32, env *environment, interruptCtx context.Context) error { - w.mu.RLock() - tip := w.tip - w.mu.RUnlock() - - // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees - filter := txpool.PendingFilter{ - MinTip: uint256.MustFromBig(tip.ToBig()), - } - - if env.header.BaseFee != nil { - filter.BaseFee = uint256.MustFromBig(env.header.BaseFee) - } - - if env.header.ExcessBlobGas != nil { - filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) - } - - var ( - localPlainTxs, remotePlainTxs, localBlobTxs, remoteBlobTxs map[common.Address][]*txpool.LazyTransaction - ) - - filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false - pendingPlainTxs := w.eth.TxPool().Pending(filter) - - filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true - pendingBlobTxs := w.eth.TxPool().Pending(filter) - - // Split the pending transactions into locals and remotes. - localPlainTxs, remotePlainTxs = make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs - localBlobTxs, remoteBlobTxs = make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs - - for _, account := range w.eth.TxPool().Locals() { - if txs := remotePlainTxs[account]; len(txs) > 0 { - delete(remotePlainTxs, account) - localPlainTxs[account] = txs - } - if txs := remoteBlobTxs[account]; len(txs) > 0 { - delete(remoteBlobTxs, account) - localBlobTxs[account] = txs - } - } - - // Fill the block with all available pending transactions. - if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { - var plainTxs, blobTxs *transactionsByPriceAndNonce - - plainTxs = newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) - blobTxs = newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) - - if err := w.commitTransactionsWithDelay(env, plainTxs, blobTxs, interrupt, new(uint256.Int), interruptCtx); err != nil { - return err - } - } - - if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 { - var plainTxs, blobTxs *transactionsByPriceAndNonce - - plainTxs = newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) - blobTxs = newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) - - if err := w.commitTransactionsWithDelay(env, plainTxs, blobTxs, interrupt, new(uint256.Int), interruptCtx); err != nil { - return err - } - } - - return nil -} - -// commitTransactionsWithDelay is commitTransactions() with extra params to induce artficial delays for tests such as commit-interrupt. -// nolint:gocognit, unparam -func (w *worker) commitTransactionsWithDelay(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int, interruptCtx context.Context) error { - gasLimit := env.header.GasLimit - if env.gasPool == nil { - env.gasPool = new(core.GasPool).AddGas(gasLimit) - } - - var coalescedLogs []*types.Log - - var deps map[int]map[int]bool - - chDeps := make(chan blockstm.TxDep) - - var depsWg sync.WaitGroup - var once sync.Once - - EnableMVHashMap := w.chainConfig.IsCancun(env.header.Number) - - // create and add empty mvHashMap in statedb - if EnableMVHashMap && w.IsRunning() { - deps = map[int]map[int]bool{} - - chDeps = make(chan blockstm.TxDep) - - // Make sure we safely close the channel in case of interrupt - defer once.Do(func() { - close(chDeps) - }) - - depsWg.Add(1) - - go func(chDeps chan blockstm.TxDep) { - for t := range chDeps { - deps = blockstm.UpdateDeps(deps, t) - } - - depsWg.Done() - }(chDeps) - } - - var lastTxHash common.Hash - -mainloop: - for { - // Check interruption signal and abort building if it's fired. - if interrupt != nil { - if signal := interrupt.Load(); signal != commitInterruptNone { - return signalToErr(signal) - } - } - - if interruptCtx != nil { - if EnableMVHashMap && w.IsRunning() { - env.state.AddEmptyMVHashMap() - } - - // case of interrupting by timeout - select { - case <-interruptCtx.Done(): - txCommitInterruptCounter.Inc(1) - log.Warn("Tx Level Interrupt", "hash", lastTxHash) - break mainloop - default: - } - } - - // If we don't have enough gas for any further transactions then we're done. - if env.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - break - } - // If we don't have enough blob space for any further blob transactions, - // skip that list altogether - if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock { - log.Trace("Not enough blob space for further blob transactions") - blobTxs.Clear() - // Fall though to pick up any plain txs - } - // Retrieve the next transaction and abort if all done. - - var ( - ltx *txpool.LazyTransaction - txs *transactionsByPriceAndNonce - ) - pltx, ptip := plainTxs.Peek() - bltx, btip := blobTxs.Peek() - - switch { - case pltx == nil: - txs, ltx = blobTxs, bltx - case bltx == nil: - txs, ltx = plainTxs, pltx - default: - if ptip.Lt(btip) { - txs, ltx = blobTxs, bltx - } else { - txs, ltx = plainTxs, pltx - } - } - if ltx == nil { - break - } - lastTxHash = ltx.Hash - // If we don't have enough space for the next transaction, skip the account. - if env.gasPool.Gas() < ltx.Gas { - log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas) - txs.Pop() - continue - } - if left := uint64(params.MaxBlobGasPerBlock - env.blobs*params.BlobTxBlobGasPerBlob); left < ltx.BlobGas { - log.Trace("Not enough blob gas left for transaction", "hash", ltx.Hash, "left", left, "needed", ltx.BlobGas) - txs.Pop() - continue - } - // If we don't receive enough tip for the next transaction, skip the account - if ptip.Cmp(minTip) < 0 { - log.Trace("Not enough tip for transaction", "hash", ltx.Hash, "tip", ptip, "needed", minTip) - break // If the next-best is too low, surely no better will be available - } - // Transaction seems to fit, pull it up from the pool - tx := ltx.Resolve() - if tx == nil { - log.Trace("Ignoring evicted transaction", "hash", ltx.Hash) - txs.Pop() - continue - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance in the transaction pool. - from, _ := types.Sender(env.signer, tx) - - // not prioritising conditional transaction, yet. - //nolint:nestif - if options := tx.GetOptions(); options != nil { - if err := env.header.ValidateBlockNumberOptionsPIP15(options.BlockNumberMin, options.BlockNumberMax); err != nil { - log.Trace("Dropping conditional transaction", "from", from, "hash", tx.Hash(), "reason", err) - txs.Pop() - - continue - } - - if err := env.header.ValidateTimestampOptionsPIP15(options.TimestampMin, options.TimestampMax); err != nil { - log.Trace("Dropping conditional transaction", "from", from, "hash", tx.Hash(), "reason", err) - txs.Pop() - - continue - } - - if err := env.state.ValidateKnownAccounts(options.KnownAccounts); err != nil { - log.Trace("Dropping conditional transaction", "from", from, "hash", tx.Hash(), "reason", err) - txs.Pop() - - continue - } - } - - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { - log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", w.chainConfig.EIP155Block) - txs.Pop() - continue - } - // Start executing the transaction - env.state.SetTxContext(tx.Hash(), env.tcount) - - logs, err := w.commitTransaction(env, tx, interruptCtx) - - if interruptCtx != nil { - if delay := interruptCtx.Value(vm.InterruptCtxDelayKey); delay != nil { - // nolint : durationcheck - time.Sleep(time.Duration(delay.(uint)) * time.Millisecond) - } - } - - switch { - case errors.Is(err, core.ErrNonceTooLow): - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "hash", ltx.Hash, "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case errors.Is(err, nil): - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - env.tcount++ - - if EnableMVHashMap && w.IsRunning() { - env.depsMVFullWriteList = append(env.depsMVFullWriteList, env.state.MVFullWriteList()) - env.mvReadMapList = append(env.mvReadMapList, env.state.MVReadMap()) - - if env.tcount > len(env.depsMVFullWriteList) { - log.Warn("blockstm - env.tcount > len(env.depsMVFullWriteList)", "env.tcount", env.tcount, "len(depsMVFullWriteList)", len(env.depsMVFullWriteList)) - } - - temp := blockstm.TxDep{ - Index: env.tcount - 1, - ReadList: env.state.MVReadList(), - FullWriteList: env.depsMVFullWriteList, - } - - chDeps <- temp - } - - txs.Shift() - default: - // Transaction is regarded as invalid, drop all consecutive transactions from - // the same sender because of `nonce-too-high` clause. - log.Debug("Transaction failed, account skipped", "hash", ltx.Hash, "err", err) - txs.Pop() - } - - if EnableMVHashMap && w.IsRunning() { - env.state.ClearReadMap() - env.state.ClearWriteMap() - } - } - - // nolint:nestif - if EnableMVHashMap && w.IsRunning() { - once.Do(func() { - close(chDeps) - }) - depsWg.Wait() - - var blockExtraData types.BlockExtraData - - tempVanity := env.header.Extra[:types.ExtraVanityLength] - tempSeal := env.header.Extra[len(env.header.Extra)-types.ExtraSealLength:] - - if len(env.mvReadMapList) > 0 { - tempDeps := make([][]uint64, len(env.mvReadMapList)) - - for j := range deps[0] { - tempDeps[0] = append(tempDeps[0], uint64(j)) - } - - delayFlag := true - - for i := 1; i <= len(env.mvReadMapList)-1; i++ { - reads := env.mvReadMapList[i-1] - - _, ok1 := reads[blockstm.NewSubpathKey(env.coinbase, state.BalancePath)] - _, ok2 := reads[blockstm.NewSubpathKey(common.HexToAddress(w.chainConfig.Bor.CalculateBurntContract(env.header.Number.Uint64())), state.BalancePath)] - - if ok1 || ok2 { - delayFlag = false - break - } - - for j := range deps[i] { - tempDeps[i] = append(tempDeps[i], uint64(j)) - } - } - - if err := rlp.DecodeBytes(env.header.Extra[types.ExtraVanityLength:len(env.header.Extra)-types.ExtraSealLength], &blockExtraData); err != nil { - log.Error("error while decoding block extra data", "err", err) - return err - } - - if delayFlag { - blockExtraData.TxDependency = tempDeps - } else { - blockExtraData.TxDependency = nil - } - } else { - blockExtraData.TxDependency = nil - } - - blockExtraDataBytes, err := rlp.EncodeToBytes(blockExtraData) - if err != nil { - log.Error("error while encoding block extra data: %v", err) - return err - } - - env.header.Extra = []byte{} - - env.header.Extra = append(tempVanity, blockExtraDataBytes...) - - env.header.Extra = append(env.header.Extra, tempSeal...) - } - - if !w.IsRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are sealing. The reason is that - // when we are sealing, the worker will regenerate a sealing block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l - } - - w.pendingLogsFeed.Send(cpy) - } - - return nil -} diff --git a/miner/worker.go b/miner/worker.go index a930a4f120..5cd594f08b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -261,8 +261,9 @@ type worker struct { fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. - profileCount *int32 // Global count for profiling - interruptCommitFlag bool // Interrupt commit ( Default true ) + // Interrupt commit to stop block building on time + interruptCommitFlag bool // Denotes whether interrupt commit is enabled or not + interruptCtx context.Context interruptedTxCache *vm.TxCache // noempty is the flag used to control whether the feature of pre-seal empty @@ -300,7 +301,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus interruptCommitFlag: config.CommitInterruptFlag, } worker.noempty.Store(true) - worker.profileCount = new(int32) // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) // Subscribe events for blockchain @@ -311,6 +311,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus log.Warn("Failed to create interrupted tx cache", "err", err) } + worker.interruptCtx = context.Background() worker.interruptedTxCache = &vm.TxCache{ Cache: interruptedTxCache, } @@ -633,7 +634,7 @@ func (w *worker) mainLoop() { tcount := w.current.tcount - w.commitTransactions(w.current, plainTxs, blobTxs, nil, new(uint256.Int), context.Background()) + w.commitTransactions(w.current, plainTxs, blobTxs, nil, new(uint256.Int)) // Only update the snapshot if any new transactons were added // to the pending block @@ -859,16 +860,14 @@ func (w *worker) updateSnapshot(env *environment) { w.snapshotState = env.state.Copy() } -func (w *worker) commitTransaction(env *environment, tx *types.Transaction, interruptCtx context.Context) ([]*types.Log, error) { +func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) { var ( snap = env.state.Snapshot() gp = env.gasPool.Gas() ) - // nolint : staticcheck - interruptCtx = vm.SetCurrentTxOnContext(interruptCtx, tx.Hash()) - - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig(), interruptCtx) + w.interruptCtx = vm.SetCurrentTxOnContext(w.interruptCtx, tx.Hash()) + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig(), w.interruptCtx) if err != nil { env.state.RevertToSnapshot(snap) env.gasPool.SetGas(gp) @@ -881,7 +880,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, inte return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int, interruptCtx context.Context) error { +func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -931,14 +930,14 @@ mainloop: } } - if interruptCtx != nil { + if w.interruptCtx != nil { if EnableMVHashMap && w.IsRunning() { env.state.AddEmptyMVHashMap() } // case of interrupting by timeout select { - case <-interruptCtx.Done(): + case <-w.interruptCtx.Done(): txCommitInterruptCounter.Inc(1) log.Warn("Tx Level Interrupt", "hash", lastTxHash) break mainloop @@ -1045,7 +1044,15 @@ mainloop: // Start executing the transaction env.state.SetTxContext(tx.Hash(), env.tcount) - logs, err := w.commitTransaction(env, tx, interruptCtx) + logs, err := w.commitTransaction(env, tx) + + // Check if we have a `delay` set in interrup context. It's only set during tests. + if w.interruptCtx != nil { + if delay := w.interruptCtx.Value(vm.InterruptCtxDelayKey); delay != nil { + // nolint : durationcheck + time.Sleep(time.Duration(delay.(uint)) * time.Millisecond) + } + } switch { case errors.Is(err, core.ErrNonceTooLow): @@ -1275,7 +1282,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // //nolint:gocognit -func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment, interruptCtx context.Context) error { +func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error { w.mu.RLock() tip := w.tip w.mu.RUnlock() @@ -1325,7 +1332,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment, int plainTxs = newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) blobTxs = newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int), interruptCtx); err != nil { + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int)); err != nil { return err } } @@ -1336,7 +1343,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment, int plainTxs = newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) blobTxs = newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int), interruptCtx); err != nil { + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int)); err != nil { return err } } @@ -1352,9 +1359,7 @@ func (w *worker) generateWork(params *generateParams) *newPayloadResult { } defer work.discard() - // nolint : contextcheck - var interruptCtx = context.Background() - + w.interruptCtx = resetAndCopyInterruptCtx(w.interruptCtx) if !params.noTxs { interrupt := new(atomic.Int32) @@ -1363,7 +1368,7 @@ func (w *worker) generateWork(params *generateParams) *newPayloadResult { }) defer timer.Stop() - err := w.fillTransactions(interrupt, work, interruptCtx) + err := w.fillTransactions(interrupt, work) if errors.Is(err, errBlockInterruptedByTimeout) { log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout)) } @@ -1416,9 +1421,7 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int return } - // nolint:contextcheck - var interruptCtx = context.Background() - + w.interruptCtx = resetAndCopyInterruptCtx(w.interruptCtx) stopFn := func() {} defer func() { stopFn() @@ -1426,9 +1429,8 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int if !noempty && w.interruptCommitFlag { block := w.chain.GetBlockByHash(w.chain.CurrentBlock().Hash()) - interruptCtx, stopFn = getInterruptTimer(work, block) - // nolint : staticcheck - interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) + w.interruptCtx, stopFn = getInterruptTimer(w.interruptCtx, work, block) + w.interruptCtx = vm.PutCache(w.interruptCtx, w.interruptedTxCache) } // Create an empty block based on temporary copied state for @@ -1437,7 +1439,7 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int _ = w.commit(work.copy(), nil, false, start) } // Fill pending transactions from the txpool into the block. - err = w.fillTransactions(interrupt, work, interruptCtx) + err = w.fillTransactions(interrupt, work) switch { case err == nil: @@ -1479,11 +1481,26 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int w.current = work } -func getInterruptTimer(work *environment, current *types.Block) (context.Context, func()) { - delay := time.Until(time.Unix(int64(work.header.Time), 0)) +// resetAndCopyInterruptCtx resets the interrupt context and copies the values set +// from the old one to newly created one. It is necessary to reset context in this way +// to get rid of the older parent timeout context. +func resetAndCopyInterruptCtx(interruptCtx context.Context) context.Context { + // Create a fresh new context and copy values from old one + newCtx := context.Background() + if delay := interruptCtx.Value(vm.InterruptCtxDelayKey); delay != nil { + newCtx = context.WithValue(newCtx, vm.InterruptCtxDelayKey, delay) + } + if opcodeDelay := interruptCtx.Value(vm.InterruptCtxOpcodeDelayKey); opcodeDelay != nil { + newCtx = context.WithValue(newCtx, vm.InterruptCtxOpcodeDelayKey, opcodeDelay) + } - interruptCtx, cancel := context.WithTimeout(context.Background(), delay) + return newCtx +} +func getInterruptTimer(interruptCtx context.Context, work *environment, current *types.Block) (context.Context, func()) { + delay := time.Until(time.Unix(int64(work.header.Time), 0)) + + interruptCtx, cancel := context.WithTimeout(interruptCtx, delay) blockNumber := current.NumberU64() + 1 go func() { @@ -1566,6 +1583,12 @@ func (w *worker) adjustResubmitInterval(message *intervalAdjust) { } } +// setInterruptCtx sets `value` for given `key` for interrupt commit logic. To be only +// used for e2e unit tests. +func (w *worker) setInterruptCtx(key any, value any) { + w.interruptCtx = context.WithValue(w.interruptCtx, key, value) +} + // copyReceipts makes a deep copy of the given receipts. func copyReceipts(receipts []*types.Receipt) []*types.Receipt { result := make([]*types.Receipt, len(receipts)) diff --git a/miner/worker_test.go b/miner/worker_test.go index 1d1fcc32f5..d3c1188191 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -320,13 +320,10 @@ func (b *testWorkerBackend) newStorageContractCallTx(to common.Address, nonce ui func newTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, noempty bool, delay uint, opcodeDelay uint) (*worker, *testWorkerBackend, func()) { backend := newTestWorkerBackend(t, chainConfig, engine, db) backend.txPool.Add(pendingTxs, true, false) - var w *worker + w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) if delay != 0 || opcodeDelay != 0 { - //nolint:staticcheck - w = newWorkerWithDelay(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, delay, opcodeDelay) - } else { - //nolint:staticcheck - w = newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + w.setInterruptCtx(vm.InterruptCtxDelayKey, delay) + w.setInterruptCtx(vm.InterruptCtxOpcodeDelayKey, opcodeDelay) } w.setEtherbase(testBankAddress) // enable empty blocks @@ -771,9 +768,7 @@ func testCommitInterruptExperimentBorContract(t *testing.T, delay uint, txCount } wrapped := make([]*types.Transaction, len(txs)) - for i, tx := range txs { - wrapped[i] = tx - } + copy(wrapped, txs) b.TxPool().Add(wrapped, false, false) @@ -783,8 +778,9 @@ func testCommitInterruptExperimentBorContract(t *testing.T, delay uint, txCount w.stop() currentBlockNumber := w.current.header.Number.Uint64() - assert.Check(t, txCount >= w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()) - assert.Check(t, 0 < w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()+1) + prevBlockTxCount := w.chain.GetBlockByNumber(currentBlockNumber - 1).Transactions().Len() + assert.Check(t, prevBlockTxCount > 0) + assert.Check(t, prevBlockTxCount <= txCount) } // // nolint : thelper