Skip to content

Commit

Permalink
Merge pull request #5 from gasp-xyz/feature/opInfoService-retries
Browse files Browse the repository at this point in the history
Added retries to both subscriptions used by the operators info service
  • Loading branch information
iStrike7 authored Dec 30, 2024
2 parents ba93c1e + dfb8460 commit 0e5e1e5
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 2 deletions.
10 changes: 10 additions & 0 deletions chainio/clients/avsregistry/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,13 @@ func (r *ChainReader) QueryExistingRegisteredOperatorSockets(
}
return operatorIdToSocketMap, nil
}

// BlockNumber asks the reader's eth client for the current block number.
//
// On success the value reported by the client is returned unchanged. On
// failure it returns 0 together with the client's error wrapped by
// utils.WrapError.
func (r *ChainReader) BlockNumber(ctx context.Context) (uint64, error) {
	height, err := r.ethClient.BlockNumber(ctx)
	if err == nil {
		return height, nil
	}
	return 0, utils.WrapError("Cannot get current block number", err)
}
6 changes: 6 additions & 0 deletions internal/fakes/avs_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,9 @@ func (f *FakeAVSRegistryReader) GetCheckSignaturesIndices(
) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) {
return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil
}

// BlockNumber is the fake reader's block-number query. It does not use ctx
// and always returns block 0 with a nil error.
func (f *FakeAVSRegistryReader) BlockNumber(ctx context.Context) (uint64, error) {
	var height uint64 // zero value: the fake always reports block 0
	return height, nil
}
6 changes: 6 additions & 0 deletions services/avsregistry/avsregistry_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ func (f *FakeAvsRegistryService) GetCheckSignaturesIndices(
) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) {
return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil
}

// BlockNumber is the fake service's block-number query. It does not use ctx
// and always returns block 0 with a nil error.
func (f *FakeAvsRegistryService) BlockNumber(ctx context.Context) (uint64, error) {
	const fakeHeight uint64 = 0
	return fakeHeight, nil
}
126 changes: 124 additions & 2 deletions services/operatorsinfo/operatorsinfo_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package operatorsinfo
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry"
regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator"
Expand Down Expand Up @@ -33,6 +35,10 @@ type avsRegistryReader interface {
stopBlock *big.Int,
blockRange *big.Int,
) ([]types.OperatorAddr, []types.OperatorPubkeys, error)

BlockNumber(
ctx context.Context,
) (uint64, error)
}

type avsRegistrySubscriber interface {
Expand Down Expand Up @@ -131,6 +137,10 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
) {
go func() {

const maxRetries = 5
const retryDelay = time.Minute
const lookBackBlocks = 1000 // more than 3 hours

// TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs
ops.logger.Debug(
"Subscribing to new pubkey registration events on blsApkRegistry contract",
Expand Down Expand Up @@ -191,7 +201,62 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
"OperatorPubkeysServiceInMemory",
)
newPubkeyRegistrationSub.Unsubscribe()
newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations()

// Note that err is reused everywhere
// Loop to retry subscription on error
for attempt := 0; attempt < maxRetries; attempt++ {
newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations()
if err != nil {
err = fmt.Errorf("SubscribeToNewPubkeyRegistrations failed: %w", err)
}
// TODO
// Improve ugly impl
if err == nil {
var startBlock uint64
var curBlock uint64
curBlock, err = ops.avsRegistryReader.BlockNumber(ctx)
if err != nil {
err = fmt.Errorf("BlockNumber failed: %w", err)
}
if err == nil {
if curBlock < lookBackBlocks {
startBlock = 0
} else {
startBlock = curBlock - lookBackBlocks
}
// TODO
// This is duplicated. Solve this correctly maybe?
err = ops.queryPastRegisteredOperatorEventsAndFillDb(
ctx,
Opts{
StartBlock: new(big.Int).SetUint64(startBlock),
StopBlock: new(big.Int).SetUint64(curBlock),
},
)
if err != nil {
err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err)
}
if err == nil {
break // Successfully subscribed, exit loop
}
}
}

ops.logger.Error(
"Failed to SubscribeToNewPubkeyRegistrations, retrying...",
"err",
err,
"attempt",
attempt+1,
)
select {
case <-ctx.Done():
ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err())
return
case <-time.After(retryDelay): // Wait before retrying
continue
}
}
if err != nil {
ops.logger.Error(
"Error opening websocket subscription for new pubkey registrations",
Expand All @@ -203,6 +268,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
// see the warning above the struct definition to understand why we panic here
panic(err)
}
ops.logger.Info("Subscribed to NewPubkeyRegistrations")
case err := <-newSocketRegistrationSub.Err():
ops.logger.Error(
"Error in websocket subscription for new socket registration events. Attempting to reconnect...",
Expand All @@ -212,7 +278,62 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
"OperatorPubkeysServiceInMemory",
)
newSocketRegistrationSub.Unsubscribe()
newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates()

// Note that err is reused everywhere
// Loop to retry subscription on error
for attempt := 0; attempt < maxRetries; attempt++ {
newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates()
if err != nil {
err = fmt.Errorf("SubscribeToOperatorSocketUpdates failed: %w", err)
}
// TODO
// Improve ugly impl
if err == nil {
var startBlock uint64
var curBlock uint64
curBlock, err = ops.avsRegistryReader.BlockNumber(ctx)
if err != nil {
err = fmt.Errorf("BlockNumber failed: %w", err)
}
if err == nil {
if curBlock < lookBackBlocks {
startBlock = 0
} else {
startBlock = curBlock - lookBackBlocks
}
// TODO
// This is duplicated. Solve this correctly maybe?
err = ops.queryPastRegisteredOperatorEventsAndFillDb(
ctx,
Opts{
StartBlock: new(big.Int).SetUint64(startBlock),
StopBlock: new(big.Int).SetUint64(curBlock),
},
)
if err != nil {
err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err)
}
if err == nil {
break // Successfully subscribed, exit loop
}
}
}

ops.logger.Error(
"Failed to SubscribeToOperatorSocketUpdates, retrying...",
"err",
err,
"attempt",
attempt+1,
)
select {
case <-ctx.Done():
ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err())
return
case <-time.After(retryDelay): // Wait before retrying
continue
}
}
if err != nil {
ops.logger.Error(
"Error opening websocket subscription for new socket registrations",
Expand All @@ -223,6 +344,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
)
panic(err)
}
ops.logger.Info("Subscribed to OperatorSocketUpdates")
case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC:
operatorAddr := newPubkeyRegistrationEvent.Operator
ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{
Expand Down

0 comments on commit 0e5e1e5

Please sign in to comment.