Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: eigenda client confirmation depth #821

Merged
merged 12 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"fmt"
"log"
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
Expand All @@ -23,6 +24,19 @@ type EigenDAClientConfig struct {
// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The Ethereum RPC URL to use for querying the Ethereum blockchain.
// This is used to make sure that the blob has been confirmed on-chain.
EthRpcUrl string

// The address of the EigenDAServiceManager contract, used to make sure that the blob has been confirmed on-chain.
SvcManagerAddr string

// The number of Ethereum blocks to wait after the blob's batch has been included onchain, before returning from PutBlob calls.
// In most cases only makes sense if < 64 blocks (2 epochs). Otherwise, consider using WaitForFinalization instead.
//
// When WaitForFinalization is true, this field is ignored.
WaitForConfirmationDepth uint64

// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool

Expand Down Expand Up @@ -51,6 +65,22 @@ type EigenDAClientConfig struct {
}

func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
if c.WaitForFinalization {
if c.WaitForConfirmationDepth != 0 {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
log.Println("Warning: WaitForFinalization is set to true, WaitForConfirmationDepth will be ignored")
}
} else {
if c.WaitForConfirmationDepth > 64 {
log.Printf("Warning: WaitForConfirmationDepth is set to %v > 64 (2 epochs == finality). Consider setting WaitForFinalization to true instead.\n", c.WaitForConfirmationDepth)
}
}
if c.SvcManagerAddr == "" {
return fmt.Errorf("EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.")
}
if c.EthRpcUrl == "" {
return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.")
}

if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
Expand Down
140 changes: 140 additions & 0 deletions api/clients/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package clients

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// Claude generated tests... don't blame the copy paster.
samlaf marked this conversation as resolved.
Show resolved Hide resolved
func TestEigenDAClientConfig_CheckAndSetDefaults(t *testing.T) {
// Helper function to create a valid base config
newValidConfig := func() *EigenDAClientConfig {
return &EigenDAClientConfig{
RPC: "http://localhost:8080",
EthRpcUrl: "http://localhost:8545",
SvcManagerAddr: "0x1234567890123456789012345678901234567890",
}
}

t.Run("Valid minimal configuration", func(t *testing.T) {
config := newValidConfig()
err := config.CheckAndSetDefaults()
require.NoError(t, err)

// Check default values are set
assert.Equal(t, 5*time.Second, config.StatusQueryRetryInterval)
assert.Equal(t, 25*time.Minute, config.StatusQueryTimeout)
assert.Equal(t, 30*time.Second, config.ResponseTimeout)
})

t.Run("Missing required fields", func(t *testing.T) {
testCases := []struct {
name string
modifyConf func(*EigenDAClientConfig)
expectedErr string
}{
{
name: "Missing RPC",
modifyConf: func(c *EigenDAClientConfig) {
c.RPC = ""
},
expectedErr: "EigenDAClientConfig.RPC not set",
},
{
name: "Missing EthRpcUrl",
modifyConf: func(c *EigenDAClientConfig) {
c.EthRpcUrl = ""
},
expectedErr: "EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.",
},
{
name: "Missing SvcManagerAddr",
modifyConf: func(c *EigenDAClientConfig) {
c.SvcManagerAddr = ""
},
expectedErr: "EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
config := newValidConfig()
tc.modifyConf(config)
err := config.CheckAndSetDefaults()
assert.EqualError(t, err, tc.expectedErr)
})
}
})

t.Run("SignerPrivateKeyHex validation", func(t *testing.T) {
testCases := []struct {
name string
keyHex string
shouldError bool
}{
{
name: "Empty key (valid for read-only)",
keyHex: "",
shouldError: false,
},
{
name: "Valid length key (64 bytes)",
keyHex: "1234567890123456789012345678901234567890123456789012345678901234",
shouldError: false,
},
{
name: "Invalid length key",
keyHex: "123456",
shouldError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
config := newValidConfig()
config.SignerPrivateKeyHex = tc.keyHex
err := config.CheckAndSetDefaults()
if tc.shouldError {
assert.Error(t, err)
assert.Contains(t, err.Error(), "SignerPrivateKeyHex")
} else {
assert.NoError(t, err)
}
})
}
})

t.Run("Custom timeouts", func(t *testing.T) {
config := newValidConfig()
customRetryInterval := 10 * time.Second
customQueryTimeout := 30 * time.Minute
customResponseTimeout := 45 * time.Second

config.StatusQueryRetryInterval = customRetryInterval
config.StatusQueryTimeout = customQueryTimeout
config.ResponseTimeout = customResponseTimeout

err := config.CheckAndSetDefaults()
require.NoError(t, err)

assert.Equal(t, customRetryInterval, config.StatusQueryRetryInterval)
assert.Equal(t, customQueryTimeout, config.StatusQueryTimeout)
assert.Equal(t, customResponseTimeout, config.ResponseTimeout)
})

t.Run("Optional fields", func(t *testing.T) {
config := newValidConfig()
config.CustomQuorumIDs = []uint{2, 3, 4}
config.DisableTLS = true
config.DisablePointVerificationMode = true

err := config.CheckAndSetDefaults()
require.NoError(t, err)

assert.Equal(t, []uint{2, 3, 4}, config.CustomQuorumIDs)
assert.True(t, config.DisableTLS)
assert.True(t, config.DisablePointVerificationMode)
})
}
109 changes: 90 additions & 19 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package clients

import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"math/big"
"net"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
edasm "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/log"
)

