Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Trusted Auctioneer: Code cleanups #24

Open
wants to merge 6 commits into
base: bharath/api-to-query-optimistic-block
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1782,9 +1782,12 @@ func (pool *LegacyPool) truncateQueue() {
// it assumes that the pool lock is being held
func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) {
// Iterate over all accounts and demote any non-executable transactions
addrsForWhichTxsRemoved := map[common.Address]bool{}

for addr, list := range pool.pending {
dropped, invalids := list.ClearList()
pendingGauge.Dec(int64(len(dropped) + len(invalids)))

pendingGauge.Dec(int64(dropped.Len() + invalids.Len()))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
Expand All @@ -1796,12 +1799,14 @@ func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) {
if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)

addrsForWhichTxsRemoved[addr] = true
}
}

for addr, list := range pool.queue {
dropped, invalids := list.ClearList()
queuedGauge.Dec(int64(len(dropped) + len(invalids)))
queuedGauge.Dec(int64(dropped.Len() + invalids.Len()))

for _, tx := range dropped {
pool.all.Remove(tx.Hash())
Expand All @@ -1811,12 +1816,15 @@ func (pool *LegacyPool) clearPendingAndQueued(newHead *types.Header) {
}

if list.Empty() {
if _, ok := pool.queue[addr]; !ok {
pool.reserve(addr, false)
}
delete(pool.queue, addr)

addrsForWhichTxsRemoved[addr] = true
}
}

for addr := range addrsForWhichTxsRemoved {
pool.reserve(addr, false)
}
}

// demoteUnexecutables removes invalid and processed transactions from the pools
Expand Down
20 changes: 7 additions & 13 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,21 +689,15 @@ func TestDropping(t *testing.T) {
tx11 = transaction(11, 200, key)
tx12 = transaction(12, 300, key)
)
pool.all.Add(tx0, false)
pool.priced.Put(tx0, false)
pool.promoteTx(account, tx0.Hash(), tx0)

pool.all.Add(tx1, false)
pool.priced.Put(tx1, false)
pool.promoteTx(account, tx1.Hash(), tx1)
pool.add(tx0, false)
pool.add(tx1, false)
pool.add(tx2, false)
pool.add(tx10, false)
pool.add(tx11, false)
pool.add(tx12, false)

pool.all.Add(tx2, false)
pool.priced.Put(tx2, false)
pool.promoteTx(account, tx2.Hash(), tx2)

pool.enqueueTx(tx10.Hash(), tx10, false, true)
pool.enqueueTx(tx11.Hash(), tx11, false, true)
pool.enqueueTx(tx12.Hash(), tx12, false, true)
pool.promoteExecutables([]common.Address{account})

// Check that pre and post validations leave the pool as is
if pool.pending[account].Len() != 3 {
Expand Down
18 changes: 9 additions & 9 deletions grpc/execution/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *ExecutionServiceServerV1) GetBlock(ctx context.Context, req *astriaPb.G
res, err := s.getBlockFromIdentifier(req.GetIdentifier())
if err != nil {
log.Error("failed finding block", err)
return nil, err
return nil, shared.WrapError(err, "failed finding block")
}

log.Debug("GetBlock completed", "request", req, "response", res)
Expand All @@ -125,7 +125,7 @@ func (s *ExecutionServiceServerV1) BatchGetBlocks(ctx context.Context, req *astr
block, err := s.getBlockFromIdentifier(id)
if err != nil {
log.Error("failed finding block with id", id, "error", err)
return nil, err
return nil, shared.WrapError(err, fmt.Sprintf("failed finding block with id %s", id.String()))
}

blocks = append(blocks, block)
Expand Down Expand Up @@ -190,20 +190,20 @@ func (s *ExecutionServiceServerV1) ExecuteBlock(ctx context.Context, req *astria
payload, err := s.Eth().Miner().BuildPayload(payloadAttributes)
if err != nil {
log.Error("failed to build payload", "err", err)
return nil, status.Errorf(codes.InvalidArgument, "Could not build block with provided txs: %v", err)
return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "Could not build block with provided txs").Error())
}

// call blockchain.InsertChain to actually execute and write the blocks to
// state
block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil)
if err != nil {
log.Error("failed to convert executable data to block", err)
return nil, status.Error(codes.Internal, "failed to execute block")
return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error())
}
err = s.Bc().InsertBlockWithoutSetHead(block)
if err != nil {
log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", req.PrevBlockHash, "err", err)
return nil, status.Error(codes.Internal, "failed to insert block to chain")
return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error())
}

