Skip to content

Commit

Permalink
chore(chainsync): add unit test for reorgs
Browse files Browse the repository at this point in the history
  • Loading branch information
ezdac committed May 10, 2024
1 parent 2fd714a commit fdf3dd8
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 13 deletions.
4 changes: 2 additions & 2 deletions rolling-shutter/medley/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{}
var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option")

type Client struct {
client.EthereumClient
client.SyncEthereumClient
log log.Logger

options *options
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by
// This value is cached, since it is not expected to change.
func (s *Client) ChainID(ctx context.Context) (*big.Int, error) {
if s.chainID == nil {
cid, err := s.EthereumClient.ChainID(ctx)
cid, err := s.SyncEthereumClient.ChainID(ctx)
if err != nil {
return nil, err
}
Expand Down
15 changes: 14 additions & 1 deletion rolling-shutter/medley/chainsync/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

type EthereumClient interface {
type FullEthereumClient interface {
Close()
ChainID(ctx context.Context) (*big.Int, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
Expand Down Expand Up @@ -45,3 +45,16 @@ type EthereumClient interface {
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
SendTransaction(ctx context.Context, tx *types.Transaction) error
}

type SyncEthereumClient interface {
Close()
ChainID(ctx context.Context) (*big.Int, error)
BlockNumber(ctx context.Context) (uint64, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
}
170 changes: 170 additions & 0 deletions rolling-shutter/medley/chainsync/client/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package client

import (
"context"
"errors"
"math/big"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

var ErrNotImplemented = errors.New("not implemented")

var _ SyncEthereumClient = &TestClient{}

type TestClient struct {
headerChain []*types.Header
latestHeadIndex int
intialProgress bool
latestHeadEmitter []chan<- *types.Header
latestHeadSubscription []*Subscription
}

func NewSubscription(idx int) *Subscription {
return &Subscription{
idx: idx,
err: make(chan error, 1),
}
}

type Subscription struct {
idx int
err chan error
}

func (su *Subscription) Unsubscribe() {
// TODO: not implemented yet, but we don't want to panic
}

func (su *Subscription) Err() <-chan error {
return su.err
}

type TestClientController struct {
c *TestClient
}

func NewTestClient() (*TestClient, *TestClientController) {
c := &TestClient{
headerChain: []*types.Header{},
latestHeadIndex: 0,
}
ctrl := &TestClientController{c}
return c, ctrl
}

func (c *TestClientController) ProgressHead() bool {
if c.c.latestHeadIndex >= len(c.c.headerChain)-1 {
return false
}
c.c.latestHeadIndex++
return true
}

func (c *TestClientController) EmitEvents(ctx context.Context) error {
if len(c.c.latestHeadEmitter) == 0 {
return nil
}
h := c.c.getLatestHeader()
for _, em := range c.c.latestHeadEmitter {
select {
case em <- h:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

func (c *TestClientController) AppendNextHeaders(h ...*types.Header) {
c.c.headerChain = append(c.c.headerChain, h...)
}

func (t *TestClient) ChainID(_ context.Context) (*big.Int, error) {
return big.NewInt(42), nil
}

func (t *TestClient) Close() {
// TODO: cleanup
}

func (t *TestClient) getLatestHeader() *types.Header {
if len(t.headerChain) == 0 {
return nil
}
return t.headerChain[t.latestHeadIndex]
}

func (t *TestClient) searchBlock(f func(*types.Header) bool) *types.Header {
for i := t.latestHeadIndex; i >= 0; i-- {
h := t.headerChain[i]
if f(h) {
return h
}
}
return nil
}

func (t *TestClient) searchBlockByNumber(number *big.Int) *types.Header {
return t.searchBlock(
func(h *types.Header) bool {
return h.Number.Cmp(number) == 0
})
}

func (t *TestClient) searchBlockByHash(hash common.Hash) *types.Header {
return t.searchBlock(
func(h *types.Header) bool {
return hash.Cmp(h.Hash()) == 0
})
}

func (t *TestClient) BlockNumber(_ context.Context) (uint64, error) {
return t.getLatestHeader().Nonce.Uint64(), nil
}

func (t *TestClient) HeaderByHash(_ context.Context, hash common.Hash) (*types.Header, error) {
h := t.searchBlockByHash(hash)
if h == nil {
// TODO: what error?
}
return h, nil
}

func (t *TestClient) HeaderByNumber(_ context.Context, number *big.Int) (*types.Header, error) {
if number == nil {
return t.getLatestHeader(), nil
}
h := t.searchBlockByNumber(number)
if h == nil {
// TODO: what error?
}
return h, nil
}

func (t *TestClient) SubscribeNewHead(_ context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
t.latestHeadEmitter = append(t.latestHeadEmitter, ch)
su := NewSubscription(len(t.latestHeadSubscription) - 1)
t.latestHeadSubscription = append(t.latestHeadSubscription, su)
// TODO: unsubscribe and deleting from the array
// TODO: filling error promise in the subscription
return su, nil
}

func (t *TestClient) FilterLogs(_ context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
panic(ErrNotImplemented)
}

func (t *TestClient) SubscribeFilterLogs(_ context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
panic(ErrNotImplemented)
}

func (t *TestClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
panic(ErrNotImplemented)
}

func (t *TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
panic(ErrNotImplemented)
}
10 changes: 5 additions & 5 deletions rolling-shutter/medley/chainsync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type options struct {
keyperSetManagerAddress *common.Address
keyBroadcastContractAddress *common.Address
clientURL string
ethClient syncclient.EthereumClient
ethClient syncclient.FullEthereumClient
logger log.Logger
runner service.Runner
syncStart *number.BlockNumber
Expand Down Expand Up @@ -122,7 +122,7 @@ func (o *options) applyHandler(c *Client) error {
// of shutter clients background workers.
func (o *options) apply(ctx context.Context, c *Client) error {
var (
client syncclient.EthereumClient
client syncclient.SyncEthereumClient
err error
)
if o.clientURL != "" {
Expand All @@ -132,12 +132,12 @@ func (o *options) apply(ctx context.Context, c *Client) error {
}
}
client = o.ethClient
c.EthereumClient = client
c.SyncEthereumClient = client

// the nil passthrough will use "latest" for each call,
// but we want to harmonize and fix the sync start to a specific block.
if o.syncStart.IsLatest() {
latestBlock, err := c.EthereumClient.BlockNumber(ctx)
latestBlock, err := c.SyncEthereumClient.BlockNumber(ctx)
if err != nil {
return errors.Wrap(err, "polling latest block")
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func WithLogger(l log.Logger) Option {
}
}

func WithClient(client syncclient.EthereumClient) Option {
func WithClient(client syncclient.FullEthereumClient) Option {
return func(o *options) error {
o.ethClient = client
return nil
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/medley/chainsync/syncer/eonpubkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var _ ManualFilterHandler = &EonPubKeySyncer{}

type EonPubKeySyncer struct {
Client client.EthereumClient
Client client.SyncEthereumClient
Log log.Logger
KeyBroadcast *bindings.KeyBroadcastContract
KeyperSetManager *bindings.KeyperSetManager
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/medley/chainsync/syncer/keyperset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const channelSize = 10
var _ ManualFilterHandler = &KeyperSetSyncer{}

type KeyperSetSyncer struct {
Client client.EthereumClient
Client client.FullEthereumClient
Contract *bindings.KeyperSetManager
Log log.Logger
Handler event.KeyperSetHandler
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/medley/chainsync/syncer/shutterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
var _ ManualFilterHandler = &ShutterStateSyncer{}

type ShutterStateSyncer struct {
Client client.EthereumClient
Client client.SyncEthereumClient
Contract *bindings.KeyperSetManager
Log log.Logger
Handler event.ShutterStateHandler
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/medley/chainsync/syncer/unsafehead.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type UnsafeHeadSyncer struct {
Client client.EthereumClient
Client client.SyncEthereumClient
Log log.Logger
Handler event.BlockHandler
// Handler to be manually triggered
Expand Down
Loading

0 comments on commit fdf3dd8

Please sign in to comment.