Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(proposer): introduce types.TransactionsByPriceAndNonce to stor…
Browse files Browse the repository at this point in the history
…e pool content (#120)
  • Loading branch information
davidtaikocha authored Jan 12, 2023
1 parent 1beae15 commit fe3177e
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 78 deletions.
39 changes: 16 additions & 23 deletions pkg/rpc/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/big"
"sort"
"time"

ethereum "github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -162,35 +161,29 @@ func (c *Client) WaitL1Origin(ctx context.Context, blockID *big.Int) (*rawdb.L1O
// PoolContent represents a response body of a `txpool_content` RPC call.
type PoolContent map[common.Address]map[string]*types.Transaction

type TxLists []types.Transactions

// ToTxLists flattens all transactions in pool content into transactions lists,
// each list contains transactions from a single account sorted by nonce.
func (pc PoolContent) ToTxLists() TxLists {
txLists := make([]types.Transactions, 0)

// Len returns the number of transactions in the PoolContent.
func (pc PoolContent) Len() int {
len := 0
for _, pendingTxs := range pc {
var txsByNonce types.TxByNonce

for _, pendingTx := range pendingTxs {
txsByNonce = append(txsByNonce, pendingTx)
for range pendingTxs {
len += 1
}

sort.Sort(txsByNonce)

txLists = append(txLists, types.Transactions(txsByNonce))
}

return txLists
return len
}

// Len returns the number of transactions inside the transactions lists.
func (t TxLists) Len() int {
var length = 0
for _, pendingTxs := range t {
length += len(pendingTxs)
// ToTxsByPriceAndNonce creates a transaction set that can retrieve price sorted transactions in a nonce-honouring way.
func (pc PoolContent) ToTxsByPriceAndNonce(chainID *big.Int) *types.TransactionsByPriceAndNonce {
txs := map[common.Address]types.Transactions{}

for address, txsWithNonce := range pc {
for _, tx := range txsWithNonce {
txs[address] = append(txs[address], tx)
}
}
return length

return types.NewTransactionsByPriceAndNonce(types.LatestSignerForChainID(chainID), txs, nil)
}

// L2PoolContent fetches the transaction pool content from a L2 execution engine.
Expand Down
35 changes: 16 additions & 19 deletions pkg/rpc/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

Expand All @@ -32,7 +31,7 @@ func TestL2AccountNonce(t *testing.T) {
require.Zero(t, nonce)
}

func TestPoolContentToTxLists(t *testing.T) {
func TestPoolContentLen(t *testing.T) {
poolContent := &PoolContent{
testAddress1: map[string]*types.Transaction{
"6": types.NewTransaction(6, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
Expand All @@ -45,25 +44,23 @@ func TestPoolContentToTxLists(t *testing.T) {
},
}

txLists := poolContent.ToTxLists()

require.Equal(t, 2, len(txLists))

for _, txs := range txLists {
switch len(txs) {
case 2:
require.Equal(t, uint64(1), txs[0].Nonce())
require.Equal(t, uint64(2), txs[1].Nonce())
case 3:
require.Equal(t, uint64(5), txs[0].Nonce())
require.Equal(t, uint64(6), txs[1].Nonce())
require.Equal(t, uint64(7), txs[2].Nonce())
default:
log.Crit("Invalid txs length")
}
require.Equal(t, 5, poolContent.Len())
}

func TestToTxsByPriceAndNonce(t *testing.T) {
poolContent := &PoolContent{
testAddress1: map[string]*types.Transaction{
"6": types.NewTransaction(6, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
"5": types.NewTransaction(5, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
"7": types.NewTransaction(7, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
},
testAddress2: map[string]*types.Transaction{
"2": types.NewTransaction(2, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
"1": types.NewTransaction(1, common.Address{}, common.Big0, 0, common.Big0, []byte{}),
},
}

require.Equal(t, 5, txLists.Len())
require.NotNil(t, poolContent.ToTxsByPriceAndNonce(newTestClient(t).L2ChainID))
}

func TestGetGenesisL1Header(t *testing.T) {
Expand Down
58 changes: 31 additions & 27 deletions proposer/pool_content_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proposer

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/les/utils"
Expand All @@ -15,6 +16,7 @@ import (
// which fetched from a `txpool_content` RPC call response into several smaller transactions lists
// and make sure each splitted list satisfies the limits defined in Taiko protocol.
type poolContentSplitter struct {
chainID *big.Int
shufflePoolContent bool
maxTransactionsPerBlock uint64
blockMaxGasLimit uint64
Expand All @@ -24,39 +26,41 @@ type poolContentSplitter struct {

// split splits the given transaction pool content to make each splitted
// transactions list satisfies the rules defined in Taiko protocol.
func (p *poolContentSplitter) split(poolContent rpc.PoolContent) [][]*types.Transaction {
func (p *poolContentSplitter) split(poolContent rpc.PoolContent) []types.Transactions {
var (
txLists = poolContent.ToTxLists()
splittedTxLists = make([][]*types.Transaction, 0)
txs = poolContent.ToTxsByPriceAndNonce(p.chainID)
splittedTxLists = make([]types.Transactions, 0)
txBuffer = make([]*types.Transaction, 0, p.maxTransactionsPerBlock)
gasBuffer uint64 = 0
)

if p.shufflePoolContent {
txLists = p.weightedShuffle(txLists)
}
for {
tx := txs.Peek()
if tx == nil {
break
}

// If the transaction is invalid, we simply ignore it.
if err := p.validateTx(tx); err != nil {
log.Debug("Invalid pending transaction", "hash", tx.Hash(), "error", err)
metrics.ProposerInvalidTxsCounter.Inc(1)
txs.Pop() // If this tx is invalid, ignore this sender's other txs in pool.
continue
}

for _, txList := range txLists {
for _, tx := range txList {
// If the transaction is invalid, we simply ignore it.
if err := p.validateTx(tx); err != nil {
log.Debug("Invalid pending transaction", "hash", tx.Hash(), "error", err)
metrics.ProposerInvalidTxsCounter.Inc(1)
break // If this tx is invalid, ingore this sender's other txs with larger nonce.
}

// If the transactions buffer is full, we make all transactions in
// current buffer a new splitted transaction list, and then reset the
// buffer.
if p.isTxBufferFull(tx, txBuffer, gasBuffer) {
splittedTxLists = append(splittedTxLists, txBuffer)
txBuffer = make([]*types.Transaction, 0, p.maxTransactionsPerBlock)
gasBuffer = 0
}

txBuffer = append(txBuffer, tx)
gasBuffer += tx.Gas()
// If the transactions buffer is full, we make all transactions in
// current buffer a new splitted transaction list, and then reset the
// buffer.
if p.isTxBufferFull(tx, txBuffer, gasBuffer) {
splittedTxLists = append(splittedTxLists, txBuffer)
txBuffer = make([]*types.Transaction, 0, p.maxTransactionsPerBlock)
gasBuffer = 0
}

txBuffer = append(txBuffer, tx)
gasBuffer += tx.Gas()

txs.Shift()
}

// Maybe there are some remaining transactions in current buffer,
Expand All @@ -67,7 +71,7 @@ func (p *poolContentSplitter) split(poolContent rpc.PoolContent) [][]*types.Tran

// If the pool content is shuffled, we will only propose the first transactions list.
if p.shufflePoolContent && len(splittedTxLists) > 0 {
splittedTxLists = [][]*types.Transaction{splittedTxLists[0]}
splittedTxLists = []types.Transactions{p.weightedShuffle(splittedTxLists)[0]}
}

return splittedTxLists
Expand Down
29 changes: 22 additions & 7 deletions proposer/pool_content_splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/pkg/rpc"
"github.com/taikoxyz/taiko-client/testutils"
)

func (s *ProposerTestSuite) TestPoolContentSplit() {
// Gas limit is smaller than the limit.
splitter := &poolContentSplitter{minTxGasLimit: 21000}
splitter := &poolContentSplitter{
chainID: s.RpcClient.L2ChainID,
minTxGasLimit: 21000,
}

splitted := splitter.split(rpc.PoolContent{
common.BytesToAddress(testutils.RandomBytes(32)): {
Expand All @@ -24,7 +29,10 @@ func (s *ProposerTestSuite) TestPoolContentSplit() {
s.Empty(splitted)

// Gas limit is larger than the limit.
splitter = &poolContentSplitter{minTxGasLimit: 21000}
splitter = &poolContentSplitter{
chainID: s.RpcClient.L2ChainID,
minTxGasLimit: 21000,
}

splitted = splitter.split(rpc.PoolContent{
common.BytesToAddress(testutils.RandomBytes(32)): {
Expand All @@ -42,6 +50,7 @@ func (s *ProposerTestSuite) TestPoolContentSplit() {
s.NotEmpty(bytes)

splitter = &poolContentSplitter{
chainID: s.RpcClient.L2ChainID,
maxBytesPerTxList: uint64(len(bytes) - 1),
minTxGasLimit: uint64(len(bytes) - 2),
}
Expand All @@ -53,21 +62,27 @@ func (s *ProposerTestSuite) TestPoolContentSplit() {
s.Empty(splitted)

// Transactions that meet the limits
tx := types.NewTx(&types.LegacyTx{Gas: 21001})
goldenTouchPriKey, err := crypto.HexToECDSA(bindings.GoldenTouchPrivKey[2:])
s.Nil(err)

signer := types.LatestSignerForChainID(s.RpcClient.L2ChainID)
tx1 := types.MustSignNewTx(goldenTouchPriKey, signer, &types.LegacyTx{Gas: 21001, Nonce: 1})
tx2 := types.MustSignNewTx(goldenTouchPriKey, signer, &types.LegacyTx{Gas: 21001, Nonce: 2})

bytes, err = rlp.EncodeToBytes(tx)
bytes, err = rlp.EncodeToBytes(tx1)
s.Nil(err)
s.NotEmpty(bytes)

splitter = &poolContentSplitter{
chainID: s.RpcClient.L2ChainID,
minTxGasLimit: 21000,
maxBytesPerTxList: uint64(len(bytes) + 1),
maxBytesPerTxList: uint64(len(bytes) + 1000),
maxTransactionsPerBlock: 1,
blockMaxGasLimit: tx.Gas() + 1,
blockMaxGasLimit: tx1.Gas() + 1000,
}

splitted = splitter.split(rpc.PoolContent{
common.BytesToAddress(testutils.RandomBytes(32)): {"0": tx, "1": tx},
bindings.GoldenTouchAddress: {"1": tx1, "2": tx2},
})

s.Equal(2, len(splitted))
Expand Down
3 changes: 2 additions & 1 deletion proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) {
log.Info("Protocol configs", "configs", p.protocolConfigs)

p.poolContentSplitter = &poolContentSplitter{
chainID: p.rpc.L2ChainID,
shufflePoolContent: cfg.ShufflePoolContent,
maxTransactionsPerBlock: p.protocolConfigs.MaxTransactionsPerBlock.Uint64(),
blockMaxGasLimit: p.protocolConfigs.BlockMaxGasLimit.Uint64(),
Expand Down Expand Up @@ -165,7 +166,7 @@ func (p *Proposer) ProposeOp(ctx context.Context) error {
return fmt.Errorf("failed to fetch transaction pool content: %w", err)
}

log.Info("Fetching L2 pending transactions finished", "length", pendingContent.ToTxLists().Len())
log.Info("Fetching L2 pending transactions finished", "length", pendingContent.Len())

var commitTxListResQueue []*commitTxListRes
for i, txs := range p.poolContentSplitter.split(pendingContent) {
Expand Down
2 changes: 1 addition & 1 deletion version/verison.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package version

// Version info.
var (
Version = "0.2.2"
Version = "0.2.3"
Meta = "dev"
)

Expand Down

0 comments on commit fe3177e

Please sign in to comment.