From 35fdb075555c040442d4c0ddcda404d128b5469c Mon Sep 17 00:00:00 2001 From: iStrike7 Date: Mon, 30 Dec 2024 20:44:16 +0530 Subject: [PATCH 1/4] Added retries to both subs used by opinfoserv --- chainio/clients/avsregistry/reader.go | 11 ++ .../operatorsinfo/operatorsinfo_inmemory.go | 102 +++++++++++++++++- 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/chainio/clients/avsregistry/reader.go b/chainio/clients/avsregistry/reader.go index 3f13d8f2..3195e78a 100644 --- a/chainio/clients/avsregistry/reader.go +++ b/chainio/clients/avsregistry/reader.go @@ -566,3 +566,14 @@ func (r *ChainReader) QueryExistingRegisteredOperatorSockets( } return operatorIdToSocketMap, nil } + +func (r *ChainReader) BlockNumber( + ctx context.Context, +) (uint64, error) { + curBlockNum, err := r.ethClient.BlockNumber(ctx) + if err != nil { + return nil, utils.WrapError("Cannot get current block number", err) + } + return curBlockNum, nil +} + diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index c2874b92..c3cb20ab 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -5,6 +5,8 @@ import ( "errors" "math/big" "sync" + "fmt" + "time" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" @@ -33,6 +35,10 @@ type avsRegistryReader interface { stopBlock *big.Int, blockRange *big.Int, ) ([]types.OperatorAddr, []types.OperatorPubkeys, error) + + BlockNumber( + ctx context.Context, + ) (uint64, error) } type avsRegistrySubscriber interface { @@ -131,6 +137,10 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) { go func() { + const maxRetries = 5 + const retryDelay = time.Minute + const lookBackBlocks = 1000 // more than 3 hours + // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs ops.logger.Debug( "Subscribing to new pubkey registration events on blsApkRegistry contract", @@ -191,7 +201,50 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "OperatorPubkeysServiceInMemory", ) newPubkeyRegistrationSub.Unsubscribe() - newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + + // Note that err is reused everywhere + // Loop to retry subscription on error + for attempt := 0; attempt < maxRetries; attempt++ { + newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + err = fmt.Errorf("SubscribeToNewPubkeyRegistrations failed: %w", err) + } + // TODO + // Improve ugly impl + if err == nil { + var startBlock uint64 + var curBlock uint64 + curBlock, err = ops.avsRegistryReader.BlockNumber(ctx) + if err != nil { + err = fmt.Errorf("BlockNumber failed: %w", err) + } + if err == nil{ + if curBlock < lookBackBlocks { + startBlock = 0 + } else { + startBlock = curBlock - lookBackBlocks + } + // TODO + // This is duplicated. Solve this correctly maybe? + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, Opts{StartBlock:new(big.Int).SetUint64(startBlock), StopBlock:new(big.Int).SetUint64(curBlock)}) + if err != nil { + err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) + } + if err == nil { + break // Successfully subscribed, exit loop + } + } + } + + ops.logger.Error("Failed to SubscribeToNewPubkeyRegistrations, retrying...", "err", err, "attempt", attempt+1) + select { + case <-ctx.Done(): + ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) + return + case <-time.After(retryDelay): // Wait before retrying + continue + } + } if err != nil { ops.logger.Error( "Error opening websocket subscription for new pubkey registrations", @@ -203,6 +256,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( // see the warning above the struct definition to understand why we panic here panic(err) } + ops.logger.Info("Subscribed to NewPubkeyRegistrations") case err := <-newSocketRegistrationSub.Err(): ops.logger.Error( "Error in websocket subscription for new socket registration events. Attempting to reconnect...", @@ -212,7 +266,50 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "OperatorPubkeysServiceInMemory", ) newSocketRegistrationSub.Unsubscribe() - newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + + // Note that err is reused everywhere + // Loop to retry subscription on error + for attempt := 0; attempt < maxRetries; attempt++ { + newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { + err = fmt.Errorf("SubscribeToOperatorSocketUpdates failed: %w", err) + } + // TODO + // Improve ugly impl + if err == nil { + var startBlock uint64 + var curBlock uint64 + curBlock, err = ops.avsRegistryReader.BlockNumber(ctx) + if err != nil { + err = fmt.Errorf("BlockNumber failed: %w", err) + } + if err == nil{ + if curBlock < lookBackBlocks { + startBlock = 0 + } else { + startBlock = curBlock - lookBackBlocks + } + // TODO + // This is duplicated. Solve this correctly maybe? + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, Opts{StartBlock:new(big.Int).SetUint64(startBlock), StopBlock:new(big.Int).SetUint64(curBlock)}) + if err != nil { + err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) + } + if err == nil { + break // Successfully subscribed, exit loop + } + } + } + + ops.logger.Error("Failed to SubscribeToOperatorSocketUpdates, retrying...", "err", err, "attempt", attempt+1) + select { + case <-ctx.Done(): + ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) + return + case <-time.After(retryDelay): // Wait before retrying + continue + } + } if err != nil { ops.logger.Error( "Error opening websocket subscription for new socket registrations", @@ -223,6 +320,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) panic(err) } + ops.logger.Info("Subscribed to OperatorSocketUpdates") case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: operatorAddr := newPubkeyRegistrationEvent.Operator ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ From fabcf20e73d1ff8ba8c73c899ee3d2b6cce46e70 Mon Sep 17 00:00:00 2001 From: iStrike7 Date: Mon, 30 Dec 2024 20:49:21 +0530 Subject: [PATCH 2/4] make fmt --- chainio/clients/avsregistry/reader.go | 1 - .../operatorsinfo/operatorsinfo_inmemory.go | 46 ++++++++++++++----- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/chainio/clients/avsregistry/reader.go b/chainio/clients/avsregistry/reader.go index 3195e78a..6d8dec5d 100644 --- a/chainio/clients/avsregistry/reader.go +++ b/chainio/clients/avsregistry/reader.go @@ -576,4 +576,3 @@ func (r *ChainReader) BlockNumber( } return curBlockNum, nil } - diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index c3cb20ab..3f9e55e0 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -3,9 +3,9 @@ package operatorsinfo import ( "context" "errors" + "fmt" "math/big" "sync" - "fmt" "time" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" @@ -35,10 +35,10 @@ type avsRegistryReader interface { stopBlock *big.Int, blockRange *big.Int, ) ([]types.OperatorAddr, []types.OperatorPubkeys, error) - + BlockNumber( ctx context.Context, - ) (uint64, error) + ) (uint64, error) } type avsRegistrySubscriber interface { @@ -218,7 +218,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( if err != nil { err = fmt.Errorf("BlockNumber failed: %w", err) } - if err == nil{ + if err == nil { if curBlock < lookBackBlocks { startBlock = 0 } else { @@ -226,7 +226,13 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( } // TODO // This is duplicated. Solve this correctly maybe? - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, Opts{StartBlock:new(big.Int).SetUint64(startBlock), StopBlock:new(big.Int).SetUint64(curBlock)}) + err = ops.queryPastRegisteredOperatorEventsAndFillDb( + ctx, + Opts{ + StartBlock: new(big.Int).SetUint64(startBlock), + StopBlock: new(big.Int).SetUint64(curBlock), + }, + ) if err != nil { err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) } @@ -235,8 +241,14 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( } } } - - ops.logger.Error("Failed to SubscribeToNewPubkeyRegistrations, retrying...", "err", err, "attempt", attempt+1) + + ops.logger.Error( + "Failed to SubscribeToNewPubkeyRegistrations, retrying...", + "err", + err, + "attempt", + attempt+1, + ) select { case <-ctx.Done(): ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) @@ -283,7 +295,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( if err != nil { err = fmt.Errorf("BlockNumber failed: %w", err) } - if err == nil{ + if err == nil { if curBlock < lookBackBlocks { startBlock = 0 } else { @@ -291,7 +303,13 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( } // TODO // This is duplicated. Solve this correctly maybe? - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, Opts{StartBlock:new(big.Int).SetUint64(startBlock), StopBlock:new(big.Int).SetUint64(curBlock)}) + err = ops.queryPastRegisteredOperatorEventsAndFillDb( + ctx, + Opts{ + StartBlock: new(big.Int).SetUint64(startBlock), + StopBlock: new(big.Int).SetUint64(curBlock), + }, + ) if err != nil { err = fmt.Errorf("queryPastRegisteredOperatorEventsAndFillDb failed: %w", err) } @@ -300,8 +318,14 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( } } } - - ops.logger.Error("Failed to SubscribeToOperatorSocketUpdates, retrying...", "err", err, "attempt", attempt+1) + + ops.logger.Error( + "Failed to SubscribeToOperatorSocketUpdates, retrying...", + "err", + err, + "attempt", + attempt+1, + ) select { case <-ctx.Done(): ops.logger.Debugf("OperatorPubkeysServiceInMemory: Context cancelled, exiting, %v", ctx.Err()) From 39504c4f8df4cdd662c675c8f5f758c7a53af23d Mon Sep 17 00:00:00 2001 From: iStrike7 Date: Mon, 30 Dec 2024 20:59:24 +0530 Subject: [PATCH 3/4] lint --- chainio/clients/avsregistry/reader.go | 2 +- internal/fakes/avs_registry.go | 6 ++++++ services/avsregistry/avsregistry_fake.go | 6 ++++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/chainio/clients/avsregistry/reader.go b/chainio/clients/avsregistry/reader.go index 6d8dec5d..dc553d07 100644 --- a/chainio/clients/avsregistry/reader.go +++ b/chainio/clients/avsregistry/reader.go @@ -572,7 +572,7 @@ func (r *ChainReader) BlockNumber( ) (uint64, error) { curBlockNum, err := r.ethClient.BlockNumber(ctx) if err != nil { - return nil, utils.WrapError("Cannot get current block number", err) + return 0, utils.WrapError("Cannot get current block number", err) } return curBlockNum, nil } diff --git a/internal/fakes/avs_registry.go b/internal/fakes/avs_registry.go index e17ce7f9..d4f9f150 100644 --- a/internal/fakes/avs_registry.go +++ b/internal/fakes/avs_registry.go @@ -98,3 +98,9 @@ func (f *FakeAVSRegistryReader) GetCheckSignaturesIndices( ) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) { return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil } + +func (f *FakeAVSRegistryReader) BlockNumber( + ctx context.Context, +) (uint64, error) { + return 0, nil +} \ No newline at end of file diff --git a/services/avsregistry/avsregistry_fake.go b/services/avsregistry/avsregistry_fake.go index d5cf8e59..154fcda6 100644 --- a/services/avsregistry/avsregistry_fake.go +++ b/services/avsregistry/avsregistry_fake.go @@ -88,3 +88,9 @@ func (f *FakeAvsRegistryService) GetCheckSignaturesIndices( ) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) { return opstateretriever.OperatorStateRetrieverCheckSignaturesIndices{}, nil } + +func (f *FakeAvsRegistryService) BlockNumber( + ctx context.Context, +) (uint64, error) { + return 0, nil +} From dfb84601ea86c32979f442e7ddad87724f4faf0c Mon Sep 17 00:00:00 2001 From: iStrike7 Date: Mon, 30 Dec 2024 21:00:14 +0530 Subject: [PATCH 4/4] make fmt --- internal/fakes/avs_registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/fakes/avs_registry.go b/internal/fakes/avs_registry.go index d4f9f150..c26d130c 100644 --- a/internal/fakes/avs_registry.go +++ b/internal/fakes/avs_registry.go @@ -103,4 +103,4 @@ func (f *FakeAVSRegistryReader) BlockNumber( ctx context.Context, ) (uint64, error) { return 0, nil -} \ No newline at end of file +}