// remove txs from original mempool
Expand Down Expand Up @@ -244,12 +244,12 @@ func (s *ExecutionServiceServerV1) GetCommitmentState(ctx context.Context, req *
softBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentSafeBlock())
if err != nil {
log.Error("error finding safe block", err)
return nil, status.Error(codes.Internal, "could not locate soft block")
return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate soft block").Error())
}
firmBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentFinalBlock())
if err != nil {
log.Error("error finding final block", err)
return nil, status.Error(codes.Internal, "could not locate firm block")
return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate firm block").Error())
}

celestiaBlock := s.Bc().CurrentBaseCelestiaHeight()
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *ExecutionServiceServerV1) UpdateCommitmentState(ctx context.Context, re
if currentHead != softEthHash {
if _, err := s.Bc().SetCanonical(softBlock); err != nil {
log.Error("failed updating canonical chain to soft block", err)
return nil, status.Error(codes.Internal, "Could not update head to safe hash")
return nil, status.Error(codes.Internal, shared.WrapError(err, "Could not update head to safe hash").Error())
}
}

Expand Down Expand Up @@ -368,7 +368,7 @@ func (s *ExecutionServiceServerV1) getBlockFromIdentifier(identifier *astriaPb.B
res, err := ethHeaderToExecutionBlock(header)
if err != nil {
// This should never happen since we validate header exists above.
return nil, status.Error(codes.Internal, "internal error")
return nil, status.Error(codes.Internal, shared.WrapError(err, "internal error").Error())
}

return res, nil
Expand Down
43 changes: 26 additions & 17 deletions grpc/optimistic/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1"
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
cmath "github.com/ethereum/go-ethereum/common/math"
Expand Down Expand Up @@ -77,7 +76,7 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre
marshalledTxs := [][]byte{}
marshalledTx, err := pendingTx.MarshalBinary()
if err != nil {
return status.Errorf(codes.Internal, "error marshalling tx: %v", err)
return status.Errorf(codes.Internal, shared.WrapError(err, "error marshalling tx").Error())
}
marshalledTxs = append(marshalledTxs, marshalledTx)

Expand All @@ -88,15 +87,21 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre

err = stream.Send(&optimsticPb.GetBundleStreamResponse{Bundle: &bundle})
if err != nil {
return status.Errorf(codes.Internal, "error sending bundle over stream: %v", err)
log.Error("error sending bundle over stream", "err", err)
return status.Error(codes.Internal, shared.WrapError(err, "error sending bundle over stream").Error())
}
}

case err := <-pendingTxEvent.Err():
return status.Errorf(codes.Internal, "error waiting for pending transactions: %v", err)
if err != nil {
log.Error("error waiting for pending transactions", "err", err)
return status.Error(codes.Internal, shared.WrapError(err, "error waiting for pending transactions").Error())
} else {
// TODO - what is the right error code here?
return status.Error(codes.Internal, "tx pool subscription closed")
}

case <-stream.Context().Done():
log.Debug("GetBundleStream stream closed with error", "err", stream.Context().Err())
return stream.Context().Err()
}
}
Expand Down Expand Up @@ -124,7 +129,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist
// execute the optimistic block and wait for the mempool clearing event
optimisticBlock, err := o.ExecuteOptimisticBlock(stream.Context(), baseBlock)
if err != nil {
return status.Errorf(codes.Internal, "failed to execute optimistic block: %v", err)
return status.Errorf(codes.Internal, shared.WrapError(err, "failed to execute optimistic block").Error())
}
optimisticBlockHash := common.BytesToHash(optimisticBlock.Hash)

