From fb96b70869b98ffd79be65a03a5b90f577d923d3 Mon Sep 17 00:00:00 2001 From: Jim McDonald Date: Thu, 20 Feb 2025 19:38:31 +0000 Subject: [PATCH] Update events handlers. --- CHANGELOG.md | 2 + api/eventsopts.go | 111 +++++++ go.mod | 8 +- go.sum | 14 +- http/events.go | 607 ++++++++++++++++++++++++++--------- http/events_internal_test.go | 12 +- http/events_test.go | 14 +- mock/events.go | 6 +- mock/service.go | 3 +- multi/events.go | 260 +++++++++++++-- multi/events_test.go | 5 +- service.go | 5 +- testclients/erroring.go | 4 +- testclients/sleepy.go | 4 +- 14 files changed, 854 insertions(+), 201 deletions(-) create mode 100644 api/eventsopts.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 11ba2eb3..13134b1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ dev: - support single_attestation event - support change to attestation event; this event now emits a spec.VersionedAttestation + - support change to attester_slashing event; this event now emits an electra.AttesterSlashing + - update Events endpoint to provide specific handlers for each event 0.24.0: - support electra diff --git a/api/eventsopts.go b/api/eventsopts.go new file mode 100644 index 00000000..01e4dff7 --- /dev/null +++ b/api/eventsopts.go @@ -0,0 +1,111 @@ +// Copyright © 2025 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + + apiv1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/electra" + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// EventsOpts are the options for obtaining events. +type EventsOpts struct { + Common CommonOpts + + // Topics are the topics of events to which we want to listen. + Topics []string + + // Handler is a generic handler function to which to send all events. + // In general, it is better to use event-specific handlers as they avoid casting, and also provide a context. + Handler EventHandlerFunc + + // AttestationHandler is a handler for the attestation event. + AttestationHandler AttestationEventHandlerFunc + // AttesterSlashingHandler is a handler for the attester_slashing event. + AttesterSlashingHandler AttesterSlashingEventHandlerFunc + // BlobSidecarHandler is a handler for the blob_sidecar event. + BlobSidecarHandler BlobSidecarEventHandlerFunc + // BlockHandler is a handler for the block event. + BlockHandler BlockEventHandlerFunc + // BlockGossipHandler is a handler for the block_gossip event. + BlockGossipHandler BlockGossipEventHandlerFunc + // BLSToExecutionChangeHandler is a handler for the bls_to_execution_change event. + BLSToExecutionChangeHandler BLSToExecutionChangeEventHandlerFunc + // ChainReorgHandler is a handler for the chain_reorg event. + ChainReorgHandler ChainReorgEventHandlerFunc + // ContributionAndProofHandler is a handler for the contribution_and_proof event. + ContributionAndProofHandler ContributionAndProofEventHandlerFunc + // FinalizedCheckpointHandler is a handler for the finalized_checkpoint event. + FinalizedCheckpointHandler FinalizedCheckpointEventHandlerFunc + // HeadHandler is a handler for the head event. + HeadHandler HeadEventHandlerFunc + // PayloadAttributesHandler is a handler for the payload_attributes event. + PayloadAttributesHandler PayloadAttributesEventHandlerFunc + // ProposerSlashingHandler is a handler for the proposer_slashing event. + ProposerSlashingHandler ProposerSlashingEventHandlerFunc + // SingleAttestationHandler is a handler for the single_attestation event. + SingleAttestationHandler SingleAttestationEventHandlerFunc + // VoluntaryExitHandler is a handler for the voluntary_exit event. + VoluntaryExitHandler VoluntaryExitEventHandlerFunc +} + +// EventHandlerFunc is the handler for generic events. +type EventHandlerFunc func(*apiv1.Event) + +// AttestationEventHandlerFunc is the handler for attestation events. +type AttestationEventHandlerFunc func(context.Context, *spec.VersionedAttestation) + +// AttesterSlashingEventHandlerFunc is the handler for attestation_slashing events. +type AttesterSlashingEventHandlerFunc func(context.Context, *electra.AttesterSlashing) + +// BlobSidecarEventHandlerFunc is the handler for blob_sidecar events. +type BlobSidecarEventHandlerFunc func(context.Context, *apiv1.BlobSidecarEvent) + +// BlockEventHandlerFunc is the handler for block events. +type BlockEventHandlerFunc func(context.Context, *apiv1.BlockEvent) + +// BlockGossipEventHandlerFunc is the handler for block_gossip events. +type BlockGossipEventHandlerFunc func(context.Context, *apiv1.BlockGossipEvent) + +// BLSToExecutionChangeEventHandlerFunc is the handler for bls_to_execution_change events. +type BLSToExecutionChangeEventHandlerFunc func(context.Context, *capella.SignedBLSToExecutionChange) + +// ChainReorgEventHandlerFunc is the handler for chain_reorg events. +type ChainReorgEventHandlerFunc func(context.Context, *apiv1.ChainReorgEvent) + +// ContributionAndProofEventHandlerFunc is the handler for contribution_and_proof events. +type ContributionAndProofEventHandlerFunc func(context.Context, *altair.SignedContributionAndProof) + +// FinalizedCheckpointEventHandlerFunc is the handler for finalized_checkpoint events. +type FinalizedCheckpointEventHandlerFunc func(context.Context, *apiv1.FinalizedCheckpointEvent) + +// HeadEventHandlerFunc is the handler for head events. +type HeadEventHandlerFunc func(context.Context, *apiv1.HeadEvent) + +// PayloadAttributesEventHandlerFunc is the handler for payload_attributes events. +type PayloadAttributesEventHandlerFunc func(context.Context, *apiv1.PayloadAttributesEvent) + +// ProposerSlashingEventHandlerFunc is the handler for proposer_slashing events. +type ProposerSlashingEventHandlerFunc func(context.Context, *phase0.ProposerSlashing) + +// SingleAttestationEventHandlerFunc is the handler for single_attestation events. +type SingleAttestationEventHandlerFunc func(context.Context, *electra.SingleAttestation) + +// VoluntaryExitEventHandlerFunc is the handler for voluntary_exit events. +type VoluntaryExitEventHandlerFunc func(context.Context, *phase0.SignedVoluntaryExit) diff --git a/go.mod b/go.mod index c6ab51d3..da74c098 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/stretchr/testify v1.8.4 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 - golang.org/x/crypto v0.32.0 + golang.org/x/crypto v0.33.0 golang.org/x/sync v0.2.0 ) @@ -29,7 +29,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/dot v1.6.4 // indirect - github.com/fatih/color v1.18.0 // indirect + github.com/fatih/color v1.10.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -47,8 +47,8 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/Knetic/govaluate.v3 v3.0.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 6c28015e..1eb4f7cb 100644 --- a/go.sum +++ b/go.sum @@ -9,9 +9,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/dot v1.6.4 h1:cG9ycT67d9Yw22G+mAb4XiuUz6E6H1S0zePp/5Cwe/c= github.com/emicklei/dot v1.6.4/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= -github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= -github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -108,8 +107,8 @@ go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZE go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= @@ -124,15 +123,14 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= -golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= diff --git a/http/events.go b/http/events.go index 0e92333e..7e9f022e 100644 --- a/http/events.go +++ b/http/events.go @@ -1,4 +1,4 @@ -// Copyright © 2020 - 2024 Attestant Limited. +// Copyright © 2020 - 2025 Attestant Limited. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -24,8 +24,9 @@ import ( "strings" "time" - consensusclient "github.com/attestantio/go-eth2-client" - api "github.com/attestantio/go-eth2-client/api/v1" + client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/altair" "github.com/attestantio/go-eth2-client/spec/capella" @@ -36,39 +37,39 @@ import ( ) // Events feeds requested events with the given topics to the supplied handler. -func (s *Service) Events(ctx context.Context, topics []string, handler consensusclient.EventHandlerFunc) error { +func (s *Service) Events(ctx context.Context, opts *api.EventsOpts) error { if err := s.assertIsActive(ctx); err != nil { return err } - if len(topics) == 0 { - return errors.Join(errors.New("no topics supplied"), consensusclient.ErrInvalidOptions) + if opts == nil { + return client.ErrNoOptions + } + if len(opts.Topics) == 0 { + return errors.Join(errors.New("no topics supplied"), client.ErrInvalidOptions) } // #nosec G404 log := s.log.With().Str("id", fmt.Sprintf("%02x", rand.Int31())).Str("address", s.address).Logger() ctx = log.WithContext(ctx) - // Ensure we support the requested topic(s). - for i := range topics { - if _, exists := api.SupportedEventTopics[topics[i]]; !exists { - return fmt.Errorf("unsupported event topic %s", topics[i]) - } + if err := s.checkEventsOpts(opts); err != nil { + return err } endpoint := "/eth/v1/events" - query := "topics=" + strings.Join(topics, "&topics=") + query := "topics=" + strings.Join(opts.Topics, "&topics=") callURL := urlForCall(s.base, endpoint, query) log.Trace().Str("url", callURL.String()).Msg("GET request to events stream") - client := sse.NewClient(callURL.String()) + sseClient := sse.NewClient(callURL.String()) for k, v := range s.extraHeaders { - client.Headers[k] = v + sseClient.Headers[k] = v } - if _, exists := client.Headers["User-Agent"]; !exists { - client.Headers["User-Agent"] = defaultUserAgent + if _, exists := sseClient.Headers["User-Agent"]; !exists { + sseClient.Headers["User-Agent"] = defaultUserAgent } - client.Headers["Accept"] = "text/event-stream" - client.Connection.Transport = &http.Transport{ + sseClient.Headers["Accept"] = "text/event-stream" + sseClient.Connection.Transport = &http.Transport{ Dial: (&net.Dialer{ Timeout: 2 * time.Second, KeepAlive: 2 * time.Second, @@ -80,8 +81,8 @@ func (s *Service) Events(ctx context.Context, topics []string, handler consensus select { case <-time.After(time.Second): log.Trace().Msg("Connecting to events stream") - if err := client.SubscribeRawWithContext(ctx, func(msg *sse.Event) { - s.handleEvent(ctx, msg, handler) + if err := sseClient.SubscribeRawWithContext(ctx, func(msg *sse.Event) { + s.handleEvent(ctx, msg, opts) }); err != nil { log.Error().Err(err).Msg("Failed to subscribe to event stream") } @@ -97,160 +98,476 @@ func (s *Service) Events(ctx context.Context, topics []string, handler consensus return nil } -// handleEvent parses an event and passes it on to the handler. -// -//nolint:gocyclo -func (*Service) handleEvent(ctx context.Context, msg *sse.Event, handler consensusclient.EventHandlerFunc) { - log := zerolog.Ctx(ctx) +func (s *Service) checkEventsOpts(opts *api.EventsOpts) error { + // Ensure we support the requested topic(s), and have a handler for each. + for _, topic := range opts.Topics { + if _, exists := apiv1.SupportedEventTopics[topic]; !exists { + return fmt.Errorf("unsupported event topic %s", topic) + } + if opts.Handler != nil { + // There is a generic handler in place, no further checks for this topic required. + continue + } + if err := s.checkEventSpecificHandler(opts, topic); err != nil { + return err + } + } - if handler == nil { - log.Debug().Msg("No handler supplied; ignoring") + return nil +} - return +func (*Service) checkEventSpecificHandler(opts *api.EventsOpts, topic string) error { + var hasHandler bool + + switch topic { + case "attestation": + hasHandler = opts.AttestationHandler != nil + case "attester_slashing": + hasHandler = opts.AttesterSlashingHandler != nil + case "blob_sidecar": + hasHandler = opts.BlobSidecarHandler != nil + case "block": + hasHandler = opts.BlockHandler != nil + case "block_gossip": + hasHandler = opts.BlockGossipHandler != nil + case "bls_to_execution_change": + hasHandler = opts.BLSToExecutionChangeHandler != nil + case "chain_reorg": + hasHandler = opts.ChainReorgHandler != nil + case "contribution_and_proof": + hasHandler = opts.ContributionAndProofHandler != nil + case "finalized_checkpoint": + hasHandler = opts.FinalizedCheckpointHandler != nil + case "head": + hasHandler = opts.HeadHandler != nil + case "payload_attributes": + hasHandler = opts.PayloadAttributesHandler != nil + case "proposer_slashing": + hasHandler = opts.ProposerSlashingHandler != nil + case "single_attestation": + hasHandler = opts.SingleAttestationHandler != nil + case "voluntary_exit": + hasHandler = opts.VoluntaryExitHandler != nil + default: + return fmt.Errorf("unsupported event %s", topic) + } + + if !hasHandler { + return fmt.Errorf("no handler for %s event", topic) } + + return nil +} + +// handleEvent handles all events. +func (s *Service) handleEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + if msg == nil { log.Debug().Msg("No message supplied; ignoring") return } - event := &api.Event{ - Topic: string(msg.Event), - } switch string(msg.Event) { case "attestation": - data := &spec.VersionedAttestation{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse attestation") - - return - } - event.Data = data + s.handleAttestationEvent(ctx, msg, opts) case "attester_slashing": - data := &phase0.AttesterSlashing{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse attester slashing event") - - return - } - event.Data = data + s.handleAttesterSlashingEvent(ctx, msg, opts) case "blob_sidecar": - data := &api.BlobSidecarEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse blob sidecar event") - - return - } - event.Data = data + s.handleBlobSidecarEvent(ctx, msg, opts) case "block": - data := &api.BlockEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse block event") - - return - } - event.Data = data + s.handleBlockEvent(ctx, msg, opts) case "block_gossip": - data := &api.BlockGossipEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse block gossip event") - - return - } - event.Data = data + s.handleBlockGossipEvent(ctx, msg, opts) case "bls_to_execution_change": - data := &capella.SignedBLSToExecutionChange{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse bls to execution change event") - - return - } - event.Data = data + s.handleBLSToExecutionChangeEvent(ctx, msg, opts) case "chain_reorg": - data := &api.ChainReorgEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse chain reorg event") - - return - } - event.Data = data + s.handleChainReorgEvent(ctx, msg, opts) case "contribution_and_proof": - data := &altair.SignedContributionAndProof{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse contribution and proof event") - - return - } - event.Data = data + s.handleContributionAndProofEvent(ctx, msg, opts) case "finalized_checkpoint": - data := &api.FinalizedCheckpointEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse finalized checkpoint event") - - return - } - event.Data = data + s.handleFinalizedCheckpointEvent(ctx, msg, opts) case "head": - data := &api.HeadEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse head event") - - return - } - event.Data = data + s.handleHeadEvent(ctx, msg, opts) case "payload_attributes": - data := &api.PayloadAttributesEvent{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse payload attributes event") - - return - } - event.Data = data + s.handlePayloadAttributesEvent(ctx, msg, opts) case "proposer_slashing": - data := &phase0.ProposerSlashing{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse proposer slashing event") - - return - } - event.Data = data + s.handleProposerSlashingEvent(ctx, msg, opts) case "single_attestation": - data := &electra.SingleAttestation{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse single attestation event") - - return - } - event.Data = data + s.handleSingleAttestationEvent(ctx, msg, opts) case "voluntary_exit": - data := &phase0.SignedVoluntaryExit{} - err := json.Unmarshal(msg.Data, data) - if err != nil { - log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse voluntary exit") - - return - } - event.Data = data + s.handleVoluntaryExitEvent(ctx, msg, opts) case "": // Used as keepalive. Ignore. - return default: log.Warn().Str("topic", string(msg.Event)).Msg("Received message with unhandled topic; ignoring") + } +} + +func (*Service) handleAttestationEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &spec.VersionedAttestation{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse attestation") + + return + } + + switch { + case opts.AttestationHandler != nil: + opts.AttestationHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleAttesterSlashingEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &electra.AttesterSlashing{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse attester slashing event") + + return + } + + switch { + case opts.AttesterSlashingHandler != nil: + opts.AttesterSlashingHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleBlobSidecarEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.BlobSidecarEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse blob sidecar event") + + return + } + + switch { + case opts.BlobSidecarHandler != nil: + opts.BlobSidecarHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleBlockEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.BlockEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse block event") + + return + } + + switch { + case opts.BlockHandler != nil: + opts.BlockHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleBlockGossipEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.BlockGossipEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse block gossip event") + + return + } + + switch { + case opts.BlockGossipHandler != nil: + opts.BlockGossipHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleBLSToExecutionChangeEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &capella.SignedBLSToExecutionChange{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse bls to execution change event") + + return + } + + switch { + case opts.BLSToExecutionChangeHandler != nil: + opts.BLSToExecutionChangeHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleChainReorgEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.ChainReorgEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse chain reorg event") + + return + } + + switch { + case opts.ChainReorgHandler != nil: + opts.ChainReorgHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleContributionAndProofEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &altair.SignedContributionAndProof{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse contribution and proof event") + + return + } + + switch { + case opts.ContributionAndProofHandler != nil: + opts.ContributionAndProofHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleFinalizedCheckpointEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.FinalizedCheckpointEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse finalized checkpoint event") return } - handler(event) + + switch { + case opts.FinalizedCheckpointHandler != nil: + opts.FinalizedCheckpointHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleHeadEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.HeadEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse head event") + + return + } + + switch { + case opts.HeadHandler != nil: + opts.HeadHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handlePayloadAttributesEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &apiv1.PayloadAttributesEvent{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse payload attributes event") + + return + } + + switch { + case opts.PayloadAttributesHandler != nil: + opts.PayloadAttributesHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleProposerSlashingEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &phase0.ProposerSlashing{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse proposer slashing event") + + return + } + + switch { + case opts.ProposerSlashingHandler != nil: + opts.ProposerSlashingHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleSingleAttestationEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &electra.SingleAttestation{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse single attestation") + + return + } + + switch { + case opts.SingleAttestationHandler != nil: + opts.SingleAttestationHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } +} + +func (*Service) handleVoluntaryExitEvent(ctx context.Context, + msg *sse.Event, + opts *api.EventsOpts, +) { + log := zerolog.Ctx(ctx) + data := &phase0.SignedVoluntaryExit{} + err := json.Unmarshal(msg.Data, data) + if err != nil { + log.Error().Err(err).RawJSON("data", msg.Data).Msg("Failed to parse voluntary exit") + + return + } + + switch { + case opts.VoluntaryExitHandler != nil: + opts.VoluntaryExitHandler(ctx, data) + case opts.Handler != nil: + opts.Handler(&apiv1.Event{ + Topic: string(msg.Event), + Data: data, + }) + default: + log.Debug().Msg("No specific or generic handler supplied; ignoring") + } } diff --git a/http/events_internal_test.go b/http/events_internal_test.go index f00e7c95..43091fd2 100644 --- a/http/events_internal_test.go +++ b/http/events_internal_test.go @@ -20,8 +20,8 @@ import ( "testing" "time" - client "github.com/attestantio/go-eth2-client" - api "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/r3labs/sse/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -35,14 +35,14 @@ func TestEventHandler(t *testing.T) { defer cancel() handled := false - handler := func(*api.Event) { + handler := func(*apiv1.Event) { handled = true } tests := []struct { name string message *sse.Event - handler client.EventHandlerFunc + handler api.EventHandlerFunc handled bool }{ { @@ -177,7 +177,9 @@ func TestEventHandler(t *testing.T) { handled = false log := zerolog.New(&bytes.Buffer{}) ctx = log.WithContext(ctx) - h.handleEvent(ctx, test.message, test.handler) + h.handleEvent(ctx, test.message, &api.EventsOpts{ + Handler: test.handler, + }) require.Equal(t, test.handled, handled) }) } diff --git a/http/events_test.go b/http/events_test.go index a2eb0967..47ef5c7e 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -21,7 +21,8 @@ import ( "time" client "github.com/attestantio/go-eth2-client" - api "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/http" "github.com/stretchr/testify/require" ) @@ -51,10 +52,13 @@ func TestEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) eventsMu := sync.Mutex{} events := 0 - err := service.(client.EventsProvider).Events(ctx, test.topics, func(event *api.Event) { - eventsMu.Lock() - events++ - eventsMu.Unlock() + err := service.(client.EventsProvider).Events(ctx, &api.EventsOpts{ + Topics: test.topics, + Handler: func(*apiv1.Event) { + eventsMu.Lock() + events++ + eventsMu.Unlock() + }, }) require.NoError(t, err) time.Sleep(30 * time.Second) diff --git a/mock/events.go b/mock/events.go index 8811f284..c00c58d2 100644 --- a/mock/events.go +++ b/mock/events.go @@ -16,13 +16,13 @@ package mock import ( "context" - client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/api" ) // Events feeds requested events with the given topics to the supplied handler. -func (s *Service) Events(ctx context.Context, topics []string, handler client.EventHandlerFunc) error { +func (s *Service) Events(ctx context.Context, opts *api.EventsOpts) error { if s.EventsFunc != nil { - return s.EventsFunc(ctx, topics, handler) + return s.EventsFunc(ctx, opts) } return nil diff --git a/mock/service.go b/mock/service.go index 90339221..6b3c8d66 100644 --- a/mock/service.go +++ b/mock/service.go @@ -17,7 +17,6 @@ import ( "context" "time" - client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" @@ -61,7 +60,7 @@ type Service struct { BeaconStateRootFunc func(context.Context, *api.BeaconStateRootOpts) (*api.Response[*phase0.Root], error) BlockRewardsFunc func(context.Context, *api.BlockRewardsOpts) (*api.Response[*apiv1.BlockRewards], error) DepositContractFunc func(context.Context, *api.DepositContractOpts) (*api.Response[*apiv1.DepositContract], error) - EventsFunc func(context.Context, []string, client.EventHandlerFunc) error + EventsFunc func(context.Context, *api.EventsOpts) error FinalityFunc func(context.Context, *api.FinalityOpts) (*api.Response[*apiv1.Finality], error) ForkChoiceFunc func(context.Context, *api.ForkChoiceOpts) (*api.Response[*apiv1.ForkChoice], error) ForkFunc func(context.Context, *api.ForkOpts) (*api.Response[*phase0.Fork], error) diff --git a/multi/events.go b/multi/events.go index 72352402..efeb83eb 100644 --- a/multi/events.go +++ b/multi/events.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Attestant Limited. +// Copyright © 2021, 2025 Attestant Limited. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -22,14 +22,22 @@ import ( consensusclient "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/electra" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/rs/zerolog" ) // Events feeds requested events with the given topics to the supplied handler. func (s *Service) Events(ctx context.Context, - topics []string, - handler consensusclient.EventHandlerFunc, + opts *api.EventsOpts, ) error { + if opts == nil { + return consensusclient.ErrNoOptions + } + // #nosec G404 log := s.log.With().Str("id", fmt.Sprintf("%02x", rand.Int31())).Logger() @@ -48,14 +56,30 @@ func (s *Service) Events(ctx context.Context, s: s, log: log.With().Logger(), address: client.Address(), - handler: handler, + opts: &api.EventsOpts{ + Common: opts.Common, + }, } - if err := client.(consensusclient.EventsProvider).Events(ctx, topics, ah.handleEvent); err != nil { + ah.opts.Handler = ah.genericHandler + ah.opts.AttestationHandler = ah.attestationHandler + ah.opts.AttesterSlashingHandler = ah.attesterSlashingHandler + ah.opts.BlobSidecarHandler = ah.blobSidecarHandler + ah.opts.BLSToExecutionChangeHandler = ah.blsToExecutionChangeHandler + ah.opts.ChainReorgHandler = ah.chainReorgHandler + ah.opts.ContributionAndProofHandler = ah.contributionAndProofHandler + ah.opts.FinalizedCheckpointHandler = ah.finalizedCheckpointHandler + ah.opts.HeadHandler = ah.headHandler + ah.opts.PayloadAttributesHandler = ah.payloadAttributesHandler + ah.opts.ProposerSlashingHandler = ah.proposerSlashingHandler + ah.opts.SingleAttestationHandler = ah.singleAttestationHandler + ah.opts.VoluntaryExitHandler = ah.voluntaryExitHandler + + if err := client.(consensusclient.EventsProvider).Events(ctx, ah.opts); err != nil { inactiveClients = append(inactiveClients, client) continue } - log.Trace().Str("address", ah.address).Strs("topics", topics).Msg("Events handler active") + log.Trace().Str("address", ah.address).Strs("topics", opts.Topics).Msg("Events handler active") } // Periodically try all inactive clients, quitting as they become active. @@ -64,7 +88,7 @@ func (s *Service) Events(ctx context.Context, s: s, log: log.With().Logger(), address: inactiveClient.Address(), - handler: handler, + opts: opts, } go func(c consensusclient.Service, ah *activeHandler) { for { @@ -72,7 +96,7 @@ func (s *Service) Events(ctx context.Context, if !isProvider { ah.log.Error(). Str("address", ah.address). - Strs("topics", topics). + Strs("topics", opts.Topics). Msg("Not a node syncing provider") return @@ -81,7 +105,7 @@ func (s *Service) Events(ctx context.Context, if err != nil { ah.log.Error(). Str("address", ah.address). - Strs("topics", topics). + Strs("topics", opts.Topics). Err(err). Msg("Failed to obtain sync state from node") @@ -89,10 +113,10 @@ func (s *Service) Events(ctx context.Context, } if !syncResponse.Data.IsSyncing { // Client is now synced, set up the events call. - if err := c.(consensusclient.EventsProvider).Events(ctx, topics, ah.handleEvent); err != nil { + if err := c.(consensusclient.EventsProvider).Events(ctx, opts); err != nil { ah.log.Error(). Str("address", ah.address). - Strs("topics", topics). + Strs("topics", opts.Topics). Err(err). Msg("Failed to set up events handler") } @@ -112,19 +136,215 @@ type activeHandler struct { s *Service log zerolog.Logger address string - handler consensusclient.EventHandlerFunc + opts *api.EventsOpts +} + +func (h *activeHandler) attestationHandler(ctx context.Context, data *spec.VersionedAttestation) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Attestation event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.AttestationHandler(ctx, data) +} + +func (h *activeHandler) attesterSlashingHandler(ctx context.Context, data *electra.AttesterSlashing) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Attester slashing event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.AttesterSlashingHandler(ctx, data) +} + +func (h *activeHandler) blobSidecarHandler(ctx context.Context, data *apiv1.BlobSidecarEvent) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Blob sidecar event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.BlobSidecarHandler(ctx, data) } -func (h *activeHandler) handleEvent(event *apiv1.Event) { - h.log.Trace().Str("address", h.address).Str("topic", event.Topic).Msg("Event received") +func (h *activeHandler) blsToExecutionChangeHandler(ctx context.Context, data *capella.SignedBLSToExecutionChange) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("BLS to execution change event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.BLSToExecutionChangeHandler(ctx, data) +} + +func (h *activeHandler) chainReorgHandler(ctx context.Context, data *apiv1.ChainReorgEvent) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Chain reorg event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.ChainReorgHandler(ctx, data) +} + +func (h *activeHandler) contributionAndProofHandler(ctx context.Context, data *altair.SignedContributionAndProof) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Chain reorg event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.ContributionAndProofHandler(ctx, data) +} + +func (h *activeHandler) finalizedCheckpointHandler(ctx context.Context, data *apiv1.FinalizedCheckpointEvent) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Finalized checkpoint event received") + // We only forward events from the currently active provider. If we did not do this then we could end up with // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head // block end up with an earlier block. - if h.s.Address() == h.address { - h.log.Trace(). - Str("address", h.address). - Str("topic", event.Topic). - Msg("Forwarding due to primary active address") - h.handler(event) + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.FinalizedCheckpointHandler(ctx, data) +} + +func (h *activeHandler) headHandler(ctx context.Context, data *apiv1.HeadEvent) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Head event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.HeadHandler(ctx, data) +} + +func (h *activeHandler) payloadAttributesHandler(ctx context.Context, data *apiv1.PayloadAttributesEvent) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Payload attributes event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.PayloadAttributesHandler(ctx, data) +} + +func (h *activeHandler) proposerSlashingHandler(ctx context.Context, data *phase0.ProposerSlashing) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Proposer slashing event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.ProposerSlashingHandler(ctx, data) +} + +func (h *activeHandler) singleAttestationHandler(ctx context.Context, data *electra.SingleAttestation) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Single attestation event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.SingleAttestationHandler(ctx, data) +} + +func (h *activeHandler) voluntaryExitHandler(ctx context.Context, data *phase0.SignedVoluntaryExit) { + log := h.log.With().Str("address", h.address).Logger() + log.Trace().Msg("Voluntary exit event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + h.opts.VoluntaryExitHandler(ctx, data) +} + +func (h *activeHandler) genericHandler(event *apiv1.Event) { + log := h.log.With().Str("address", h.address).Str("topic", event.Topic).Logger() + log.Trace().Msg("Event received") + + // We only forward events from the currently active provider. If we did not do this then we could end up with + // inconsistent results, for example a client may receive a `head` event and a subsequent call to fetch the head + // block end up with an earlier block. + if h.s.Address() != h.address { + return + } + + log.Trace().Msg("Forwarding due to primary active address") + + if h.opts.Handler != nil { + h.opts.Handler(event) } } diff --git a/multi/events_test.go b/multi/events_test.go index 4a0f7f41..ab3f03b6 100644 --- a/multi/events_test.go +++ b/multi/events_test.go @@ -18,6 +18,7 @@ import ( "testing" consensusclient "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/api" "github.com/attestantio/go-eth2-client/mock" "github.com/attestantio/go-eth2-client/multi" "github.com/attestantio/go-eth2-client/testclients" @@ -49,5 +50,7 @@ func TestEvents(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, multiClient.(consensusclient.EventsProvider).Events(ctx, []string{}, nil)) + require.NoError(t, multiClient.(consensusclient.EventsProvider).Events(ctx, &api.EventsOpts{ + Topics: []string{"block"}, + })) } diff --git a/service.go b/service.go index 6bcf6759..79b048ea 100644 --- a/service.go +++ b/service.go @@ -148,9 +148,6 @@ type SyncCommitteesProvider interface { ) } -// EventHandlerFunc is the handler for events. -type EventHandlerFunc func(*apiv1.Event) - // // Standard API // @@ -425,7 +422,7 @@ type ValidatorRegistrationsSubmitter interface { // EventsProvider is the interface for providing events. type EventsProvider interface { // Events feeds requested events with the given topics to the supplied handler. - Events(ctx context.Context, topics []string, handler EventHandlerFunc) error + Events(ctx context.Context, opts *api.EventsOpts) error } // FinalityProvider is the interface for providing finality information. diff --git a/testclients/erroring.go b/testclients/erroring.go index bb701632..d2bf3e34 100644 --- a/testclients/erroring.go +++ b/testclients/erroring.go @@ -503,7 +503,7 @@ func (s *Erroring) BeaconState(ctx context.Context, } // Events feeds requested events with the given topics to the supplied handler. -func (s *Erroring) Events(ctx context.Context, topics []string, handler consensusclient.EventHandlerFunc) error { +func (s *Erroring) Events(ctx context.Context, opts *api.EventsOpts) error { if err := s.maybeError(ctx); err != nil { return err } @@ -512,7 +512,7 @@ func (s *Erroring) Events(ctx context.Context, topics []string, handler consensu return fmt.Errorf("%s@%s does not support this call", s.next.Name(), s.next.Address()) } - return next.Events(ctx, topics, handler) + return next.Events(ctx, opts) } // Finality provides the finality given a state ID. diff --git a/testclients/sleepy.go b/testclients/sleepy.go index 2ff0917d..ae3563c9 100644 --- a/testclients/sleepy.go +++ b/testclients/sleepy.go @@ -378,14 +378,14 @@ func (s *Sleepy) BeaconState(ctx context.Context, } // Events feeds requested events with the given topics to the supplied handler. -func (s *Sleepy) Events(ctx context.Context, topics []string, handler consensusclient.EventHandlerFunc) error { +func (s *Sleepy) Events(ctx context.Context, opts *api.EventsOpts) error { s.sleep(ctx) next, isNext := s.next.(consensusclient.EventsProvider) if !isNext { return errors.New("next does not support this call") } - return next.Events(ctx, topics, handler) + return next.Events(ctx, opts) } // Finality provides the finality given a state ID.