Skip to content

Commit

Permalink
simplify and hot reaload validator cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferret-san committed Feb 4, 2025
1 parent 7baf95c commit 49f842c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 85 deletions.
4 changes: 1 addition & 3 deletions cmd/celestiadaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ func startup() error {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// todo: add comething to zero out Validator config
serverConfig.CelestiaDa.ValidatorConfig = nil
celestiaDA, err := das.NewCelestiaDA(&serverConfig.CelestiaDa, nil)
celestiaDA, err := das.NewCelestiaDA(&serverConfig.CelestiaDa)
var celestiaReader das.CelestiaReader
var celestiaWriter das.CelestiaWriter
var rpcServer *http.Server
Expand Down
125 changes: 43 additions & 82 deletions das/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"math/big"
"regexp"
"strings"
"sync"
"time"
Expand All @@ -31,24 +30,23 @@ import (
)

type DAConfig struct {
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
KeyName string `koanf:"keyname" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
CacheCleanupTime time.Duration `koanf:"cache-time"`
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig ValidatorConfig `koanf:"validator-config" reload:"hot"`
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
CacheCleanupTime time.Duration `koanf:"cache-time"`
}

type ValidatorConfig struct {
EthClient string `koanf:"eth-rpc" reload:"hot"`
BlobstreamAddr string `koanf:"blobstream"`
BlobstreamAddr string `koanf:"blobstream" reload:"hot"`
}

var (
Expand Down Expand Up @@ -93,17 +91,10 @@ type CelestiaDA struct {
ReadClient *node.Client

Namespace *libshare.Namespace
Prover *CelestiaProver
KeyName string

messageCache sync.Map
}

type CelestiaProver struct {
EthClient *ethclient.Client
BlobstreamX *blobstreamx.BlobstreamX
}

func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", false, "Enable Celestia DA")
f.Float64(prefix+".gas-price", 0.01, "Gas for retrying Celestia transactions")
Expand All @@ -113,15 +104,14 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".namespace-id", "", "Celestia Namespace to post data to")
f.String(prefix+".auth-token", "", "Auth token for Celestia Node")
f.String(prefix+".read-auth-token", "", "Auth token for Celestia Node")
f.String(prefix+".keyname", "my_cel_key", "Keyring keyname for Celestia Node for blobs submission")
f.Bool(prefix+".noop-writer", false, "Noop writer (disable posting to celestia)")
f.String(prefix+".validator-config"+".eth-rpc", "", "Parent chain connection, only used for validation")
f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation")
f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node")
f.Duration(prefix+".cache-time", time.Hour/2, "how often to clean the in memory cache")
}

func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) {
func NewCelestiaDA(cfg *DAConfig) (*CelestiaDA, error) {
if cfg == nil {
return nil, errors.New("celestia cfg cannot be blank")
}
Expand Down Expand Up @@ -153,48 +143,6 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
return nil, err
}

if cfg.KeyName == "" {
return nil, errors.New("keyring keyname cannot be blank")
}
if !isValidKeyName(cfg.KeyName) {
return nil, fmt.Errorf("invalid keyring keyname format: %s", cfg.KeyName)
}

if cfg.ValidatorConfig != nil {

var ethRpc *ethclient.Client
if ethClient != nil {
ethRpc = ethClient
} else if len(cfg.ValidatorConfig.EthClient) > 0 {
ethRpc, err = ethclient.Dial(cfg.ValidatorConfig.EthClient)
if err != nil {
return nil, err
}
}

blobstreamx, err := blobstreamx.NewBlobstreamX(common.HexToAddress(cfg.ValidatorConfig.BlobstreamAddr), ethClient)
if err != nil {
return nil, err
}

da := &CelestiaDA{
Cfg: cfg,
Client: daClient,
ReadClient: readClient,
Namespace: &namespace,
KeyName: cfg.KeyName,
Prover: &CelestiaProver{
EthClient: ethRpc,
BlobstreamX: blobstreamx,
},
}

da.StartCacheCleanup(cfg.CacheCleanupTime)

return da, nil

}

da := &CelestiaDA{
Cfg: cfg,
Client: daClient,
Expand All @@ -208,7 +156,6 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
}

func (c *CelestiaDA) Stop() error {
c.Prover.EthClient.Close()
c.Client.Close()
return nil
}
Expand Down Expand Up @@ -523,17 +470,31 @@ BlobLoop:
}

func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
if c.Prover == nil {
if c.Cfg.ValidatorConfig.EthClient == "" || c.Cfg.ValidatorConfig.BlobstreamAddr == "" {
celestiaValidationFailureCounter.Inc(1)
return nil, fmt.Errorf("no celestia prover config found")
return nil, fmt.Errorf("no celestia prover config")
}

ethRpc, err := ethclient.Dial(c.Cfg.ValidatorConfig.EthClient)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't dial to eth rpc for Blobstream proof", "rpcAddr", c.Cfg.ValidatorConfig.EthClient, "err", err)
return nil, err
}

blobstream, err := blobstreamx.NewBlobstreamX(common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), ethRpc)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't instantiate client for blobstream", "rpcAddr", c.Cfg.ValidatorConfig.EthClient, "blobstreamAddr", common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), "err", err)
return nil, err
}

fmt.Printf("Inbox Message: %v\n", msg)
buf := bytes.NewBuffer(msg)
// msgLength := uint32(len(msg) + 1)
blobPointer := BlobPointer{}
blobBytes := buf.Bytes()
err := blobPointer.UnmarshalBinary(blobBytes)
err = blobPointer.UnmarshalBinary(blobBytes)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't unmarshal Celestia blob pointer", "err", err)
Expand All @@ -548,15 +509,15 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
return nil, err
}

latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background())
latestBlockNumber, err := ethRpc.BlockNumber(context.Background())
if err != nil {
log.Warn("could not fetch latest L1 block", "err", err)
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

// check the latest celestia block on the Blobstream contract
latestCelestiaBlock, err := c.Prover.BlobstreamX.LatestBlock(&bind.CallOpts{
latestCelestiaBlock, err := blobstream.LatestBlock(&bind.CallOpts{
Pending: false,
BlockNumber: big.NewInt(int64(latestBlockNumber)),
Context: ctx,
Expand All @@ -579,7 +540,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {

var event *blobstreamx.BlobstreamXDataCommitmentStored

event, err = c.filter(ctx, latestBlockNumber, blobPointer.BlockHeight, backwards)
event, err = c.filter(ctx, ethRpc, blobstream, latestBlockNumber, blobPointer.BlockHeight, backwards)
if err != nil {
log.Warn("event filtering error", "err", err)
celestiaValidationFailureCounter.Inc(1)
Expand Down Expand Up @@ -610,7 +571,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
NumLeaves: big.NewInt((*dataRootProof).Total),
}

valid, err := c.Prover.BlobstreamX.VerifyAttestation(
valid, err := blobstream.VerifyAttestation(
&bind.CallOpts{},
event.ProofNonce,
tuple,
Expand Down Expand Up @@ -662,14 +623,19 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {

celestiaValidationSuccessCounter.Inc(1)
celestiaValidationLastSuccesfulActionGauge.Update(time.Now().Unix())
ethRpc.Close()
return proofData, nil
}

celestiaValidationFailureCounter.Inc(1)
ethRpc.Close()
return nil, err
}

func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHeight uint64, backwards bool) (*blobstreamx.BlobstreamXDataCommitmentStored, error) {
// write GetProofWithClient method that establishes a new connection with an eth node and creates the necessary wrapper

func (c *CelestiaDA) filter(ctx context.Context, ethRpc *ethclient.Client,
blobstream *blobstreamx.BlobstreamX, latestBlock uint64, celestiaHeight uint64, backwards bool) (*blobstreamx.BlobstreamXDataCommitmentStored, error) {
// Geth has a default of 5000 block limit for filters
start := uint64(0)
if latestBlock > 5000 {
Expand All @@ -678,7 +644,7 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei
end := latestBlock

for attempt := 0; attempt < 11; attempt++ {
eventsIterator, err := c.Prover.BlobstreamX.FilterDataCommitmentStored(
eventsIterator, err := blobstream.FilterDataCommitmentStored(
&bind.FilterOpts{
Context: ctx,
Start: start,
Expand Down Expand Up @@ -731,7 +697,7 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei
}
} else {
time.Sleep(time.Second * 3600)
latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background())
latestBlockNumber, err := ethRpc.BlockNumber(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -753,8 +719,3 @@ func (c *CelestiaDA) returnErrorHelper(err error) (*ReadResult, error) {

return nil, err
}

// Validate that the KeyName is a alphanumeric string of length > 0
func isValidKeyName(name string) bool {
return len(name) > 0 && regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString(name)
}

0 comments on commit 49f842c

Please sign in to comment.