diff --git a/chainio/clients/avsregistry/reader.go b/chainio/clients/avsregistry/reader.go index 3f13d8f2..dc553d07 100644 --- a/chainio/clients/avsregistry/reader.go +++ b/chainio/clients/avsregistry/reader.go @@ -566,3 +566,13 @@ func (r *ChainReader) QueryExistingRegisteredOperatorSockets( } return operatorIdToSocketMap, nil } + /* BlockNumber asks r.ethClient for the current block number and returns it. If that call fails, it returns 0 together with the error wrapped by utils.WrapError under the message "Cannot get current block number". */ +func (r *ChainReader) BlockNumber( + ctx context.Context, +) (uint64, error) { + curBlockNum, err := r.ethClient.BlockNumber(ctx) + if err != nil { + return 0, utils.WrapError("Cannot get current block number", err) + } + return curBlockNum, nil +} diff --git a/internal/fakes/avs_registry.go b/internal/fakes/avs_registry.go index e17ce7f9..c26d130c 100644 --- a/internal/fakes/avs_registry.go +++ b/internal/fakes/avs_registry.go @@ -98,3 +98,9 @@ func (f *FakeAVSRegistryReader) GetCheckSignaturesIndices( ) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) { return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil } + /* BlockNumber is the fake implementation: it does not use ctx and always returns block number 0 with a nil error. */ +func (f *FakeAVSRegistryReader) BlockNumber( + ctx context.Context, +) (uint64, error) { + return 0, nil +} diff --git a/services/avsregistry/avsregistry_fake.go b/services/avsregistry/avsregistry_fake.go index d5cf8e59..154fcda6 100644 --- a/services/avsregistry/avsregistry_fake.go +++ b/services/avsregistry/avsregistry_fake.go @@ -88,3 +88,9 @@ func (f *FakeAvsRegistryService) GetCheckSignaturesIndices( ) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) { return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil } + /* BlockNumber is the fake implementation: it does not use ctx and always returns block number 0 with a nil error. */ +func (f *FakeAvsRegistryService) BlockNumber( + ctx context.Context, +) (uint64, error) { + return 0, nil +} diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index c2874b92..3f9e55e0 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -3,8 +3,10 @@ package operatorsinfo import ( "context" "errors" + 
"fmt" "math/big" "sync" + "time" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" @@ -33,6 +35,10 @@ type avsRegistryReader interface { stopBlock *big.Int, blockRange *big.Int, ) ([]types.OperatorAddr, []types.OperatorPubkeys, error) + + BlockNumber( + ctx context.Context, + ) (uint64, error) } type avsRegistrySubscriber interface { @@ -131,6 +137,10 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) { go func() { + const maxRetries = 5 + const retryDelay = time.Minute + const lookBackBlocks = 1000 // more than 3 hours + // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs ops.logger.Debug( "Subscribing to new pubkey registration events on blsApkRegistry contract", @@ -191,7 +201,62 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "OperatorPubkeysServiceInMemory", ) newPubkeyRegistrationSub.Unsubscribe() - newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + + // Note that err is reused everywhere + // Loop to retry subscription on error + for attempt := 0; attempt < maxRetries; attempt++ { + newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + err = fmt.Errorf("SubscribeToNewPubkeyRegistrations failed: %w", err) + } + // TODO + // Improve ugly impl + if err == nil { + var startBlock uint64 + var curBlock uint64 + curBlock, err = ops.avsRegistryReader.BlockNumber(ctx) + if err != nil { + err = fmt.Errorf("BlockNumber failed: %w", err) + } + if err == nil { + if curBlock < lookBackBlocks { + startBlock = 0 + } else { + startBlock = curBlock - lookBackBlocks + } + // TODO + // This is duplicated. Solve this correctly maybe? 
+ err = ops.queryPastRegisteredOperatorEventsAndFillDb( + ctx, + Opts{ + StartBlock: new(big.Int).SetUint64(startBlock), + StopBlock: new(big.Int).SetUint64(curBlock), + }, + ) + if err != nil { + err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) + } + if err == nil { + break // Successfully subscribed, exit loop + } + } + } + + ops.logger.Error( + "Failed to SubscribeToNewPubkeyRegistrations, retrying...", + "err", + err, + "attempt", + attempt+1, + ) + select { + case <-ctx.Done(): + ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) + return + case <-time.After(retryDelay): // Wait before retrying + continue + } + } if err != nil { ops.logger.Error( "Error opening websocket subscription for new pubkey registrations", @@ -203,6 +268,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( // see the warning above the struct definition to understand why we panic here panic(err) } + ops.logger.Info("Subscribed to NewPubkeyRegistrations") case err := <-newSocketRegistrationSub.Err(): ops.logger.Error( "Error in websocket subscription for new socket registration events. 
Attempting to reconnect...", @@ -212,7 +278,62 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "OperatorPubkeysServiceInMemory", ) newSocketRegistrationSub.Unsubscribe() - newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + + // Note that err is reused everywhere + // Loop to retry subscription on error + for attempt := 0; attempt < maxRetries; attempt++ { + newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { + err = fmt.Errorf("SubscribeToOperatorSocketUpdates failed: %w", err) + } + // TODO + // Improve ugly impl + if err == nil { + var startBlock uint64 + var curBlock uint64 + curBlock, err = ops.avsRegistryReader.BlockNumber(ctx) + if err != nil { + err = fmt.Errorf("BlockNumber failed: %w", err) + } + if err == nil { + if curBlock < lookBackBlocks { + startBlock = 0 + } else { + startBlock = curBlock - lookBackBlocks + } + // TODO + // This is duplicated. Solve this correctly maybe? 
+ err = ops.queryPastRegisteredOperatorEventsAndFillDb( + ctx, + Opts{ + StartBlock: new(big.Int).SetUint64(startBlock), + StopBlock: new(big.Int).SetUint64(curBlock), + }, + ) + if err != nil { + err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) + } + if err == nil { + break // Successfully subscribed, exit loop + } + } + } + + ops.logger.Error( + "Failed to SubscribeToOperatorSocketUpdates, retrying...", + "err", + err, + "attempt", + attempt+1, + ) + select { + case <-ctx.Done(): + ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) + return + case <-time.After(retryDelay): // Wait before retrying + continue + } + } if err != nil { ops.logger.Error( "Error opening websocket subscription for new socket registrations", @@ -223,6 +344,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) panic(err) } + ops.logger.Info("Subscribed to OperatorSocketUpdates") case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: operatorAddr := newPubkeyRegistrationEvent.Operator ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{