// IEigenDAClient is a wrapper around the DisperserClient interface which
Expand All @@ -29,10 +36,12 @@ type IEigenDAClient interface {
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
Codec codecs.BlobCodec
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
ethClient *ethclient.Client
edasmCaller *edasm.ContractEigenDAServiceManagerCaller
Codec codecs.BlobCodec
}

var _ IEigenDAClient = &EigenDAClient{}
Expand Down Expand Up @@ -70,6 +79,17 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
return nil, err
}

var ethClient *ethclient.Client
var edasmCaller *edasm.ContractEigenDAServiceManagerCaller
ethClient, err = ethclient.Dial(config.EthRpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err)
}
edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR make cert verification mandatory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but I still made the eth-rpc and service-manager address mandatory, because I realized we should probably be checking the confirmed status of the batch onchain when the disperser tells us (defensive programming).

Realized that it didn't make sense to require them only when confirmationDepth > 0, because why also not verify that the batch has landed onchain when confirmationDepth = 0 (meaning its included only in the currentBlock)? See

batchConfirmed, err := m.batchIdConfirmedAtDepth(ctx, batchId, m.Config.WaitForConfirmationDepth)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@epociask is that OK with you or do you see a problem with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked offline. We are fine with this. Added a breaking change comment to the PR description.

if err != nil {
return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err)
}

host, port, err := net.SplitHostPort(config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
Expand Down Expand Up @@ -101,15 +121,15 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}

return &EigenDAClient{
Log: log,
Config: config,
Client: disperserClient,
Codec: codec,
Log: log,
Config: config,
Client: disperserClient,
ethClient: ethClient,
edasmCaller: edasmCaller,
Codec: codec,
}, nil
}

// Deprecated: do not rely on this function. Do not use m.Codec directly either.
// These will eventually be removed and not exposed.
func (m *EigenDAClient) GetCodec() codecs.BlobCodec {
return m.Codec
}
Expand Down Expand Up @@ -143,6 +163,10 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo
// PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status
// to be reached (guarded by WaitForFinalization config param) before returning.
// This function is resilient to transient failures and timeouts.
//
// Upon return the blob is guaranteed to be:
// - finalized onchain (if Config.WaitForFinalization is true), or
// - confirmed at a certain depth (if Config.WaitForFinalization is false, in which case Config.WaitForConfirmationDepth specifies the depth).
func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
Expand Down Expand Up @@ -204,7 +228,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
defer cancel()

alreadyWaitingForDispersal := false
alreadyWaitingForFinalization := false
alreadyWaitingForConfirmationOrFinality := false
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -235,16 +259,30 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
case grpcdisperser.BlobStatus_CONFIRMED:
if m.Config.WaitForFinalization {
// to prevent log clutter, we only log at info level once
if alreadyWaitingForFinalization {
m.Log.Debug("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
if alreadyWaitingForConfirmationOrFinality {
m.Log.Debug("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
} else {
m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
alreadyWaitingForFinalization = true
m.Log.Info("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
alreadyWaitingForConfirmationOrFinality = true
}
} else {
m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID)
resultChan <- statusRes.Info
return
batchId := statusRes.Info.BlobVerificationProof.GetBatchId()
batchConfirmed, err := m.batchIdConfirmedAtDepth(ctx, batchId, m.Config.WaitForConfirmationDepth)
if err != nil {
m.Log.Warn("Error checking if batch ID is confirmed at depth. Will retry...", "requestID", base64RequestID, "err", err)
}
if batchConfirmed {
m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
resultChan <- statusRes.Info
return
}
// to prevent log clutter, we only log at info level once
if alreadyWaitingForConfirmationOrFinality {
m.Log.Debug("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
} else {
m.Log.Info("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
alreadyWaitingForConfirmationOrFinality = true
}
}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
Expand All @@ -265,3 +303,36 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
func (c *EigenDAClient) Close() error {
return c.Client.Close()
}

// getConfDeepBlockNumber returns the block number that is `depth` blocks behind the current block number.
func (m EigenDAClient) getConfDeepBlockNumber(ctx context.Context, depth uint64) (*big.Int, error) {
curBlockNumber, err := m.ethClient.BlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %w", err)
}
// If curBlock < depth, this will return the genesis block number (0),
// which would cause to accept as confirmed a block that isn't depth deep.
// TODO: there's prob a better way to deal with this, like returning a special error
if curBlockNumber < depth {
return big.NewInt(0), nil
}
return new(big.Int).SetUint64(curBlockNumber - depth), nil
}

// batchIdConfirmedAtDepth checks if a batch ID has been confirmed at a certain depth.
// It returns true if the batch ID has been confirmed at the given depth, and false otherwise,
// or returns an error if any of the network calls fail.
func (m EigenDAClient) batchIdConfirmedAtDepth(ctx context.Context, batchId uint32, depth uint64) (bool, error) {
confDeepBlockNumber, err := m.getConfDeepBlockNumber(ctx, depth)
if err != nil {
return false, fmt.Errorf("failed to get confirmation deep block number: %w", err)
}
onchainBatchMetadataHash, err := m.edasmCaller.BatchIdToBatchMetadataHash(&bind.CallOpts{BlockNumber: confDeepBlockNumber}, batchId)
if err != nil {
return false, fmt.Errorf("failed to get batch metadata hash: %w", err)
}
if bytes.Equal(onchainBatchMetadataHash[:], make([]byte, 32)) {
return false, nil
}
return true, nil
}
Loading
Loading