Expand All @@ -143,11 +148,15 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist
log.Error("timed out waiting for mempool to clear after optimistic block execution")
return status.Error(codes.DeadlineExceeded, "timed out waiting for mempool to clear after optimistic block execution")
case err := <-mempoolClearingEvent.Err():
log.Error("error waiting for mempool clearing event", "err", err)
return status.Errorf(codes.Internal, "error waiting for mempool clearing event: %v", err)
case err := <-stream.Context().Done():
log.Error("ExecuteOptimisticBlockStream stream closed with error", "err", err)
return status.Errorf(codes.Internal, "stream closed with error: %v", err)
if err != nil {
log.Error("error waiting for mempool clearing event", "err", err)
return status.Errorf(codes.Internal, shared.WrapError(err, "error waiting for mempool clearing event").Error())
} else {
// TODO - what is the right error code here?
return status.Error(codes.Internal, "mempool clearance subscription closed")
}
case <-stream.Context().Done():
return stream.Context().Err()
}
}
}
Expand All @@ -159,7 +168,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context,

if err := validateStaticExecuteOptimisticBlockRequest(req); err != nil {
log.Error("ExecuteOptimisticBlock called with invalid BaseBlock", "err", err)
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("BaseBlock is invalid: %s", err.Error()))
return nil, status.Error(codes.InvalidArgument, shared.WrapError(err, "invalid BaseBlock").Error())
}

if !o.SyncMethodsCalled() {
Expand All @@ -172,9 +181,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context,

softBlock := o.Bc().CurrentSafeBlock()

o.BlockExecutionLock().Lock()
nextFeeRecipient := o.NextFeeRecipient()
o.BlockExecutionLock().Unlock()

// the height that this block will be at
height := o.Bc().CurrentBlock().Number.Uint64() + 1
Expand All @@ -195,25 +202,27 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context,
payload, err := o.Eth().Miner().BuildPayload(payloadAttributes)
if err != nil {
log.Error("failed to build payload", "err", err)
return nil, status.Errorf(codes.InvalidArgument, "Could not build block with provided txs: %v", err)
return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "failed to build payload").Error())
}

block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil)
if err != nil {
log.Error("failed to convert executable data to block", err)
return nil, status.Error(codes.Internal, "failed to execute block")
return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error())
}

// this will insert the optimistic block into the chain and persist it's state without
// setting it as the HEAD.
err = o.Bc().InsertBlockWithoutSetHead(block)
if err != nil {
log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", block.ParentHash(), "err", err)
return nil, status.Error(codes.Internal, "failed to insert block to chain")
return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error())
}

// we store a pointer to the optimistic block in the chain so that we can use it
// to retrieve the state of the optimistic block
// this method also sends an event which indicates that a new optimistic block has been set
// the mempool clearing logic is triggered when this event is received
o.Bc().SetOptimistic(block)

