From a66068efa5e1eb8e9899e56759b383790ef84253 Mon Sep 17 00:00:00 2001 From: litt <102969658+litt3@users.noreply.github.com> Date: Tue, 4 Feb 2025 16:10:32 -0500 Subject: [PATCH] Wait for internal eth client to reach block num (#1204) Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/config.go | 18 ++- api/clients/v2/payload_disperser.go | 10 +- api/clients/v2/relay_payload_retriever.go | 6 +- api/clients/v2/test/cert_verifier_test.go | 70 +++++++++ api/clients/v2/verification/cert_verifier.go | 152 ++++++++++++++++++- test/v2/test_client.go | 24 +-- 6 files changed, 255 insertions(+), 25 deletions(-) create mode 100644 api/clients/v2/test/cert_verifier_test.go diff --git a/api/clients/v2/config.go b/api/clients/v2/config.go index 9dae41ed08..589c2ae8e4 100644 --- a/api/clients/v2/config.go +++ b/api/clients/v2/config.go @@ -38,6 +38,13 @@ type PayloadClientConfig struct { // The timeout duration for contract calls ContractCallTimeout time.Duration + // BlockNumberPollInterval is how frequently to check latest block number when waiting for the internal eth client + // to advance to a certain block. + // + // If this is configured to be <= 0, then contract calls which require the internal eth client to have reached a + // certain block height will fail if the internal client is behind. + BlockNumberPollInterval time.Duration + // The BlobVersion to use when creating new blobs, or interpreting blob bytes. // // BlobVersion needs to point to a version defined in the threshold registry contract. @@ -89,10 +96,11 @@ type PayloadDisperserConfig struct { // NOTE: EthRpcUrl and EigenDACertVerifierAddr do not have defined defaults. These must always be specifically configured. func getDefaultPayloadClientConfig() *PayloadClientConfig { return &PayloadClientConfig{ - BlobEncodingVersion: codecs.DefaultBlobEncoding, - PayloadPolynomialForm: codecs.PolynomialFormEval, - ContractCallTimeout: 5 * time.Second, - BlobVersion: 0, + BlobEncodingVersion: codecs.DefaultBlobEncoding, + PayloadPolynomialForm: codecs.PolynomialFormEval, + ContractCallTimeout: 5 * time.Second, + BlockNumberPollInterval: 1 * time.Second, + BlobVersion: 0, } } @@ -120,6 +128,8 @@ func (cc *PayloadClientConfig) checkAndSetDefaults() error { cc.ContractCallTimeout = defaultConfig.ContractCallTimeout } + // BlockNumberPollInterval may be 0, so don't do anything + // BlobVersion may be 0, so don't do anything return nil diff --git a/api/clients/v2/payload_disperser.go b/api/clients/v2/payload_disperser.go index c1c7f4d9fb..387f611b33 100644 --- a/api/clients/v2/payload_disperser.go +++ b/api/clients/v2/payload_disperser.go @@ -30,11 +30,11 @@ type PayloadDisperser struct { } // BuildPayloadDisperser builds a PayloadDisperser from config structs. -func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserConfig, +func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserConfig, dispClientCfg *DisperserClientConfig, ethCfg *geth.EthClientConfig, kzgConfig *kzg.KzgConfig, encoderCfg *encoding.Config) (*PayloadDisperser, error) { - + // 1 - verify key semantics and create signer signer, err := auth.NewLocalBlobRequestSigner(payloadDispCfg.SignerPaymentKey) if err != nil { @@ -42,7 +42,7 @@ func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserCo } // 2 - create prover (if applicable) - + var kzgProver *prover.Prover if kzgConfig != nil { if encoderCfg == nil { @@ -77,8 +77,10 @@ func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserCo } certVerifier, err := verification.NewCertVerifier( - *ethClient, + log, + ethClient, payloadDispCfg.EigenDACertVerifierAddr, + payloadDispCfg.BlockNumberPollInterval, ) if err != nil { diff --git a/api/clients/v2/relay_payload_retriever.go b/api/clients/v2/relay_payload_retriever.go index ff4f784a86..954749c142 100644 --- a/api/clients/v2/relay_payload_retriever.go +++ b/api/clients/v2/relay_payload_retriever.go @@ -53,7 +53,11 @@ func BuildRelayPayloadRetriever( return nil, fmt.Errorf("new eth client: %w", err) } - certVerifier, err := verification.NewCertVerifier(*ethClient, relayPayloadRetrieverConfig.EigenDACertVerifierAddr) + certVerifier, err := verification.NewCertVerifier( + log, + ethClient, + relayPayloadRetrieverConfig.EigenDACertVerifierAddr, + relayPayloadRetrieverConfig.BlockNumberPollInterval) if err != nil { return nil, fmt.Errorf("new cert verifier: %w", err) } diff --git a/api/clients/v2/test/cert_verifier_test.go b/api/clients/v2/test/cert_verifier_test.go new file mode 100644 index 0000000000..322e529bfb --- /dev/null +++ b/api/clients/v2/test/cert_verifier_test.go @@ -0,0 +1,70 @@ +package test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/api/clients/v2/verification" + "github.com/Layr-Labs/eigenda/common" + commonmock "github.com/Layr-Labs/eigenda/common/mock" + testrandom "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" +) + +func TestWaitForBlockNumber(t *testing.T) { + mockEthClient := commonmock.MockEthClient{} + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + pollRate := time.Millisecond * 50 + + certVerifier, err := verification.NewCertVerifier( + logger, + &mockEthClient, + "", + pollRate) + require.NoError(t, err) + require.NotNil(t, certVerifier) + + // number of goroutines to start, each of which will call MaybeWaitForBlockNumber + callCount := 5 + + for i := uint64(0); i < uint64(callCount); i++ { + // BlockNumber will increment its return value each time it's called, up to callCount-1 + mockEthClient.On("BlockNumber").Return(i).Once() + } + // then, all subsequent calls will yield callCount -1 + mockEthClient.On("BlockNumber").Return(uint64(callCount - 1)) + + // give plenty of time on the timeout, to get the necessary number of polls in + timeoutCtx, cancel := context.WithTimeout(context.Background(), pollRate*time.Duration(callCount*2)) + defer cancel() + + waitGroup := sync.WaitGroup{} + + // start these goroutines in random order, so that it isn't always the same sequence of polling handoffs that gets exercised + indices := testrandom.NewTestRandom(t).Perm(callCount) + for _, index := range indices { + waitGroup.Add(1) + + go func(i int) { + defer waitGroup.Done() + + if i == callCount-1 { + // the last call is set up to fail, by setting the target block to a number that will never be attained + err := certVerifier.MaybeWaitForBlockNumber(timeoutCtx, uint64(i)+1) + require.Error(t, err) + } else { + // all calls except the final call wait for a block number corresponding to their index + err := certVerifier.MaybeWaitForBlockNumber(timeoutCtx, uint64(i)) + require.NoError(t, err) + } + }(index) + } + + waitGroup.Wait() + mockEthClient.AssertExpectations(t) +} diff --git a/api/clients/v2/verification/cert_verifier.go b/api/clients/v2/verification/cert_verifier.go index 845294601e..5beded3d2d 100644 --- a/api/clients/v2/verification/cert_verifier.go +++ b/api/clients/v2/verification/cert_verifier.go @@ -3,9 +3,13 @@ package verification import ( "context" "fmt" + "sync/atomic" + "time" + disperser "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" - "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/common" verifierBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDACertVerifier" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/accounts/abi/bind" gethcommon "github.com/ethereum/go-ethereum/common" ) @@ -30,18 +34,35 @@ type ICertVerifier interface { // // The cert verifier contract is located at https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/core/EigenDACertVerifier.sol type CertVerifier struct { + logger logging.Logger // go binding around the EigenDACertVerifier ethereum contract certVerifierCaller *verifierBindings.ContractEigenDACertVerifierCaller + ethClient common.EthClient + pollInterval time.Duration + // storage shared between goroutines, containing the most recent block number observed by calling ethClient.BlockNumber() + latestBlockNumber atomic.Uint64 + // atomic bool, so that only a single goroutine is polling the internal client with BlockNumber() calls at any given time + pollingActive atomic.Bool } var _ ICertVerifier = &CertVerifier{} // NewCertVerifier constructs a CertVerifier func NewCertVerifier( - ethClient geth.EthClient, // the eth client, which should already be set up - certVerifierAddress string, // the hex address of the EigenDACertVerifier contract + logger logging.Logger, + // the eth client, which should already be set up + ethClient common.EthClient, + // the hex address of the EigenDACertVerifier contract + certVerifierAddress string, + // pollInterval is how frequently to check latest block number when waiting for the internal eth client to advance + // to a certain block. This is needed because the RBN in a cert might be further in the future than the internal + // eth client. In such a case, we must wait for the internal client to catch up to the block number + // contained in the cert: otherwise, calls will fail. + // + // If the configured pollInterval duration is <= 0, then the block number check will be skipped, and calls that + // rely on the client having reached a certain block number will fail if the internal client is behind. + pollInterval time.Duration, ) (*CertVerifier, error) { - verifierCaller, err := verifierBindings.NewContractEigenDACertVerifierCaller( gethcommon.HexToAddress(certVerifierAddress), ethClient) @@ -50,12 +71,28 @@ func NewCertVerifier( return nil, fmt.Errorf("bind to verifier contract at %s: %w", certVerifierAddress, err) } + if pollInterval <= time.Duration(0) { + logger.Warn( + `CertVerifier poll interval is <= 0. Therefore, any method calls made with this object that + rely on the internal client having reached a certain block number will fail if + the internal client is too far behind.`, + "pollInterval", pollInterval) + } + return &CertVerifier{ + logger: logger, certVerifierCaller: verifierCaller, + ethClient: ethClient, + pollInterval: pollInterval, }, nil } -// VerifyCertV2FromSignedBatch calls the verifyDACertV2FromSignedBatch view function on the EigenDACertVerifier contract +// VerifyCertV2FromSignedBatch calls the verifyDACertV2FromSignedBatch view function on the EigenDACertVerifier contract. +// +// Before verifying the cert, this method will wait for the internal client to advance to a sufficient block height. +// This wait will time out if the duration exceeds the timeout configured for the input ctx parameter. If +// CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal client to +// advance, and will instead simply fail verification if the internal client is behind. // // This method returns nil if the cert is successfully verified. Otherwise, it returns an error. func (cv *CertVerifier) VerifyCertV2FromSignedBatch( @@ -76,6 +113,11 @@ func (cv *CertVerifier) VerifyCertV2FromSignedBatch( return fmt.Errorf("convert blob inclusion info: %w", err) } + err = cv.MaybeWaitForBlockNumber(ctx, signedBatch.GetHeader().GetReferenceBlockNumber()) + if err != nil { + return fmt.Errorf("wait for block number: %w", err) + } + err = cv.certVerifierCaller.VerifyDACertV2FromSignedBatch( &bind.CallOpts{Context: ctx}, *convertedSignedBatch, @@ -88,14 +130,24 @@ func (cv *CertVerifier) VerifyCertV2FromSignedBatch( return nil } -// VerifyCertV2 calls the VerifyCertV2 view function on the EigenDACertVerifier contract +// VerifyCertV2 calls the VerifyCertV2 view function on the EigenDACertVerifier contract. +// +// Before verifying the cert, this method will wait for the internal client to advance to a sufficient block height. +// This wait will time out if the duration exceeds the timeout configured for the input ctx parameter. If +// CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal client to +// advance, and will instead simply fail verification if the internal client is behind. // // This method returns nil if the cert is successfully verified. Otherwise, it returns an error. func (cv *CertVerifier) VerifyCertV2( ctx context.Context, eigenDACert *EigenDACert, ) error { - err := cv.certVerifierCaller.VerifyDACertV2( + err := cv.MaybeWaitForBlockNumber(ctx, uint64(eigenDACert.BatchHeader.ReferenceBlockNumber)) + if err != nil { + return fmt.Errorf("wait for block number: %w", err) + } + + err = cv.certVerifierCaller.VerifyDACertV2( &bind.CallOpts{Context: ctx}, eigenDACert.BatchHeader, eigenDACert.BlobInclusionInfo, @@ -110,6 +162,12 @@ func (cv *CertVerifier) VerifyCertV2( // GetNonSignerStakesAndSignature calls the getNonSignerStakesAndSignature view function on the EigenDACertVerifier // contract, and returns the resulting NonSignerStakesAndSignature object. +// +// Before getting the NonSignerStakesAndSignature, this method will wait for the internal client to advance to a +// sufficient block height. This wait will time out if the duration exceeds the timeout configured for the input ctx +// parameter. If CertVerifier.pollInterval is configured to be <= 0, then this method will *not* wait for the internal +// client to advance, and will instead simply fail to get the NonSignerStakesAndSignature if the internal client is +// behind. func (cv *CertVerifier) GetNonSignerStakesAndSignature( ctx context.Context, signedBatch *disperser.SignedBatch, @@ -120,6 +178,11 @@ func (cv *CertVerifier) GetNonSignerStakesAndSignature( return nil, fmt.Errorf("convert signed batch: %w", err) } + err = cv.MaybeWaitForBlockNumber(ctx, signedBatch.GetHeader().GetReferenceBlockNumber()) + if err != nil { + return nil, fmt.Errorf("wait for block number: %w", err) + } + nonSignerStakesAndSignature, err := cv.certVerifierCaller.GetNonSignerStakesAndSignature( &bind.CallOpts{Context: ctx}, *signedBatchBinding) @@ -130,3 +193,78 @@ func (cv *CertVerifier) GetNonSignerStakesAndSignature( return &nonSignerStakesAndSignature, nil } + +// MaybeWaitForBlockNumber waits until the internal eth client has advanced to a certain targetBlockNumber, unless +// configured pollInterval is <= 0, in which case this method will NOT wait for the internal client to advance. +// +// This method will check the current block number of the internal client every CertVerifier.pollInterval duration. +// It will return nil if the internal client advances to (or past) the targetBlockNumber. It will return an error +// if the input context times out, or if any error occurs when checking the block number of the internal client. +// +// This method is synchronized in a way that, if called by multiple goroutines, only a single goroutine will actually +// poll the internal eth client for most recent block number. The goroutine responsible for polling at a given time +// updates an atomic integer, so that all goroutines may check the most recent block without duplicating work. +func (cv *CertVerifier) MaybeWaitForBlockNumber(ctx context.Context, targetBlockNumber uint64) error { + if cv.pollInterval <= 0 { + // don't wait for the internal client to advance + return nil + } + + if cv.latestBlockNumber.Load() >= targetBlockNumber { + // immediately return if the local client isn't behind the target block number + return nil + } + + ticker := time.NewTicker(cv.pollInterval) + defer ticker.Stop() + + polling := false + if cv.pollingActive.CompareAndSwap(false, true) { + // no other goroutine is currently polling, so assume responsibility + polling = true + defer cv.pollingActive.Store(false) + } + + for { + select { + case <-ctx.Done(): + return fmt.Errorf( + "timed out waiting for block number %d (latest block number observed was %d): %w", + targetBlockNumber, cv.latestBlockNumber.Load(), ctx.Err()) + case <-ticker.C: + if cv.latestBlockNumber.Load() >= targetBlockNumber { + return nil + } + + if cv.pollingActive.CompareAndSwap(false, true) { + // no other goroutine is currently polling, so assume responsibility + polling = true + defer cv.pollingActive.Store(false) + } + + if polling { + actualBlockNumber, err := cv.ethClient.BlockNumber(ctx) + if err != nil { + cv.logger.Debug( + "ethClient.BlockNumber returned an error", + "targetBlockNumber", targetBlockNumber, + "latestBlockNumber", cv.latestBlockNumber.Load(), + "error", err) + + // tolerate some failures here. if failure continues for too long, it will be caught by the timeout + continue + } + + cv.latestBlockNumber.Store(actualBlockNumber) + if actualBlockNumber >= targetBlockNumber { + return nil + } + } + + cv.logger.Debug( + "local client is behind the reference block number", + "targetBlockNumber", targetBlockNumber, + "actualBlockNumber", cv.latestBlockNumber.Load()) + } + } +} diff --git a/test/v2/test_client.go b/test/v2/test_client.go index 848927bcb4..f59f62bff2 100644 --- a/test/v2/test_client.go +++ b/test/v2/test_client.go @@ -3,6 +3,12 @@ package v2 import ( "context" "fmt" + "os" + "path" + "strings" + "testing" + "time" + "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/api/clients/v2/verification" commonv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" @@ -22,11 +28,6 @@ import ( "github.com/docker/go-units" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" - "os" - "path" - "strings" - "testing" - "time" ) const ( @@ -195,7 +196,11 @@ func NewTestClient(t *testing.T, config *TestClientConfig) *TestClient { } gethClient, err := geth.NewClient(gethClientConfig, gethcommon.Address{}, 0, logger) require.NoError(t, err) - certVerifier, err := verification.NewCertVerifier(*gethClient, config.EigenDACertVerifierAddress) + certVerifier, err := verification.NewCertVerifier( + logger, + gethClient, + config.EigenDACertVerifierAddress, + time.Second) require.NoError(t, err) return &TestClient{ @@ -276,7 +281,8 @@ func (c *TestClient) WaitForCertification( totalElapsed.Seconds()) blobCert := reply.BlobInclusionInfo.BlobCertificate - c.VerifyBlobCertification(key, + c.VerifyBlobCertification( + key, expectedQuorums, reply.SignedBatch, reply.BlobInclusionInfo) @@ -352,8 +358,8 @@ func (c *TestClient) VerifyBlobCertification( // TODO This currently does not pass! // On-chain verification - //err = c.CertVerifier.VerifyCertV2FromSignedBatch(context.Background(), signedBatch, inclusionInfo) - //require.NoError(c.t, err) + // err = c.CertVerifier.VerifyCertV2FromSignedBatch(context.Background(), signedBatch, inclusionInfo) + // require.NoError(c.t, err) } // ReadBlobFromRelay reads a blob from the relays and compares it to the given payload.