Skip to content

Commit

Permalink
Merge pull request #1317 from libangzhu/fix-chainstuck
Browse files Browse the repository at this point in the history
Fix chainstuck
  • Loading branch information
bysomeone authored Nov 15, 2023
2 parents eb340f7 + ba1c2d3 commit d887f73
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 91 deletions.
71 changes: 38 additions & 33 deletions rpc/ethrpc/eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
log = log15.New("module", "ethrpc_eth")
)

//NewEthAPI new eth api
// NewEthAPI new eth api
func NewEthAPI(cfg *ctypes.Chain33Config, c queue.Client, api client.QueueProtocolAPI) interface{} {
e := &ethHandler{}
e.cli.Init(c, api)
Expand All @@ -65,7 +65,7 @@ func NewEthAPI(cfg *ctypes.Chain33Config, c queue.Client, api client.QueueProtoc
return e
}

//GetBalance eth_getBalance tag:"latest", "earliest" or "pending"
// GetBalance eth_getBalance tag:"latest", "earliest" or "pending"
func (e *ethHandler) GetBalance(address string, tag *string) (hexutil.Big, error) {
var req ctypes.ReqBalance
var balance hexutil.Big
Expand All @@ -82,13 +82,13 @@ func (e *ethHandler) GetBalance(address string, tag *string) (hexutil.Big, error
return hexutil.Big(*bn), nil
}

//nolint
// nolint
func (e *ethHandler) ChainId() (hexutil.Big, error) {
bigID := big.NewInt(e.evmChainID)
return hexutil.Big(*bigID), nil
}

//BlockNumber eth_blockNumber 获取区块高度
// BlockNumber eth_blockNumber 获取区块高度
func (e *ethHandler) BlockNumber() (hexutil.Uint64, error) {
log.Debug("eth_blockNumber")
header, err := e.cli.GetLastHeader()
Expand All @@ -100,7 +100,7 @@ func (e *ethHandler) BlockNumber() (hexutil.Uint64, error) {
return hexutil.Uint64(header.Height), nil
}

//GetBlockByNumber eth_getBlockByNumber
// GetBlockByNumber eth_getBlockByNumber
func (e *ethHandler) GetBlockByNumber(in string, full bool) (*types.Block, error) {
log.Debug("GetBlockByNumber", "param", in, "full", full)
var num int64
Expand Down Expand Up @@ -132,7 +132,7 @@ func (e *ethHandler) GetBlockByNumber(in string, full bool) (*types.Block, error

}

//GetBlockByHash eth_getBlockByHash 通过区块哈希获取区块交易详情
// GetBlockByHash eth_getBlockByHash 通过区块哈希获取区块交易详情
func (e *ethHandler) GetBlockByHash(txhash common.Hash, full bool) (*types.Block, error) {
log.Debug("GetBlockByHash", "txhash", txhash, "full", full)
var req ctypes.ReqHashes
Expand All @@ -146,7 +146,7 @@ func (e *ethHandler) GetBlockByHash(txhash common.Hash, full bool) (*types.Block

}

//GetTransactionByHash eth_getTransactionByHash
// GetTransactionByHash eth_getTransactionByHash
func (e *ethHandler) GetTransactionByHash(txhash common.Hash) (*types.Transaction, error) {
log.Debug("GetTransactionByHash", "txhash", txhash)
var req ctypes.ReqHash
Expand Down Expand Up @@ -182,7 +182,7 @@ func (e *ethHandler) GetTransactionByHash(txhash common.Hash) (*types.Transactio

}

//GetTransactionReceipt eth_getTransactionReceipt
// GetTransactionReceipt eth_getTransactionReceipt
func (e *ethHandler) GetTransactionReceipt(txhash common.Hash) (*types.Receipt, error) {
log.Debug("GetTransactionReceipt", "txhash", txhash)
var req ctypes.ReqHashes
Expand Down Expand Up @@ -216,7 +216,7 @@ func (e *ethHandler) GetTransactionReceipt(txhash common.Hash) (*types.Receipt,
return nil, nil
}

//GetBlockTransactionCountByNumber eth_getBlockTransactionCountByNumber
// GetBlockTransactionCountByNumber eth_getBlockTransactionCountByNumber
func (e *ethHandler) GetBlockTransactionCountByNumber(blockNum *hexutil.Big) (hexutil.Uint64, error) {
log.Debug("GetBlockTransactionCountByNumber", "blockNum", blockNum)
var req ctypes.ReqBlocks
Expand All @@ -230,10 +230,11 @@ func (e *ethHandler) GetBlockTransactionCountByNumber(blockNum *hexutil.Big) (he

}

//GetBlockTransactionCountByHash
// GetBlockTransactionCountByHash
// parameters: 32 Bytes - hash of a block
// Returns: integer of the number of transactions in this block.
//
//method:eth_getBlockTransactionCountByHash
//parameters: 32 Bytes - hash of a block
//Returns: integer of the number of transactions in this block.
func (e *ethHandler) GetBlockTransactionCountByHash(hash common.Hash) (hexutil.Uint64, error) {
log.Debug("GetBlockTransactionCountByHash", "hash", hash)
var req ctypes.ReqHashes
Expand All @@ -247,7 +248,7 @@ func (e *ethHandler) GetBlockTransactionCountByHash(hash common.Hash) (hexutil.U
return hexutil.Uint64(len(blockdetails.GetItems()[0].GetBlock().GetTxs())), nil
}

//Accounts eth_accounts
// Accounts eth_accounts
func (e *ethHandler) Accounts() ([]string, error) {
log.Debug("Accounts", "Accounts", "")
req := &ctypes.ReqAccountList{WithoutBalance: true}
Expand All @@ -268,7 +269,7 @@ func (e *ethHandler) Accounts() ([]string, error) {

}

//Call eth_call evm合约相关操作,合约相关信息查询
// Call eth_call evm合约相关操作,合约相关信息查询
func (e *ethHandler) Call(msg types.CallMsg, tag *string) (interface{}, error) {
log.Debug("eth_call", "msg", msg)
var param rpctypes.Query4Jrpc
Expand Down Expand Up @@ -312,7 +313,7 @@ func (e *ethHandler) Call(msg types.CallMsg, tag *string) (interface{}, error) {

}

//SendRawTransaction eth_sendRawTransaction
// SendRawTransaction eth_sendRawTransaction
func (e *ethHandler) SendRawTransaction(rawData string) (hexutil.Bytes, error) {
log.Info("eth_sendRawTransaction", "rawData", rawData)
rawhexData := common.FromHex(rawData)
Expand Down Expand Up @@ -378,7 +379,8 @@ func (e *ethHandler) SendRawTransaction(rawData string) (hexutil.Bytes, error) {

chain33Tx := types.AssembleChain33Tx(ntx, sig, pubkey, e.cfg)
reply, err := e.cli.SendTx(chain33Tx)
log.Info("SendRawTransaction", "cacuHash", common.Bytes2Hex(chain33Tx.Hash()), "ethHash:", ntx.Hash().String(), "exec", string(chain33Tx.Execer), "reply:", common.Bytes2Hex(reply.GetMsg()))
log.Info("SendRawTransaction", "cacuHash", common.Bytes2Hex(chain33Tx.Hash()),
"ethHash:", ntx.Hash().String(), "exec", string(chain33Tx.Execer), "mempool reply:", common.Bytes2Hex(reply.GetMsg()), "err:", err)
//调整为返回eth 交易哈希,之前是reply.GteMsg() chain33 哈希
conf := ctypes.Conf(e.cfg, "config.rpc.sub.eth")
//打开 enableRlpTxHash 返回 eth 交易哈希 , 通过此hash 查询交易详情需要配合enableTxQuickIndex =false 使用
Expand All @@ -389,7 +391,7 @@ func (e *ethHandler) SendRawTransaction(rawData string) (hexutil.Bytes, error) {

}

//Sign method:eth_sign
// Sign method:eth_sign
func (e *ethHandler) Sign(address string, digestHash *hexutil.Bytes) (string, error) {
//导出私钥
log.Debug("Sign", "eth_sign,hash", digestHash, "addr", address)
Expand All @@ -411,11 +413,12 @@ func (e *ethHandler) Sign(address string, digestHash *hexutil.Bytes) (string, er
return hexutil.Encode(sig), nil
}

//Syncing ...
//Returns an object with data about the sync status or false.
//Returns: FALSE:when not syncing,
// Syncing ...
// Returns an object with data about the sync status or false.
// Returns: FALSE:when not syncing,
// params:[]
//
//method:eth_syncing
//params:[]
func (e *ethHandler) Syncing() (interface{}, error) {
log.Debug("eth_syncing", "eth_syncing", "")
var syncing struct {
Expand Down Expand Up @@ -462,10 +465,11 @@ func (e *ethHandler) Mining() (bool, error) {
return false, err
}

// Returns:Returns the number of transactions sent from an address.
// Paramters: address,tag(disable):latest,pending,earliest
// GetTransactionCount 获取nonce
//
//method:eth_getTransactionCount
//Returns:Returns the number of transactions sent from an address.
//Paramters: address,tag(disable):latest,pending,earliest
//GetTransactionCount 获取nonce
func (e *ethHandler) GetTransactionCount(address, tag string) (hexutil.Uint64, error) {
log.Debug("GetTransactionCount", "eth_getTransactionCount address", address)
exec := e.cfg.ExecName("evm")
Expand Down Expand Up @@ -501,8 +505,9 @@ func (e *ethHandler) GetTransactionCount(address, tag string) (hexutil.Uint64, e
return hexutil.Uint64(bigNonce.Uint64()), err
}

// EstimateGas 获取gas
//
//method:eth_estimateGas
//EstimateGas 获取gas
func (e *ethHandler) EstimateGas(callMsg *types.CallMsg) (hexutil.Uint64, error) {
log.Info("EstimateGas", "callMsg.From", callMsg.From, "callMsg.To:", callMsg.To, "callMsg.Value:", callMsg.Value)
if callMsg == nil {
Expand Down Expand Up @@ -621,14 +626,14 @@ func (e *ethHandler) EstimateGas(callMsg *types.CallMsg) (hexutil.Uint64, error)

}

//GasPrice eth_gasPrice default 10 gwei
// GasPrice eth_gasPrice default 10 gwei
func (e *ethHandler) GasPrice() (*hexutil.Big, error) {
log.Debug("GasPrice", "eth_gasPrice ", "")
return (*hexutil.Big)(new(big.Int).Div(big.NewInt(1e18), big.NewInt(e.cfg.GetCoinPrecision()))), nil
}

//Hashrate
//method: eth_hashrate
// Hashrate
// method: eth_hashrate
func (e *ethHandler) Hashrate() (hexutil.Uint64, error) {
log.Debug("eth_hashrate", "eth_hashrate ", "")
header, err := e.cli.GetLastHeader()
Expand All @@ -639,7 +644,7 @@ func (e *ethHandler) Hashrate() (hexutil.Uint64, error) {
return hexutil.Uint64(header.Difficulty), nil
}

//GetContractorAddress eth_getContractorAddress
// GetContractorAddress eth_getContractorAddress
func (e *ethHandler) GetContractorAddress(from common.Address, nonce hexutil.Uint64) (*common.Address, error) {
log.Debug("eth_getContractorAddress", "addr", from, "nonce", nonce)

Expand All @@ -648,7 +653,7 @@ func (e *ethHandler) GetContractorAddress(from common.Address, nonce hexutil.Uin

}

//GetCode eth_getCode 获取部署合约的合约代码
// GetCode eth_getCode 获取部署合约的合约代码
func (e *ethHandler) GetCode(addr *common.Address, tag string) (*hexutil.Bytes, error) {

exec := e.cfg.ExecName("evm")
Expand Down Expand Up @@ -700,14 +705,14 @@ func (e *ethHandler) GetCode(addr *common.Address, tag string) (*hexutil.Bytes,

}

//HistoryParam ...
// HistoryParam ...
type HistoryParam struct {
BlockCount hexutil.Uint64
NewestBlock string
//reward_percentiles []int
}

//FeeHistory eth_feeHistory feehistory
// FeeHistory eth_feeHistory feehistory
func (e *ethHandler) FeeHistory(BlockCount, tag string, options []interface{}) (interface{}, error) {
log.Debug("eth_feeHistory", "FeeHistory blockcout", BlockCount)
header, err := e.cli.GetLastHeader()
Expand All @@ -727,7 +732,7 @@ func (e *ethHandler) FeeHistory(BlockCount, tag string, options []interface{}) (
return &result, nil
}

//GetLogs eth_getLogs
// GetLogs eth_getLogs
func (e *ethHandler) GetLogs(options *types.FilterQuery) ([]*types.EvmLog, error) {
//通过Grpc 客户端
log.Info("GetLogs", "Logs,options:", options)
Expand Down
54 changes: 24 additions & 30 deletions system/mempool/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package mempool
import (
"bytes"
"encoding/hex"
"sort"
"sync"
"sync/atomic"
"time"
Expand All @@ -22,7 +21,7 @@ import (

var mlog = log.New("module", "mempool.base")

//Mempool mempool 基础类
// Mempool mempool 基础类
type Mempool struct {
proxyMtx sync.RWMutex
in chan *queue.Message
Expand Down Expand Up @@ -54,14 +53,14 @@ func (mem *Mempool) getAPI() client.QueueProtocolAPI {
return mem.api
}

//GetSync 判断是否mempool 同步
// GetSync 判断是否mempool 同步
func (mem *Mempool) getSync() bool {
mem.proxyMtx.RLock()
defer mem.proxyMtx.RUnlock()
return mem.sync
}

//NewMempool 新建mempool 实例
// NewMempool 新建mempool 实例
func NewMempool(cfg *types.Mempool) *Mempool {
pool := &Mempool{}
if cfg.MaxTxNumPerAccount == 0 {
Expand All @@ -85,7 +84,7 @@ func NewMempool(cfg *types.Mempool) *Mempool {
return pool
}

//Close 关闭mempool
// Close 关闭mempool
func (mem *Mempool) Close() {
if mem.isClose() {
return
Expand All @@ -101,7 +100,7 @@ func (mem *Mempool) Close() {
mlog.Info("mempool module closed")
}

//SetQueueClient 初始化mempool模块
// SetQueueClient 初始化mempool模块
func (mem *Mempool) SetQueueClient(cli queue.Client) {
mem.client = cli
mem.client.Sub("mempool")
Expand Down Expand Up @@ -136,7 +135,7 @@ func (mem *Mempool) SetMinFee(fee int64) {
mem.proxyMtx.Unlock()
}

//SetQueueCache 设置排队策略
// SetQueueCache 设置排队策略
func (mem *Mempool) SetQueueCache(qcache QueueCache) {
mem.cache.SetQueueCache(qcache)
}
Expand Down Expand Up @@ -184,17 +183,20 @@ func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool, isAll bool
return txs
}

//对eth signtype 的交易,同地址下nonce 按照从小到达的顺序排序
//确保nonce 按照递增顺序发给blockchain
// 对eth signtype 的交易,同地址下nonce 按照从小到达的顺序排序
// 确保nonce 按照递增顺序发给blockchain
func (mem *Mempool) sortEthSignTyTx(txs []*types.Transaction) []*types.Transaction {
//平行链架构下,主链节点无法获取到平行链evm的nonce
var merge []*types.Transaction
var ethsignTxs = make(map[string][]*types.Transaction)
var ethsignTxs = make(map[string]map[int64]*types.Transaction)
for _, tx := range txs {
//只有eth 签名且非平行链交易才能进入mempool 中进行 nonce 排序
if types.IsEthSignID(tx.GetSignature().GetTy()) && !bytes.HasPrefix(tx.GetExecer(), []byte(types.ParaKeyX)) {
//暂时不考虑组交易的情况
ethsignTxs[tx.From()] = append(ethsignTxs[tx.From()], tx)
if _, ok := ethsignTxs[tx.From()]; !ok {
ethsignTxs[tx.From()] = make(map[int64]*types.Transaction)
}
ethsignTxs[tx.From()][tx.GetNonce()] = tx
continue
}
//非eth 签名 和 平行链交易 在主网节点中直接返回给blockchain,因为主网节点不知道此tx.From地址在主网节点的nonce 状态,没法排序,只能在平行链节点rpc层过滤掉
Expand All @@ -204,27 +206,18 @@ func (mem *Mempool) sortEthSignTyTx(txs []*types.Transaction) []*types.Transacti
if len(merge) == len(txs) {
return txs
}

//sort
for from, etxs := range ethsignTxs {
sort.SliceStable(etxs, func(i, j int) bool { //nonce asc
return etxs[i].GetNonce() < etxs[j].GetNonce()
})
//check exts[0].Nonce 是否等于current nonce, merge
if len(etxs) != 0 && mem.getCurrentNonce(from) == etxs[0].GetNonce() {
merge = append(merge, etxs[0])
for i, etx := range etxs {
if i == 0 {
continue
}
//要求nonce 具有连续性
if etx.GetNonce() == etxs[i-1].GetNonce()+1 {
merge = append(merge, etxs[i])
continue
}
for from, txs := range ethsignTxs {
currentNonce := mem.getCurrentNonce(from)
for nonce := currentNonce; ; nonce++ {
if tx, ok := txs[nonce]; ok {
merge = append(merge, tx)

} else {
break
}
}

}

return merge
Expand Down Expand Up @@ -272,7 +265,7 @@ func (mem *Mempool) PushTx(tx *types.Transaction) error {
return err
}

// setHeader设置mempool.header
// setHeader设置mempool.header
func (mem *Mempool) setHeader(h *types.Header) {
atomic.StoreInt64(&mem.currHeight, h.Height)
mem.proxyMtx.Lock()
Expand All @@ -287,7 +280,7 @@ func (mem *Mempool) GetHeader() *types.Header {
return mem.header
}

//IsClose 判断是否mempool 关闭
// IsClose 判断是否mempool 关闭
func (mem *Mempool) isClose() bool {
return atomic.LoadInt32(&mem.isclose) == 1
}
Expand Down Expand Up @@ -482,6 +475,7 @@ func (mem *Mempool) delBlock(block *types.Block) {
if !mem.checkExpireValid(tx) {
continue
}

err = mem.PushTx(tx)
if err != nil {
mlog.Error("mem", "push tx err", err)
Expand Down
Loading

0 comments on commit d887f73

Please sign in to comment.