res := &astriaPb.Block{
Expand Down
2 changes: 1 addition & 1 deletion grpc/optimistic/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func TestNewOptimisticServiceServerV1Alpha_StreamBundles(t *testing.T) {

select {
case err := <-errorCh:
require.ErrorContains(t, err, "error waiting for pending transactions")
require.ErrorContains(t, err, "tx pool subscription closed")
}

require.Len(t, mockServerSideStreaming.sentResponses, 5, "Number of responses should match the number of requests")
Expand Down
9 changes: 4 additions & 5 deletions grpc/shared/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type SharedServiceContainer struct {
// auctioneer address is a bech32m address
auctioneerAddress atomic.Pointer[string]

// TODO: bharath - we could make this an atomic pointer???
nextFeeRecipient common.Address // Fee recipient for the next block
nextFeeRecipient atomic.Pointer[common.Address] // Fee recipient for the next block
}

func NewSharedServiceContainer(eth *eth.Ethereum) (*SharedServiceContainer, error) {
Expand Down Expand Up @@ -123,10 +122,10 @@ func NewSharedServiceContainer(eth *eth.Ethereum) (*SharedServiceContainer, erro
bc: bc,
bridgeAddresses: bridgeAddresses,
bridgeAllowedAssets: bridgeAllowedAssets,
nextFeeRecipient: nextFeeRecipient,
}

sharedServiceContainer.SetAuctioneerAddress(auctioneerAddress)
sharedServiceContainer.SetNextFeeRecipient(nextFeeRecipient)

return sharedServiceContainer, nil
}
Expand Down Expand Up @@ -168,12 +167,12 @@ func (s *SharedServiceContainer) BlockExecutionLock() *sync.Mutex {
}

func (s *SharedServiceContainer) NextFeeRecipient() common.Address {
return s.nextFeeRecipient
return *s.nextFeeRecipient.Load()
}

// assumes that the block execution lock is being held
func (s *SharedServiceContainer) SetNextFeeRecipient(nextFeeRecipient common.Address) {
s.nextFeeRecipient = nextFeeRecipient
s.nextFeeRecipient.Store(&nextFeeRecipient)
}

func (s *SharedServiceContainer) BridgeAddresses() map[string]*params.AstriaBridgeAddressConfig {
Expand Down
19 changes: 13 additions & 6 deletions grpc/shared/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import (
"bytes"
"crypto/ed25519"
"crypto/sha256"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"math/big"
)

func WrapError(err error, msg string) error {
return fmt.Errorf("%s: %w", msg, err)
}

func protoU128ToBigInt(u128 *primitivev1.Uint128) *big.Int {
lo := big.NewInt(0).SetUint64(u128.Lo)
hi := big.NewInt(0).SetUint64(u128.Hi)
Expand Down Expand Up @@ -114,6 +118,7 @@ func validateAndUnmarshallSequenceAction(tx *sequencerblockv1.RollupData) (*type
}

func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHash []byte, auctioneerBech32Address string, addressPrefix string) (types.Transactions, error) {
log.Info("Found a potential allocation in the rollup data. Checking if it is valid.")
processedTxs := types.Transactions{}
payload := allocation.GetPayload()

Expand All @@ -124,28 +129,29 @@ func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHas
publicKey := ed25519.PublicKey(allocation.GetPublicKey())
bech32Address, err := EncodeFromPublicKey(addressPrefix, publicKey)
if err != nil {
return nil, errors.Wrapf(err, "failed to encode public key to bech32m address: %s", publicKey)
return nil, WrapError(err, fmt.Sprintf("failed to encode public key to bech32m address: %s", publicKey))
}
if auctioneerBech32Address != bech32Address {
return nil, errors.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address)
return nil, fmt.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address)
}

message, err := proto.Marshal(allocation.GetPayload())
if err != nil {
return nil, errors.Wrap(err, "failed to marshal allocation")
return nil, WrapError(err, "failed to marshal allocation to verify signature")
}

signature := allocation.GetSignature()
if !ed25519.Verify(publicKey, message, signature) {
return nil, errors.New("failed to verify signature")
return nil, fmt.Errorf("signature in allocation does not match the public key")
}

log.Info("Allocation is valid. Unmarshalling the transactions in the bundle.")
// unmarshall the transactions in the bundle
for _, allocationTx := range payload.GetTransactions() {
ethtx := new(types.Transaction)
err := ethtx.UnmarshalBinary(allocationTx)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshall allocation transaction")
return nil, WrapError(err, "failed to unmarshall allocation transaction")
}
processedTxs = append(processedTxs, ethtx)
}
Expand All @@ -158,6 +164,7 @@ func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHas
// TODO - this function has become too big. we should start breaking it down
func UnbundleRollupDataTransactions(txs []*sequencerblockv1.RollupData, height uint64, bridgeAddresses map[string]*params.AstriaBridgeAddressConfig,
bridgeAllowedAssets map[string]struct{}, prevBlockHash []byte, auctioneerBech32Address string, addressPrefix string) types.Transactions {

processedTxs := types.Transactions{}
allocationTxs := types.Transactions{}
// we just return the allocation here and do not unmarshall the transactions in the bundle if we find it
Expand Down
Loading