Skip to content

Commit

Permalink
Wait for internal eth client to reach block num (#1204)
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com>
  • Loading branch information
litt3 authored Feb 4, 2025
1 parent cd790e8 commit a66068e
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 25 deletions.
18 changes: 14 additions & 4 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type PayloadClientConfig struct {
// The timeout duration for contract calls
ContractCallTimeout time.Duration

// BlockNumberPollInterval is how frequently to check latest block number when waiting for the internal eth client
// to advance to a certain block.
//
// If this is configured to be <= 0, then contract calls which require the internal eth client to have reached a
// certain block height will fail if the internal client is behind.
BlockNumberPollInterval time.Duration

// The BlobVersion to use when creating new blobs, or interpreting blob bytes.
//
// BlobVersion needs to point to a version defined in the threshold registry contract.
Expand Down Expand Up @@ -89,10 +96,11 @@ type PayloadDisperserConfig struct {
// NOTE: EthRpcUrl and EigenDACertVerifierAddr do not have defined defaults. These must always be specifically configured.
func getDefaultPayloadClientConfig() *PayloadClientConfig {
return &PayloadClientConfig{
BlobEncodingVersion: codecs.DefaultBlobEncoding,
PayloadPolynomialForm: codecs.PolynomialFormEval,
ContractCallTimeout: 5 * time.Second,
BlobVersion: 0,
BlobEncodingVersion: codecs.DefaultBlobEncoding,
PayloadPolynomialForm: codecs.PolynomialFormEval,
ContractCallTimeout: 5 * time.Second,
BlockNumberPollInterval: 1 * time.Second,
BlobVersion: 0,
}
}

Expand Down Expand Up @@ -120,6 +128,8 @@ func (cc *PayloadClientConfig) checkAndSetDefaults() error {
cc.ContractCallTimeout = defaultConfig.ContractCallTimeout
}

// BlockNumberPollInterval may be 0, so don't do anything

// BlobVersion may be 0, so don't do anything

return nil
Expand Down
10 changes: 6 additions & 4 deletions api/clients/v2/payload_disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ type PayloadDisperser struct {
}

// BuildPayloadDisperser builds a PayloadDisperser from config structs.
func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserConfig,
func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserConfig,
dispClientCfg *DisperserClientConfig,
ethCfg *geth.EthClientConfig,
kzgConfig *kzg.KzgConfig, encoderCfg *encoding.Config) (*PayloadDisperser, error) {

// 1 - verify key semantics and create signer
signer, err := auth.NewLocalBlobRequestSigner(payloadDispCfg.SignerPaymentKey)
if err != nil {
return nil, fmt.Errorf("new local blob request signer: %w", err)
}

// 2 - create prover (if applicable)

var kzgProver *prover.Prover
if kzgConfig != nil {
if encoderCfg == nil {
Expand Down Expand Up @@ -77,8 +77,10 @@ func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserCo
}

certVerifier, err := verification.NewCertVerifier(
*ethClient,
log,
ethClient,
payloadDispCfg.EigenDACertVerifierAddr,
payloadDispCfg.BlockNumberPollInterval,
)

if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion api/clients/v2/relay_payload_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func BuildRelayPayloadRetriever(
return nil, fmt.Errorf("new eth client: %w", err)
}

certVerifier, err := verification.NewCertVerifier(*ethClient, relayPayloadRetrieverConfig.EigenDACertVerifierAddr)
certVerifier, err := verification.NewCertVerifier(
log,
ethClient,
relayPayloadRetrieverConfig.EigenDACertVerifierAddr,
relayPayloadRetrieverConfig.BlockNumberPollInterval)
if err != nil {
return nil, fmt.Errorf("new cert verifier: %w", err)
}
Expand Down
70 changes: 70 additions & 0 deletions api/clients/v2/test/cert_verifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package test

import (
"context"
"sync"
"testing"
"time"

"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
"github.com/Layr-Labs/eigenda/common"
commonmock "github.com/Layr-Labs/eigenda/common/mock"
testrandom "github.com/Layr-Labs/eigenda/common/testutils/random"
"github.com/stretchr/testify/require"
)

func TestWaitForBlockNumber(t *testing.T) {
mockEthClient := commonmock.MockEthClient{}

logger, err := common.NewLogger(common.DefaultLoggerConfig())
require.NoError(t, err)

pollRate := time.Millisecond * 50

certVerifier, err := verification.NewCertVerifier(
logger,
&mockEthClient,
"",
pollRate)
require.NoError(t, err)
require.NotNil(t, certVerifier)

// number of goroutines to start, each of which will call MaybeWaitForBlockNumber
callCount := 5

for i := uint64(0); i < uint64(callCount); i++ {
// BlockNumber will increment its return value each time it's called, up to callCount-1
mockEthClient.On("BlockNumber").Return(i).Once()
}
// then, all subsequent calls will yield callCount -1
mockEthClient.On("BlockNumber").Return(uint64(callCount - 1))

// give plenty of time on the timeout, to get the necessary number of polls in
timeoutCtx, cancel := context.WithTimeout(context.Background(), pollRate*time.Duration(callCount*2))
defer cancel()

waitGroup := sync.WaitGroup{}

// start these goroutines in random order, so that it isn't always the same sequence of polling handoffs that gets exercised
indices := testrandom.NewTestRandom(t).Perm(callCount)
for _, index := range indices {
waitGroup.Add(1)

go func(i int) {
defer waitGroup.Done()

if i == callCount-1 {
// the last call is set up to fail, by setting the target block to a number that will never be attained
err := certVerifier.MaybeWaitForBlockNumber(timeoutCtx, uint64(i)+1)
require.Error(t, err)
} else {
// all calls except the final call wait for a block number corresponding to their index
err := certVerifier.MaybeWaitForBlockNumber(timeoutCtx, uint64(i))
require.NoError(t, err)
}
}(index)
}

waitGroup.Wait()
mockEthClient.AssertExpectations(t)
}
152 changes: 145 additions & 7 deletions api/clients/v2/verification/cert_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package verification
import (
"context"
"fmt"
"sync/atomic"
"time"

disperser "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/common"
verifierBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDACertVerifier"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethcommon "github.com/ethereum/go-ethereum/common"
)
Expand All @@ -30,18 +34,35 @@ type ICertVerifier interface {
//
// The cert verifier contract is located at https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/core/EigenDACertVerifier.sol
type CertVerifier struct {
logger logging.Logger
// go binding around the EigenDACertVerifier ethereum contract
certVerifierCaller *verifierBindings.ContractEigenDACertVerifierCaller
ethClient common.EthClient
pollInterval time.Duration
// storage shared between goroutines, containing the most recent block number observed by calling ethClient.BlockNumber()
latestBlockNumber atomic.Uint64
// atomic bool, so that only a single goroutine is polling the internal client with BlockNumber() calls at any given time
pollingActive atomic.Bool
}

var _ ICertVerifier = &CertVerifier{}

// NewCertVerifier constructs a CertVerifier
func NewCertVerifier(
ethClient geth.EthClient, // the eth client, which should already be set up
certVerifierAddress string, // the hex address of the EigenDACertVerifier contract
logger logging.Logger,
// the eth client, which should already be set up
ethClient common.EthClient,
// the hex address of the EigenDACertVerifier contract
certVerifierAddress string,
// pollInterval is how frequently to check latest block number when waiting for the internal eth client to advance
// to a certain block. This is needed because the RBN in a cert might be further in the future than the internal
// eth client. In such a case, we must wait for the internal client to catch up to the block number
// contained in the cert: otherwise, calls will fail.
//
// If the configured pollInterval duration is <= 0, then the block number check will be skipped, and calls that
// rely on the client having reached a certain block number will fail if the internal client is behind.
pollInterval time.Duration,
) (*CertVerifier, error) {

verifierCaller, err := verifierBindings.NewContractEigenDACertVerifierCaller(
gethcommon.HexToAddress(certVerifierAddress),
ethClient)
Expand All @@ -50,12 +71,28 @@ func NewCertVerifier(
return nil, fmt.Errorf("bind to verifier contract at %s: %w", certVerifierAddress, err)
}

if pollInterval <= time.Duration(0) {
logger.Warn(
`CertVerifier poll interval is <= 0. Therefore, any method calls made with this object that
rely on the internal client having reached a certain block number will fail if
the internal client is too far behind.`,
"pollInterval", pollInterval)
}

return &CertVerifier{
logger: logger,
certVerifierCaller: verifierCaller,
ethClient: ethClient,
pollInterval: pollInterval,
}, nil
}

// VerifyCertV2FromSignedBatch calls the verifyDACertV2FromSignedBatch view function on the EigenDACertVerifier contract
// VerifyCertV2FromSignedBatch calls the verifyDACertV2FromSignedBatch view function on the EigenDACertVerifier contract.
//
// Before verifying the cert, this method will wait for the internal client to advance to a sufficient block height.
// This wait will time out if the duration exceeds the timeout configured for the input ctx parameter. If
// CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal client to
// advance, and will instead simply fail verification if the internal client is behind.
//
// This method returns nil if the cert is successfully verified. Otherwise, it returns an error.
func (cv *CertVerifier) VerifyCertV2FromSignedBatch(
Expand All @@ -76,6 +113,11 @@ func (cv *CertVerifier) VerifyCertV2FromSignedBatch(
return fmt.Errorf("convert blob inclusion info: %w", err)
}

err = cv.MaybeWaitForBlockNumber(ctx, signedBatch.GetHeader().GetReferenceBlockNumber())
if err != nil {
return fmt.Errorf("wait for block number: %w", err)
}

err = cv.certVerifierCaller.VerifyDACertV2FromSignedBatch(
&bind.CallOpts{Context: ctx},
*convertedSignedBatch,
Expand All @@ -88,14 +130,24 @@ func (cv *CertVerifier) VerifyCertV2FromSignedBatch(
return nil
}

// VerifyCertV2 calls the VerifyCertV2 view function on the EigenDACertVerifier contract
// VerifyCertV2 calls the VerifyCertV2 view function on the EigenDACertVerifier contract.
//
// Before verifying the cert, this method will wait for the internal client to advance to a sufficient block height.
// This wait will time out if the duration exceeds the timeout configured for the input ctx parameter. If
// CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal client to
// advance, and will instead simply fail verification if the internal client is behind.
//
// This method returns nil if the cert is successfully verified. Otherwise, it returns an error.
func (cv *CertVerifier) VerifyCertV2(
ctx context.Context,
eigenDACert *EigenDACert,
) error {
err := cv.certVerifierCaller.VerifyDACertV2(
err := cv.MaybeWaitForBlockNumber(ctx, uint64(eigenDACert.BatchHeader.ReferenceBlockNumber))
if err != nil {
return fmt.Errorf("wait for block number: %w", err)
}

err = cv.certVerifierCaller.VerifyDACertV2(
&bind.CallOpts{Context: ctx},
eigenDACert.BatchHeader,
eigenDACert.BlobInclusionInfo,
Expand All @@ -110,6 +162,12 @@ func (cv *CertVerifier) VerifyCertV2(

// GetNonSignerStakesAndSignature calls the getNonSignerStakesAndSignature view function on the EigenDACertVerifier
// contract, and returns the resulting NonSignerStakesAndSignature object.
//
// Before getting the NonSignerStakesAndSignature, this method will wait for the internal client to advance to a
// sufficient block height. This wait will time out if the duration exceeds the timeout configured for the input ctx
// parameter. If CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal
// client to advance, and will instead simply fail to get the NonSignerStakesAndSignature if the internal client is
// behind.
func (cv *CertVerifier) GetNonSignerStakesAndSignature(
ctx context.Context,
signedBatch *disperser.SignedBatch,
Expand All @@ -120,6 +178,11 @@ func (cv *CertVerifier) GetNonSignerStakesAndSignature(
return nil, fmt.Errorf("convert signed batch: %w", err)
}

err = cv.MaybeWaitForBlockNumber(ctx, signedBatch.GetHeader().GetReferenceBlockNumber())
if err != nil {
return nil, fmt.Errorf("wait for block number: %w", err)
}

nonSignerStakesAndSignature, err := cv.certVerifierCaller.GetNonSignerStakesAndSignature(
&bind.CallOpts{Context: ctx},
*signedBatchBinding)
Expand All @@ -130,3 +193,78 @@ func (cv *CertVerifier) GetNonSignerStakesAndSignature(

return &nonSignerStakesAndSignature, nil
}

// MaybeWaitForBlockNumber waits until the internal eth client has advanced to a certain targetBlockNumber, unless
// configured pollInterval is <= 0, in which case this method will NOT wait for the internal client to advance.
//
// This method will check the current block number of the internal client every CertVerifier.pollInterval duration.
// It will return nil if the internal client advances to (or past) the targetBlockNumber. It will return an error
// if the input context times out, or if any error occurs when checking the block number of the internal client.
//
// This method is synchronized in a way that, if called by multiple goroutines, only a single goroutine will actually
// poll the internal eth client for most recent block number. The goroutine responsible for polling at a given time
// updates an atomic integer, so that all goroutines may check the most recent block without duplicating work.
func (cv *CertVerifier) MaybeWaitForBlockNumber(ctx context.Context, targetBlockNumber uint64) error {
if cv.pollInterval <= 0 {
// don't wait for the internal client to advance
return nil
}

if cv.latestBlockNumber.Load() >= targetBlockNumber {
// immediately return if the local client isn't behind the target block number
return nil
}

ticker := time.NewTicker(cv.pollInterval)
defer ticker.Stop()

polling := false
if cv.pollingActive.CompareAndSwap(false, true) {
// no other goroutine is currently polling, so assume responsibility
polling = true
defer cv.pollingActive.Store(false)
}

for {
select {
case <-ctx.Done():
return fmt.Errorf(
"timed out waiting for block number %d (latest block number observed was %d): %w",
targetBlockNumber, cv.latestBlockNumber.Load(), ctx.Err())
case <-ticker.C:
if cv.latestBlockNumber.Load() >= targetBlockNumber {
return nil
}

if cv.pollingActive.CompareAndSwap(false, true) {
// no other goroutine is currently polling, so assume responsibility
polling = true
defer cv.pollingActive.Store(false)
}

if polling {
actualBlockNumber, err := cv.ethClient.BlockNumber(ctx)
if err != nil {
cv.logger.Debug(
"ethClient.BlockNumber returned an error",
"targetBlockNumber", targetBlockNumber,
"latestBlockNumber", cv.latestBlockNumber.Load(),
"error", err)

// tolerate some failures here. if failure continues for too long, it will be caught by the timeout
continue
}

cv.latestBlockNumber.Store(actualBlockNumber)
if actualBlockNumber >= targetBlockNumber {
return nil
}
}

cv.logger.Debug(
"local client is behind the reference block number",
"targetBlockNumber", targetBlockNumber,
"actualBlockNumber", cv.latestBlockNumber.Load())
}
}
}
Loading

0 comments on commit a66068e

Please sign in to comment.