diff --git a/Cargo.lock b/Cargo.lock index c24e1570..c47323f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2986,6 +2986,7 @@ dependencies = [ "grandine_version", "hex-literal", "http_api", + "http_api_utils", "itertools 0.13.0", "keymanager", "log", @@ -3502,7 +3503,6 @@ dependencies = [ "ssz", "static_assertions", "std_ext", - "strum", "tap", "test-case", "test-generator", @@ -3524,7 +3524,10 @@ version = "0.0.0" dependencies = [ "anyhow", "axum", + "execution_engine", "features", + "fork_choice_store", + "helper_functions", "hex-literal", "http-body-util", "itertools 0.13.0", @@ -3532,8 +3535,14 @@ dependencies = [ "mime", "parse-display", "prometheus_metrics", + "serde", + "serde_utils", + "serde_with", + "strum", + "tap", "test-case", "thiserror", + "tokio", "tower", "tower-http", "tracing", @@ -5445,6 +5454,7 @@ dependencies = [ "futures", "good_lp", "helper_functions", + "http_api_utils", "itertools 0.13.0", "log", "prometheus_metrics", @@ -6535,6 +6545,7 @@ dependencies = [ "genesis", "grandine_version", "http_api", + "http_api_utils", "keymanager", "liveness_tracker", "log", @@ -8342,7 +8353,6 @@ dependencies = [ "doppelganger_protection", "eth1_api", "eth2_libp2p", - "execution_engine", "factory", "features", "fork_choice_control", diff --git a/execution_engine/src/lib.rs b/execution_engine/src/lib.rs index 10e1460b..02895d2d 100644 --- a/execution_engine/src/lib.rs +++ b/execution_engine/src/lib.rs @@ -3,10 +3,9 @@ pub use crate::{ types::{ EngineGetPayloadV1Response, EngineGetPayloadV2Response, EngineGetPayloadV3Response, EngineGetPayloadV4Response, ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, - ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadAttributesEvent, - PayloadAttributesEventData, PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3, - PayloadId, PayloadStatus, PayloadStatusV1, PayloadStatusWithBlockHash, - PayloadValidationStatus, RawExecutionRequests, + ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadAttributesV1, + PayloadAttributesV2, PayloadAttributesV3, PayloadId, PayloadStatus, PayloadStatusV1, + PayloadStatusWithBlockHash, PayloadValidationStatus, RawExecutionRequests, WithdrawalV1, }, }; diff --git a/execution_engine/src/types.rs b/execution_engine/src/types.rs index 16ec0d8a..0116f8d9 100644 --- a/execution_engine/src/types.rs +++ b/execution_engine/src/types.rs @@ -22,7 +22,7 @@ use types::{ }, nonstandard::{Phase, WithBlobsAndMev}, phase0::primitives::{ - ExecutionAddress, ExecutionBlockHash, ExecutionBlockNumber, Gwei, Slot, UnixSeconds, + ExecutionAddress, ExecutionBlockHash, ExecutionBlockNumber, Gwei, UnixSeconds, ValidatorIndex, H256, }, preset::Preset, @@ -413,162 +413,6 @@ pub struct PayloadAttributesV3 { pub parent_beacon_block_root: H256, } -#[derive(Debug, Serialize)] -pub struct PayloadAttributesEvent { - pub version: Phase, - pub data: PayloadAttributesEventData, -} - -#[derive(Debug, Serialize)] -pub struct PayloadAttributesEventData { - #[serde(with = "serde_utils::string_or_native")] - pub proposal_slot: Slot, - pub parent_block_root: H256, - #[serde(with = "serde_utils::string_or_native")] - pub parent_block_number: ExecutionBlockNumber, - pub parent_block_hash: ExecutionBlockHash, - #[serde(with = "serde_utils::string_or_native")] - pub proposer_index: ValidatorIndex, - pub payload_attributes: CombinedPayloadAttributesEventData, -} - -#[derive(Debug, Serialize)] -#[serde(untagged, bound = "")] -pub enum CombinedPayloadAttributesEventData { - Bellatrix(PayloadAttributesEventDataV1), - Capella(PayloadAttributesEventDataV2), - Deneb(PayloadAttributesEventDataV3), - Electra(PayloadAttributesEventDataV3), -} - -#[derive(Debug, Serialize)] -pub struct PayloadAttributesEventDataV1 { - #[serde(with = "serde_utils::string_or_native")] - pub timestamp: UnixSeconds, - pub prev_randao: H256, - pub suggested_fee_recipient: ExecutionAddress, -} - -#[derive(Debug, Serialize)] -pub struct PayloadAttributesEventDataV2 { - #[serde(with = "serde_utils::string_or_native")] - pub timestamp: UnixSeconds, - pub prev_randao: H256, - pub suggested_fee_recipient: ExecutionAddress, - pub withdrawals: Vec, -} - -#[derive(Debug, Serialize)] -pub struct PayloadAttributesEventDataV3 { - #[serde(with = "serde_utils::string_or_native")] - pub timestamp: UnixSeconds, - pub prev_randao: H256, - pub suggested_fee_recipient: ExecutionAddress, - pub withdrawals: Vec, - pub parent_beacon_block_root: H256, -} - -#[derive(Debug, Serialize)] -pub struct WithdrawalEventDataV1 { - #[serde(with = "serde_utils::string_or_native")] - pub index: WithdrawalIndex, - #[serde(with = "serde_utils::string_or_native")] - pub validator_index: ValidatorIndex, - pub address: ExecutionAddress, - #[serde(with = "serde_utils::string_or_native")] - pub amount: Gwei, -} - -impl From for WithdrawalEventDataV1 { - fn from(withdrawal: WithdrawalV1) -> Self { - let WithdrawalV1 { - index, - validator_index, - address, - amount, - } = withdrawal; - - Self { - index, - validator_index, - address, - amount, - } - } -} - -impl From for PayloadAttributesEventDataV1 { - fn from(payload_attributes: PayloadAttributesV1) -> Self { - let PayloadAttributesV1 { - timestamp, - prev_randao, - suggested_fee_recipient, - } = payload_attributes; - - Self { - timestamp, - prev_randao, - suggested_fee_recipient, - } - } -} - -impl From> for PayloadAttributesEventDataV2 { - fn from(payload_attributes: PayloadAttributesV2

) -> Self { - let PayloadAttributesV2 { - timestamp, - prev_randao, - suggested_fee_recipient, - withdrawals, - } = payload_attributes; - - Self { - timestamp, - prev_randao, - suggested_fee_recipient, - withdrawals: withdrawals.into_iter().map(Into::into).collect::>(), - } - } -} - -impl From> for PayloadAttributesEventDataV3 { - fn from(payload_attributes: PayloadAttributesV3

) -> Self { - let PayloadAttributesV3 { - timestamp, - prev_randao, - suggested_fee_recipient, - withdrawals, - parent_beacon_block_root, - } = payload_attributes; - - Self { - timestamp, - prev_randao, - suggested_fee_recipient, - withdrawals: withdrawals.into_iter().map(Into::into).collect::>(), - parent_beacon_block_root, - } - } -} -impl From> for CombinedPayloadAttributesEventData { - fn from(payload_attributes: PayloadAttributes

) -> Self { - match payload_attributes { - PayloadAttributes::Bellatrix(payload_attributes_v1) => { - Self::Bellatrix(payload_attributes_v1.into()) - } - PayloadAttributes::Capella(payload_attributes_v2) => { - Self::Capella(payload_attributes_v2.into()) - } - PayloadAttributes::Deneb(payload_attributes_v3) => { - Self::Deneb(payload_attributes_v3.into()) - } - PayloadAttributes::Electra(payload_attributes_v3) => { - Self::Electra(payload_attributes_v3.into()) - } - } - } -} - /// [`engine_getPayloadV1` response](https://github.com/ethereum/execution-apis/blob/b7c5d3420e00648f456744d121ffbd929862924d/src/engine/paris.md#response-2). pub type EngineGetPayloadV1Response

= ExecutionPayloadV1

; diff --git a/fork_choice_control/src/controller.rs b/fork_choice_control/src/controller.rs index 013f8fd1..d069eda4 100644 --- a/fork_choice_control/src/controller.rs +++ b/fork_choice_control/src/controller.rs @@ -26,6 +26,7 @@ use fork_choice_store::{ }; use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSender}; use genesis::AnchorCheckpointProvider; +use http_api_utils::EventChannels; use prometheus_metrics::Metrics; use std_ext::ArcExt as _; use thiserror::Error; @@ -44,8 +45,8 @@ use types::{ use crate::{ block_processor::BlockProcessor, messages::{ - ApiMessage, AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, - SubnetMessage, SyncMessage, ValidatorMessage, + AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage, + SyncMessage, ValidatorMessage, }, misc::{VerifyAggregateAndProofResult, VerifyAttestationResult}, mutator::Mutator, @@ -94,9 +95,9 @@ where anchor_block: Arc>, anchor_state: Arc>, tick: Tick, + event_channels: Arc, execution_engine: E, metrics: Option>, - api_tx: impl UnboundedSink>, attestation_verifier_tx: A, // impl UnboundedSink>, p2p_tx: impl UnboundedSink>, pool_tx: impl UnboundedSink, @@ -129,13 +130,13 @@ where store_snapshot.clone_arc(), state_cache.clone_arc(), block_processor.clone_arc(), + event_channels, execution_engine.clone(), storage.clone_arc(), thread_pool.clone(), metrics.clone(), mutator_tx.clone(), mutator_rx, - api_tx, attestation_verifier_tx.clone(), p2p_tx, pool_tx, diff --git a/fork_choice_control/src/lib.rs b/fork_choice_control/src/lib.rs index c452b99e..508a601b 100644 --- a/fork_choice_control/src/lib.rs +++ b/fork_choice_control/src/lib.rs @@ -18,8 +18,7 @@ pub use crate::{ controller::Controller, messages::{ - ApiMessage, AttestationVerifierMessage, BlockEvent, ChainReorgEvent, - FinalizedCheckpointEvent, HeadEvent, P2pMessage, PoolMessage, SubnetMessage, SyncMessage, + AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage, ValidatorMessage, }, misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult}, diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index 69b78bcb..59822e81 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -15,29 +15,22 @@ use execution_engine::PayloadStatusV1; use fork_choice_store::{ AggregateAndProofOrigin, AttestationAction, AttestationItem, AttestationValidationError, AttesterSlashingOrigin, BlobSidecarAction, BlobSidecarOrigin, BlockAction, BlockOrigin, - ChainLink, Store, + ChainLink, }; -use helper_functions::{accessors, misc}; use log::debug; use serde::Serialize; -use tap::Pipe as _; use types::{ combined::{Attestation, BeaconState, SignedAggregateAndProof, SignedBeaconBlock}, - deneb::{ - containers::{BlobIdentifier, BlobSidecar}, - primitives::{BlobIndex, KzgCommitment, VersionedHash}, - }, + deneb::containers::BlobIdentifier, phase0::{ containers::Checkpoint, - primitives::{DepositIndex, Epoch, ExecutionBlockHash, Slot, ValidatorIndex, H256}, + primitives::{DepositIndex, ExecutionBlockHash, Slot, ValidatorIndex, H256}, }, preset::Preset, - traits::SignedBeaconBlock as _, }; use crate::{ misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult}, - storage::Storage, unbounded_sink::UnboundedSink, }; @@ -218,24 +211,6 @@ impl ValidatorMessage { } } -#[derive(Debug)] -pub enum ApiMessage { - AttestationEvent(Arc>), - BlobSidecarEvent(BlobSidecarEvent), - BlockEvent(BlockEvent), - ChainReorgEvent(ChainReorgEvent), - FinalizedCheckpoint(FinalizedCheckpointEvent), - Head(HeadEvent), -} - -impl ApiMessage

{ - pub(crate) fn send(self, tx: &impl UnboundedSink) { - if let Err(message) = tx.unbounded_send(self) { - debug!("send to HTTP API failed because the receiver was dropped: {message:?}"); - } - } -} - pub enum SubnetMessage { Slot(W, Slot), } @@ -263,135 +238,3 @@ impl SyncMessage

{ } } } - -#[derive(Debug, Serialize)] -pub struct BlobSidecarEvent { - pub block_root: H256, - #[serde(with = "serde_utils::string_or_native")] - pub index: BlobIndex, - #[serde(with = "serde_utils::string_or_native")] - pub slot: Slot, - pub kzg_commitment: KzgCommitment, - pub versioned_hash: VersionedHash, -} - -impl BlobSidecarEvent { - pub fn new(block_root: H256, blob_sidecar: &BlobSidecar

) -> Self { - let kzg_commitment = blob_sidecar.kzg_commitment; - - Self { - block_root, - index: blob_sidecar.index, - slot: blob_sidecar.slot(), - kzg_commitment, - versioned_hash: misc::kzg_commitment_to_versioned_hash(kzg_commitment), - } - } -} - -#[derive(Debug, Serialize)] -pub struct BlockEvent { - #[serde(with = "serde_utils::string_or_native")] - pub slot: Slot, - pub block: H256, - pub execution_optimistic: bool, -} - -#[derive(Debug, Serialize)] -pub struct ChainReorgEvent { - #[serde(with = "serde_utils::string_or_native")] - pub slot: Slot, - #[serde(with = "serde_utils::string_or_native")] - pub depth: u64, - pub old_head_block: H256, - pub new_head_block: H256, - pub old_head_state: H256, - pub new_head_state: H256, - #[serde(with = "serde_utils::string_or_native")] - pub epoch: Epoch, - pub execution_optimistic: bool, -} - -impl ChainReorgEvent { - // The [Eth Beacon Node API specification] does not make it clear how `slot`, `depth`, and - // `epoch` should be computed. We try to match the behavior of Lighthouse. - // - // [Eth Beacon Node API specification]: https://ethereum.github.io/beacon-APIs/ - #[must_use] - pub fn new(store: &Store

, old_head: &ChainLink

) -> Self { - let new_head = store.head(); - let old_slot = old_head.slot(); - let new_slot = new_head.slot(); - - let depth = store - .common_ancestor(old_head.block_root, new_head.block_root) - .map(ChainLink::slot) - .unwrap_or_else(|| { - // A reorganization may be triggered by an alternate chain being finalized. - // The old block will no longer be present in `store` if that happens. - // Default to the old finalized slot like Lighthouse does. - // A proper solution may require significant changes to `Mutator`. - old_head - .finalized_checkpoint - .epoch - .pipe(misc::compute_start_slot_at_epoch::

) - }) - .abs_diff(old_slot); - - Self { - slot: new_slot, - depth, - old_head_block: old_head.block_root, - new_head_block: new_head.block_root, - old_head_state: old_head.block.message().state_root(), - new_head_state: new_head.block.message().state_root(), - epoch: misc::compute_epoch_at_slot::

(new_slot), - execution_optimistic: new_head.is_optimistic(), - } - } -} - -#[derive(Debug, Serialize)] -pub struct FinalizedCheckpointEvent { - pub block: H256, - pub state: H256, - #[serde(with = "serde_utils::string_or_native")] - pub epoch: Epoch, - pub execution_optimistic: bool, -} - -#[derive(Debug, Serialize)] -pub struct HeadEvent { - #[serde(with = "serde_utils::string_or_native")] - pub slot: Slot, - pub block: H256, - pub state: H256, - pub epoch_transition: bool, - pub previous_duty_dependent_root: H256, - pub current_duty_dependent_root: H256, - pub execution_optimistic: bool, -} - -impl HeadEvent { - pub fn new( - storage: &Storage

, - store: &Store

, - head: &ChainLink

, - ) -> Result { - let slot = head.slot(); - let state = head.state(store); - let previous_epoch = accessors::get_previous_epoch(&state); - let current_epoch = accessors::get_current_epoch(&state); - let dependent_root = |epoch| storage.dependent_root(store, &state, epoch); - - Ok(Self { - slot, - block: head.block_root, - state: head.block.message().state_root(), - epoch_transition: misc::is_epoch_start::

(slot), - previous_duty_dependent_root: dependent_root(previous_epoch)?, - current_duty_dependent_root: dependent_root(current_epoch)?, - execution_optimistic: head.is_optimistic(), - }) - } -} diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index b876d6f5..6c2ae108 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -38,6 +38,7 @@ use fork_choice_store::{ }; use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSender}; use helper_functions::{accessors, misc, predicates, verifier::NullVerifier}; +use http_api_utils::{DependentRootsBundle, EventChannels}; use itertools::{Either, Itertools as _}; use log::{debug, error, info, warn}; use num_traits::identities::Zero as _; @@ -59,8 +60,8 @@ use types::{ use crate::{ block_processor::BlockProcessor, messages::{ - AttestationVerifierMessage, BlobSidecarEvent, MutatorMessage, P2pMessage, PoolMessage, - SubnetMessage, SyncMessage, ValidatorMessage, + AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage, + SyncMessage, ValidatorMessage, }, misc::{ Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation, @@ -75,15 +76,15 @@ use crate::{ thread_pool::{Spawn, ThreadPool}, unbounded_sink::UnboundedSink, wait::Wait, - ApiMessage, BlockEvent, ChainReorgEvent, FinalizedCheckpointEvent, HeadEvent, }; #[expect(clippy::struct_field_names)] -pub struct Mutator { +pub struct Mutator { store: Arc>, store_snapshot: Arc>>, state_cache: Arc>, block_processor: Arc>, + event_channels: Arc, execution_engine: E, delayed_until_blobs: HashMap>, delayed_until_block: HashMap>, @@ -113,7 +114,6 @@ pub struct Mutator { metrics: Option>, mutator_tx: Sender>, mutator_rx: Receiver>, - api_tx: AS, attestation_verifier_tx: TS, p2p_tx: PS, pool_tx: LS, @@ -122,12 +122,11 @@ pub struct Mutator { validator_tx: VS, } -impl Mutator +impl Mutator where P: Preset, E: ExecutionEngine

+ Clone + Send + Sync + 'static, W: Wait, - AS: UnboundedSink>, TS: UnboundedSink>, PS: UnboundedSink>, LS: UnboundedSink, @@ -140,13 +139,13 @@ where store_snapshot: Arc>>, state_cache: Arc>, block_processor: Arc>, + event_channels: Arc, execution_engine: E, storage: Arc>, thread_pool: ThreadPool, metrics: Option>, mutator_tx: Sender>, mutator_rx: Receiver>, - api_tx: AS, attestation_verifier_tx: TS, p2p_tx: PS, pool_tx: LS, @@ -159,6 +158,7 @@ where store_snapshot, state_cache, block_processor, + event_channels, execution_engine, delayed_until_blobs: HashMap::new(), delayed_until_block: HashMap::new(), @@ -170,7 +170,6 @@ where metrics, mutator_tx, mutator_rx, - api_tx, attestation_verifier_tx, p2p_tx, pool_tx, @@ -815,7 +814,8 @@ where debug!("attestation accepted (attestation: {attestation:?})"); if attestation.origin.should_generate_event() { - ApiMessage::AttestationEvent(attestation.item.clone_arc()).send(&self.api_tx); + self.event_channels + .send_attestation_event(&attestation.item); } if attestation.origin.send_to_validator() { @@ -1291,12 +1291,11 @@ where .unfinalized_chain_link_by_execution_block_hash(execution_block_hash) { if chain_link.is_valid() { - ApiMessage::BlockEvent(BlockEvent { - slot: chain_link.slot(), - block: chain_link.block_root, - execution_optimistic: false, - }) - .send(&self.api_tx); + self.event_channels.send_block_event( + chain_link.slot(), + chain_link.block_root, + false, + ); } } @@ -1315,10 +1314,8 @@ where // Do not send API events about optimistic blocks. // Vouch treats all head events as non-optimistic. if (head_changed || head_was_optimistic) && head.is_valid() { - match HeadEvent::new(&self.storage, &self.store, head) { - Ok(event) => ApiMessage::Head(event).send(&self.api_tx), - Err(error) => warn!("{error:#}"), - } + self.event_channels + .send_head_event(head, |head| self.calculate_dependent_roots(head)); if !head_changed { // The call to `Store::notify_about_reorganization` below sends @@ -1459,12 +1456,8 @@ where // Do not send API events about optimistic blocks. // Vouch treats all head events as non-optimistic. if is_valid { - ApiMessage::BlockEvent(BlockEvent { - slot: block_slot, - block: block_root, - execution_optimistic: false, - }) - .send(&self.api_tx); + self.event_channels + .send_block_event(block_slot, block_root, false); } // TODO(Grandine Team): Performing the validation here results in the block being added to the @@ -1577,10 +1570,8 @@ where // Do not send API events about optimistic blocks. // Vouch treats all head events as non-optimistic. if new_head.is_valid() { - match HeadEvent::new(&self.storage, &self.store, &new_head) { - Ok(event) => ApiMessage::Head(event).send(&self.api_tx), - Err(error) => warn!("{error:#}"), - } + self.event_channels + .send_head_event(&new_head, |head| self.calculate_dependent_roots(head)); ValidatorMessage::Head(wait_group.clone(), new_head.clone()) .send(&self.validator_tx); @@ -1626,8 +1617,8 @@ where self.retry_block(wait_group.clone(), pending_block); } - ApiMessage::BlobSidecarEvent(BlobSidecarEvent::new(block_root, blob_sidecar)) - .send(&self.api_tx); + self.event_channels + .send_blob_sidecar_event(block_root, blob_sidecar); self.spawn(PersistBlobSidecarsTask { store_snapshot: self.owned_store(), @@ -1673,20 +1664,18 @@ where ) .send(&self.validator_tx); - ApiMessage::FinalizedCheckpoint(FinalizedCheckpointEvent { - block: head.block_root, - state: finalized_checkpoint.root, - epoch: finalized_checkpoint.epoch, - execution_optimistic: head.is_optimistic(), - }) - .send(&self.api_tx); + self.event_channels.send_finalized_checkpoint_event( + head.block_root, + finalized_checkpoint, + head.is_optimistic(), + ); } fn notify_about_reorganization(&self, wait_group: W, old_head: &ChainLink

) { let new_head = self.store.head().clone(); - let event = ChainReorgEvent::new(&self.store, old_head); - ApiMessage::ChainReorgEvent(event).send(&self.api_tx); + self.event_channels + .send_chain_reorg_event(&self.store, old_head); if let Some(metrics) = self.metrics.as_ref() { metrics.beacon_reorgs_total.inc(); @@ -1737,6 +1726,27 @@ where ); } + // This may even involve a DB lookup so it would be best + // if we can avoid making it if no event listeners are present + fn calculate_dependent_roots(&self, head: &ChainLink

) -> Result { + let state = head.state(&self.store); + let current_epoch = accessors::get_current_epoch(&state); + let previous_epoch = accessors::get_previous_epoch(&state); + + let current_duty_dependent_root = + self.storage + .dependent_root(&self.store, &state, current_epoch)?; + + let previous_duty_dependent_root = + self.storage + .dependent_root(&self.store, &state, previous_epoch)?; + + Ok(DependentRootsBundle { + current_duty_dependent_root, + previous_duty_dependent_root, + }) + } + fn delay_block_until_blobs(&mut self, beacon_block_root: H256, pending_block: PendingBlock

) { self.delayed_until_blobs .insert(beacon_block_root, pending_block); diff --git a/fork_choice_control/src/specialized.rs b/fork_choice_control/src/specialized.rs index a42dad0a..2b033e95 100644 --- a/fork_choice_control/src/specialized.rs +++ b/fork_choice_control/src/specialized.rs @@ -7,6 +7,7 @@ use database::Database; use execution_engine::{ExecutionEngine, NullExecutionEngine}; use fork_choice_store::StoreConfig; use futures::sink::Drain; +use http_api_utils::EventChannels; use prometheus_metrics::Metrics; use std_ext::ArcExt as _; use tap::Pipe as _; @@ -104,16 +105,18 @@ where false, )); + let event_channels = Arc::new(EventChannels::default()); + Self::new( chain_config, store_config, anchor_block, anchor_state, tick, + event_channels, execution_engine, metrics, futures::sink::drain(), - futures::sink::drain(), p2p_tx, futures::sink::drain(), futures::sink::drain(), diff --git a/grandine/Cargo.toml b/grandine/Cargo.toml index 2b4bbf12..65f90774 100644 --- a/grandine/Cargo.toml +++ b/grandine/Cargo.toml @@ -35,6 +35,7 @@ glob = { workspace = true } grandine_version = { workspace = true } hex-literal = { workspace = true } http_api = { workspace = true } +http_api_utils = { workspace = true } itertools = { workspace = true } libmdbx = { workspace = true } keymanager = { workspace = true } diff --git a/grandine/src/grandine_args.rs b/grandine/src/grandine_args.rs index 2e445eba..010ba76f 100644 --- a/grandine/src/grandine_args.rs +++ b/grandine/src/grandine_args.rs @@ -34,6 +34,7 @@ use fork_choice_control::DEFAULT_ARCHIVAL_EPOCH_INTERVAL; use fork_choice_store::{StoreConfig, DEFAULT_CACHE_LOCK_TIMEOUT_MILLIS}; use grandine_version::{APPLICATION_NAME, APPLICATION_NAME_AND_VERSION, APPLICATION_VERSION}; use http_api::HttpApiConfig; +use http_api_utils::DEFAULT_MAX_EVENTS; use itertools::{EitherOrBoth, Itertools as _}; use log::warn; use metrics::{MetricsServerConfig, MetricsServiceConfig}; @@ -200,10 +201,6 @@ struct HttpApiOptions { #[clap(long, value_delimiter = ',')] http_allowed_origins: Vec, - /// Max number of events stored in a single channel for HTTP API /events api call - #[clap(long, default_value_t = HttpApiConfig::default().max_events)] - max_events: usize, - /// HTTP API timeout in milliseconds #[clap(long, default_value_t = HttpApiOptions::default_timeout())] timeout: u64, @@ -215,7 +212,6 @@ impl From for HttpApiConfig { http_address, http_port, http_allowed_origins, - max_events, timeout, } = http_api_options; @@ -228,7 +224,6 @@ impl From for HttpApiConfig { Self { address, allow_origin: headers_to_allow_origin(http_allowed_origins).unwrap_or(allow_origin), - max_events, timeout: Some(Duration::from_millis(timeout)), } } @@ -254,6 +249,10 @@ struct BeaconNodeOptions { #[clap(long, default_value_t = ValidatorConfig::default().max_empty_slots)] max_empty_slots: u64, + /// Max number of events stored in a single channel for HTTP API /events api call + #[clap(long, default_value_t = DEFAULT_MAX_EVENTS)] + max_events: usize, + /// Beacon node API URL to load recent finalized checkpoint and sync from it /// [default: None] #[clap(long)] @@ -880,6 +879,7 @@ impl GrandineArgs { let BeaconNodeOptions { max_empty_slots, + max_events, checkpoint_sync_url, eth1_rpc_urls, force_checkpoint_sync, @@ -1264,6 +1264,7 @@ impl GrandineArgs { builder_config, web3signer_config, http_api_config, + max_events, metrics_config, track_liveness, detect_doppelgangers, diff --git a/grandine/src/grandine_config.rs b/grandine/src/grandine_config.rs index aaa2b9c8..42711d3e 100644 --- a/grandine/src/grandine_config.rs +++ b/grandine/src/grandine_config.rs @@ -58,6 +58,7 @@ pub struct GrandineConfig { pub builder_config: Option, pub web3signer_config: Web3SignerConfig, pub http_api_config: HttpApiConfig, + pub max_events: usize, pub metrics_config: MetricsConfig, pub track_liveness: bool, pub detect_doppelgangers: bool, diff --git a/grandine/src/main.rs b/grandine/src/main.rs index f74f3c39..d1c2ab1d 100644 --- a/grandine/src/main.rs +++ b/grandine/src/main.rs @@ -24,7 +24,7 @@ use logging::PEER_LOG_METRICS; use metrics::MetricsServerConfig; use p2p::{ListenAddr, NetworkConfig}; use reqwest::{Client, ClientBuilder}; -use runtime::{MetricsConfig, StorageConfig}; +use runtime::{MetricsConfig, RuntimeConfig, StorageConfig}; use signer::{KeyOrigin, Signer}; use slasher::SlasherConfig; use slashing_protection::{interchange_format::InterchangeData, SlashingProtector}; @@ -97,6 +97,7 @@ struct Context { state_slot: Option, eth1_auth: Arc, http_api_config: HttpApiConfig, + max_events: usize, metrics_config: MetricsConfig, track_liveness: bool, detect_doppelgangers: bool, @@ -179,6 +180,7 @@ impl Context { state_slot, eth1_auth, http_api_config, + max_events, metrics_config, track_liveness, detect_doppelgangers, @@ -280,6 +282,14 @@ impl Context { runtime::run_after_genesis( chain_config, + RuntimeConfig { + back_sync_enabled, + detect_doppelgangers, + max_events, + slashing_protection_history_limit, + track_liveness, + validator_enabled, + }, store_config, validator_api_config, validator_config, @@ -293,14 +303,9 @@ impl Context { signer, slasher_config, http_api_config, - back_sync_enabled, metrics_config, - track_liveness, - detect_doppelgangers, eth1_api_to_metrics_tx, eth1_api_to_metrics_rx, - slashing_protection_history_limit, - validator_enabled, ) .await } @@ -377,6 +382,7 @@ fn try_main() -> Result<()> { builder_config, web3signer_config, http_api_config, + max_events, metrics_config, track_liveness, detect_doppelgangers, @@ -527,6 +533,7 @@ fn try_main() -> Result<()> { state_slot, eth1_auth, http_api_config, + max_events, metrics_config, track_liveness, detect_doppelgangers, diff --git a/http_api/Cargo.toml b/http_api/Cargo.toml index bcf7d6a2..3187ae8f 100644 --- a/http_api/Cargo.toml +++ b/http_api/Cargo.toml @@ -43,7 +43,6 @@ serde_with = { workspace = true } ssz = { workspace = true } static_assertions = { workspace = true } std_ext = { workspace = true } -strum = { workspace = true } tap = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/http_api/src/context.rs b/http_api/src/context.rs index c89b47d3..dbb24564 100644 --- a/http_api/src/context.rs +++ b/http_api/src/context.rs @@ -21,6 +21,7 @@ use fork_choice_control::{ use fork_choice_store::StoreConfig; use futures::{future::FutureExt as _, lock::Mutex, select_biased}; use genesis::AnchorCheckpointProvider; +use http_api_utils::EventChannels; use keymanager::KeyManager; use liveness_tracker::LivenessTracker; use once_cell::sync::OnceCell; @@ -87,7 +88,6 @@ impl Context

{ let (api_to_liveness_tx, api_to_liveness_rx) = futures::channel::mpsc::unbounded(); let (api_to_p2p_tx, api_to_p2p_rx) = futures::channel::mpsc::unbounded(); let (api_to_validator_tx, api_to_validator_rx) = futures::channel::mpsc::unbounded(); - let (fc_to_api_tx, fc_to_api_rx) = futures::channel::mpsc::unbounded(); let (fc_to_attestation_verifier_tx, fc_to_attestation_verifier_rx) = futures::channel::mpsc::unbounded(); let (fc_to_p2p_tx, fc_to_p2p_rx) = futures::channel::mpsc::unbounded(); @@ -96,14 +96,12 @@ impl Context

{ let (fc_to_sync_tx, fc_to_sync_rx) = futures::channel::mpsc::unbounded(); let (fc_to_validator_tx, fc_to_validator_rx) = futures::channel::mpsc::unbounded(); let (_, p2p_to_validator_rx) = futures::channel::mpsc::unbounded(); - let (pool_to_api_tx, pool_to_api_rx) = futures::channel::mpsc::unbounded(); let (pool_to_liveness_tx, pool_to_liveness_rx) = futures::channel::mpsc::unbounded(); let (pool_to_p2p_tx, pool_to_p2p_rx) = futures::channel::mpsc::unbounded(); let (subnet_service_to_p2p_tx, _subnet_service_to_p2p_rx) = futures::channel::mpsc::unbounded(); let (sync_to_api_tx, sync_to_api_rx) = futures::channel::mpsc::unbounded(); let (subnet_service_tx, subnet_service_rx) = futures::channel::mpsc::unbounded(); - let (validator_to_api_tx, validator_to_api_rx) = futures::channel::mpsc::unbounded(); let (validator_to_liveness_tx, validator_to_liveness_rx) = futures::channel::mpsc::unbounded(); let (validator_to_p2p_tx, validator_to_p2p_rx) = futures::channel::mpsc::unbounded(); @@ -173,15 +171,17 @@ impl Context

{ .unwrap_or(&anchor_block) .pipe(Tick::block_proposal); + let event_channels = Arc::new(EventChannels::default()); + let (controller, mutator_handle) = Controller::new( chain_config, store_config, anchor_block, anchor_state.clone_arc(), tick, + event_channels.clone_arc(), execution_engine.clone_arc(), None, - fc_to_api_tx, fc_to_attestation_verifier_tx, fc_to_p2p_tx, fc_to_pool_tx, @@ -261,7 +261,7 @@ impl Context

{ let (bls_to_execution_change_pool, bls_to_execution_change_pool_service) = BlsToExecutionChangePool::new( controller.clone_arc(), - pool_to_api_tx, + event_channels.clone_arc(), pool_to_p2p_tx, None, ); @@ -297,7 +297,6 @@ impl Context

{ p2p_to_validator_rx, slasher_to_validator_rx: None, subnet_service_tx: subnet_service_tx.clone(), - validator_to_api_tx, validator_to_liveness_tx: Some(validator_to_liveness_tx), validator_to_slasher_tx: None, }; @@ -309,6 +308,7 @@ impl Context

{ attestation_agg_pool.clone_arc(), None, None, + event_channels.clone_arc(), keymanager.proposer_configs().clone_arc(), signer, slashing_protector, @@ -338,11 +338,8 @@ impl Context

{ api_to_metrics_tx: None, api_to_p2p_tx, api_to_validator_tx, - fc_to_api_rx, - pool_to_api_rx, subnet_service_tx, sync_to_api_rx, - validator_to_api_rx, }; let http_api = HttpApi { @@ -350,6 +347,7 @@ impl Context

{ controller, anchor_checkpoint_provider, eth1_api, + event_channels, validator_keys, validator_config, network_config, diff --git a/http_api/src/events.rs b/http_api/src/events.rs deleted file mode 100644 index e8c1f68e..00000000 --- a/http_api/src/events.rs +++ /dev/null @@ -1,80 +0,0 @@ -use axum::{response::sse::Event, Error}; -use serde::Serialize; -use serde_with::DeserializeFromStr; -use strum::{AsRefStr, EnumString}; -use tokio::sync::broadcast::{self, Receiver, Sender}; - -#[derive(Clone, Copy, AsRefStr, EnumString, DeserializeFromStr)] -#[strum(serialize_all = "snake_case")] -pub enum Topic { - Attestation, - AttesterSlashing, - BlobSidecar, - Block, - BlsToExecutionChange, - ChainReorg, - ContributionAndProof, - FinalizedCheckpoint, - Head, - PayloadAttributes, - ProposerSlashing, - VoluntaryExit, -} - -impl Topic { - pub fn build(self, data: impl Serialize) -> Result { - Event::default().event(self).json_data(data) - } -} - -pub struct EventChannels { - pub attestations: Sender, - pub attester_slashings: Sender, - pub blob_sidecars: Sender, - pub blocks: Sender, - pub bls_to_execution_changes: Sender, - pub chain_reorgs: Sender, - pub contribution_and_proofs: Sender, - pub finalized_checkpoints: Sender, - pub heads: Sender, - pub payload_attributes: Sender, - pub proposer_slashings: Sender, - pub voluntary_exits: Sender, -} - -impl EventChannels { - pub fn new(max_events: usize) -> Self { - Self { - attestations: broadcast::channel(max_events).0, - attester_slashings: broadcast::channel(max_events).0, - blob_sidecars: broadcast::channel(max_events).0, - blocks: broadcast::channel(max_events).0, - bls_to_execution_changes: broadcast::channel(max_events).0, - chain_reorgs: broadcast::channel(max_events).0, - contribution_and_proofs: broadcast::channel(max_events).0, - finalized_checkpoints: broadcast::channel(max_events).0, - heads: broadcast::channel(max_events).0, - payload_attributes: broadcast::channel(max_events).0, - proposer_slashings: broadcast::channel(max_events).0, - voluntary_exits: broadcast::channel(max_events).0, - } - } - - pub fn receiver_for(&self, topic: Topic) -> Receiver { - match topic { - Topic::Attestation => &self.attestations, - Topic::AttesterSlashing => &self.attester_slashings, - Topic::BlobSidecar => &self.blob_sidecars, - Topic::Block => &self.blocks, - Topic::BlsToExecutionChange => &self.bls_to_execution_changes, - Topic::ChainReorg => &self.chain_reorgs, - Topic::ContributionAndProof => &self.contribution_and_proofs, - Topic::FinalizedCheckpoint => &self.finalized_checkpoints, - Topic::Head => &self.heads, - Topic::PayloadAttributes => &self.payload_attributes, - Topic::ProposerSlashing => &self.proposer_slashings, - Topic::VoluntaryExit => &self.voluntary_exits, - } - .subscribe() - } -} diff --git a/http_api/src/http_api_config.rs b/http_api/src/http_api_config.rs index 6c9c7d45..34ec3134 100644 --- a/http_api/src/http_api_config.rs +++ b/http_api/src/http_api_config.rs @@ -11,7 +11,6 @@ use tower_http::cors::AllowOrigin; pub struct HttpApiConfig { pub address: SocketAddr, pub allow_origin: AllowOrigin, - pub max_events: usize, // `HttpApiConfig.timeout` is optional to prevent timeouts in tests. pub timeout: Option, } @@ -34,7 +33,6 @@ impl HttpApiConfig { Self { address, allow_origin: AllowOrigin::list([allowed_origin]), - max_events: 100, timeout: None, } } diff --git a/http_api/src/lib.rs b/http_api/src/lib.rs index 427899fc..6f0f2a67 100644 --- a/http_api/src/lib.rs +++ b/http_api/src/lib.rs @@ -5,7 +5,6 @@ pub use crate::{ mod block_id; mod error; -mod events; mod extractors; mod full_config; mod global; diff --git a/http_api/src/routing.rs b/http_api/src/routing.rs index 86ee0d53..2c43629b 100644 --- a/http_api/src/routing.rs +++ b/http_api/src/routing.rs @@ -12,6 +12,7 @@ use features::Feature; use fork_choice_control::Wait; use futures::channel::mpsc::UnboundedSender; use genesis::AnchorCheckpointProvider; +use http_api_utils::EventChannels; use liveness_tracker::ApiToLiveness; use metrics::ApiToMetrics; use operation_pools::{AttestationAggPool, BlsToExecutionChangePool, SyncCommitteeAggPool}; @@ -24,7 +25,6 @@ use validator::{ApiToValidator, ValidatorConfig}; use crate::{ error::Error, - events::EventChannels, global::{self}, gui, middleware, misc::{BackSyncedStatus, SyncedStatus}, diff --git a/http_api/src/standard.rs b/http_api/src/standard.rs index c9f481ef..e9679532 100644 --- a/http_api/src/standard.rs +++ b/http_api/src/standard.rs @@ -27,7 +27,7 @@ use futures::{ }; use genesis::AnchorCheckpointProvider; use helper_functions::{accessors, misc}; -use http_api_utils::{BlockId, StateId}; +use http_api_utils::{BlockId, EventChannels, StateId, Topic}; use itertools::{izip, Either, Itertools as _}; use liveness_tracker::ApiToLiveness; use log::{debug, info, warn}; @@ -87,7 +87,6 @@ use validator::{ApiToValidator, ValidatorConfig}; use crate::{ block_id, error::{Error, IndexedError}, - events::{EventChannels, Topic}, extractors::{EthJson, EthJsonOrSsz, EthPath, EthQuery}, full_config::FullConfig, misc::{APIBlock, BackSyncedStatus, BroadcastValidation, SignedAPIBlock, SyncedStatus}, @@ -1258,10 +1257,7 @@ pub async fn submit_pool_proposer_slashing( .await?; if outcome.is_publishable() { - if let Err(error) = send_proposer_slashing_event(*proposer_slashing, &event_channels) { - warn!("unable to send proposer slashing event: {error}"); - } - + event_channels.send_proposer_slashing_event(&proposer_slashing); ApiToP2p::PublishProposerSlashing(proposer_slashing).send(&api_to_p2p_tx); } @@ -1293,10 +1289,7 @@ pub async fn submit_pool_voluntary_exit( .await?; if outcome.is_publishable() { - if let Err(error) = send_voluntary_exit_event(*signed_voluntary_exit, &event_channels) { - warn!("unable to send voluntary exit event: {error}"); - } - + event_channels.send_voluntary_exit_event(&signed_voluntary_exit); ApiToP2p::PublishVoluntaryExit(signed_voluntary_exit).send(&api_to_p2p_tx); } @@ -1328,12 +1321,7 @@ pub async fn submit_pool_attester_slashing( .await?; if outcome.is_publishable() { - if let Err(error) = - send_attester_slashing_event(*attester_slashing.clone(), &event_channels) - { - warn!("unable to send attester slashing event: {error}"); - } - + event_channels.send_attester_slashing_event(&attester_slashing); ApiToP2p::PublishAttesterSlashing(attester_slashing).send(&api_to_p2p_tx); } @@ -2800,36 +2788,6 @@ async fn submit_blob_sidecars( Ok(()) } -fn send_attester_slashing_event( - attester_slashing: AttesterSlashing

, - event_channels: &EventChannels, -) -> Result<()> { - let event = Topic::AttesterSlashing.build(attester_slashing)?; - event_channels.attester_slashings.send(event)?; - - Ok(()) -} - -fn send_proposer_slashing_event( - proposer_slashing: ProposerSlashing, - event_channels: &EventChannels, -) -> Result<()> { - let event = Topic::ProposerSlashing.build(proposer_slashing)?; - event_channels.proposer_slashings.send(event)?; - - Ok(()) -} - -fn send_voluntary_exit_event( - voluntary_exit: SignedVoluntaryExit, - event_channels: &EventChannels, -) -> Result<()> { - let event = Topic::VoluntaryExit.build(voluntary_exit)?; - event_channels.voluntary_exits.send(event)?; - - Ok(()) -} - #[cfg(test)] mod tests { use core::fmt::Display; diff --git a/http_api/src/task.rs b/http_api/src/task.rs index a865aa42..65f62192 100644 --- a/http_api/src/task.rs +++ b/http_api/src/task.rs @@ -6,7 +6,7 @@ use axum::Router; use block_producer::BlockProducer; use bls::PublicKeyBytes; use eth1_api::{ApiController, Eth1Api}; -use fork_choice_control::{ApiMessage, Wait}; +use fork_choice_control::Wait; use futures::{ channel::mpsc::{UnboundedReceiver, UnboundedSender}, future::{FutureExt as _, TryFutureExt as _}, @@ -14,23 +14,20 @@ use futures::{ stream::StreamExt as _, }; use genesis::AnchorCheckpointProvider; -use http_api_utils::ApiMetrics; +use http_api_utils::{ApiMetrics, EventChannels}; use liveness_tracker::ApiToLiveness; -use log::{debug, info}; +use log::info; use metrics::ApiToMetrics; -use operation_pools::{ - AttestationAggPool, BlsToExecutionChangePool, PoolToApiMessage, SyncCommitteeAggPool, -}; +use operation_pools::{AttestationAggPool, BlsToExecutionChangePool, SyncCommitteeAggPool}; use p2p::{ApiToP2p, NetworkConfig, SyncToApi, ToSubnetService}; use prometheus_metrics::Metrics; use std_ext::ArcExt as _; use tokio::net::TcpListener; use types::preset::Preset; -use validator::{ApiToValidator, ValidatorConfig, ValidatorToApi}; +use validator::{ApiToValidator, ValidatorConfig}; use crate::{ error::Error, - events::{EventChannels, Topic}, http_api_config::HttpApiConfig, misc::{BackSyncedStatus, SyncedStatus}, routing::{self, NormalState}, @@ -41,11 +38,8 @@ pub struct Channels { pub api_to_metrics_tx: Option>, pub api_to_p2p_tx: UnboundedSender>, pub api_to_validator_tx: UnboundedSender>, - pub fc_to_api_rx: UnboundedReceiver>, - pub pool_to_api_rx: UnboundedReceiver, pub subnet_service_tx: UnboundedSender, pub sync_to_api_rx: UnboundedReceiver, - pub validator_to_api_rx: UnboundedReceiver>, } #[expect(clippy::struct_field_names)] @@ -54,6 +48,7 @@ pub struct HttpApi { pub controller: ApiController, pub anchor_checkpoint_provider: AnchorCheckpointProvider

, pub eth1_api: Arc, + pub event_channels: Arc, pub validator_keys: Arc>, pub validator_config: Arc, pub network_config: Arc, @@ -85,6 +80,7 @@ impl HttpApi { controller, anchor_checkpoint_provider, eth1_api, + event_channels, validator_keys, validator_config, network_config, @@ -99,7 +95,6 @@ impl HttpApi { let HttpApiConfig { address, allow_origin, - max_events, timeout, } = http_api_config; @@ -108,16 +103,12 @@ impl HttpApi { api_to_metrics_tx, api_to_p2p_tx, api_to_validator_tx, - fc_to_api_rx, - pool_to_api_rx, subnet_service_tx, sync_to_api_rx, - validator_to_api_rx, } = channels; let is_synced = Arc::new(SyncedStatus::new(controller.is_forward_synced())); let is_back_synced = Arc::new(BackSyncedStatus::default()); - let event_channels = Arc::new(EventChannels::new(max_events)); let state = NormalState { chain_config: controller.chain_config().clone_arc(), @@ -134,7 +125,7 @@ impl HttpApi { bls_to_execution_change_pool, is_synced: is_synced.clone_arc(), is_back_synced: is_back_synced.clone_arc(), - event_channels: event_channels.clone_arc(), + event_channels, api_to_liveness_tx, api_to_metrics_tx, api_to_p2p_tx, @@ -156,49 +147,22 @@ impl HttpApi { .into_future() .map_err(AnyhowError::new); - let handle_events = handle_events( - is_synced, - is_back_synced, - event_channels, - fc_to_api_rx, - pool_to_api_rx, - sync_to_api_rx, - validator_to_api_rx, - ); + let handle_sync_statuses = handle_sync_statuses(is_synced, is_back_synced, sync_to_api_rx); info!("HTTP server listening on {address}"); select! { result = serve_requests.fuse() => result, - result = handle_events.fuse() => result, + result = handle_sync_statuses.fuse() => result, } } } -async fn handle_events( +async fn handle_sync_statuses( is_synced: Arc, is_back_synced: Arc, - event_channels: Arc, - mut fc_to_api_rx: UnboundedReceiver>, - mut pool_to_api_rx: UnboundedReceiver, mut sync_to_api_rx: UnboundedReceiver, - mut validator_to_api_rx: UnboundedReceiver>, ) -> Result<()> { - let EventChannels { - attestations, - attester_slashings, - blob_sidecars, - blocks, - bls_to_execution_changes, - chain_reorgs, - contribution_and_proofs, - finalized_checkpoints, - heads, - payload_attributes, - proposer_slashings, - voluntary_exits, - } = event_channels.as_ref(); - loop { select! { message = sync_to_api_rx.select_next_some() => { @@ -208,77 +172,6 @@ async fn handle_events( } } - message = validator_to_api_rx.select_next_some() => { - let receivers = match message { - ValidatorToApi::AttesterSlashing(attester_slashing) => { - let event = Topic::AttesterSlashing.build(attester_slashing)?; - attester_slashings.send(event).unwrap_or_default() - } - ValidatorToApi::ContributionAndProof(signed_contribution_and_proof) => { - let event = - Topic::ContributionAndProof.build(signed_contribution_and_proof)?; - contribution_and_proofs.send(event).unwrap_or_default() - } - ValidatorToApi::PayloadAttributes(payload_attributes_event) => { - let event = - Topic::PayloadAttributes.build(payload_attributes_event)?; - payload_attributes.send(event).unwrap_or_default() - } - ValidatorToApi::ProposerSlashing(proposer_slashing) => { - let event = Topic::ProposerSlashing.build(proposer_slashing)?; - proposer_slashings.send(event).unwrap_or_default() - } - ValidatorToApi::VoluntaryExit(signed_voluntary_exit) => { - let event = Topic::VoluntaryExit.build(signed_voluntary_exit)?; - voluntary_exits.send(event).unwrap_or_default() - } - }; - - debug!("event from validator sent to {receivers} receivers"); - } - - message = fc_to_api_rx.select_next_some() => { - let receivers = match message { - ApiMessage::AttestationEvent(attestation) => { - let event = Topic::Attestation.build(attestation)?; - attestations.send(event).unwrap_or_default() - } - ApiMessage::BlobSidecarEvent(blob_sidecar) => { - let event = Topic::BlobSidecar.build(blob_sidecar)?; - blob_sidecars.send(event).unwrap_or_default() - } - ApiMessage::BlockEvent(block_event) => { - let event = Topic::Block.build(block_event)?; - blocks.send(event).unwrap_or_default() - } - ApiMessage::ChainReorgEvent(chain_reorg_event) => { - let event = Topic::ChainReorg.build(chain_reorg_event)?; - chain_reorgs.send(event).unwrap_or_default() - } - ApiMessage::FinalizedCheckpoint(finalized_checkpoint_event) => { - let event = Topic::FinalizedCheckpoint.build(finalized_checkpoint_event)?; - finalized_checkpoints.send(event).unwrap_or_default() - } - ApiMessage::Head(head_event) => { - let event = Topic::Head.build(head_event)?; - heads.send(event).unwrap_or_default() - } - }; - - debug!("event from fork choice store sent to {receivers} receivers"); - } - - message = pool_to_api_rx.select_next_some() => { - let receivers = match message { - PoolToApiMessage::SignedBlsToExecutionChange(signed_bls_to_execution_change) => { - let event = Topic::BlsToExecutionChange.build(signed_bls_to_execution_change)?; - bls_to_execution_changes.send(event).unwrap_or_default() - } - }; - - debug!("event from operation pool sent to {receivers} receivers"); - } - complete => break Ok(()), } } diff --git a/http_api_utils/Cargo.toml b/http_api_utils/Cargo.toml index dae1daba..778ff26b 100644 --- a/http_api_utils/Cargo.toml +++ b/http_api_utils/Cargo.toml @@ -9,18 +9,27 @@ workspace = true [dependencies] anyhow = { workspace = true } axum = { workspace = true } +execution_engine = { workspace = true } features = { workspace = true } +fork_choice_store = { workspace = true } +helper_functions = { workspace = true } http-body-util = { workspace = true } itertools = { workspace = true } log = { workspace = true } mime = { workspace = true } parse-display = { workspace = true } prometheus_metrics = { workspace = true } +serde = { workspace = true } +serde_utils = { workspace = true } +serde_with = { workspace = true } +tap = { workspace = true } thiserror = { workspace = true } tower = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } types = { workspace = true } +tokio = { workspace = true } +strum = { workspace = true } [dev-dependencies] hex-literal = { workspace = true } diff --git a/http_api_utils/src/events.rs b/http_api_utils/src/events.rs new file mode 100644 index 00000000..189d6557 --- /dev/null +++ b/http_api_utils/src/events.rs @@ -0,0 +1,714 @@ +use anyhow::Result; +use axum::{response::sse::Event, Error}; +use execution_engine::{ + PayloadAttributes, PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3, WithdrawalV1, +}; +use fork_choice_store::{ChainLink, Store}; +use helper_functions::misc; +use log::warn; +use serde::Serialize; +use serde_with::DeserializeFromStr; +use strum::{AsRefStr, EnumString}; +use tap::Pipe as _; +use tokio::sync::broadcast::{self, Receiver, Sender}; +use types::{ + altair::containers::SignedContributionAndProof, + capella::{containers::SignedBlsToExecutionChange, primitives::WithdrawalIndex}, + combined::{Attestation, AttesterSlashing}, + deneb::{ + containers::BlobSidecar, + primitives::{BlobIndex, KzgCommitment, VersionedHash}, + }, + nonstandard::Phase, + phase0::{ + containers::{Checkpoint, ProposerSlashing, SignedVoluntaryExit}, + primitives::{ + Epoch, ExecutionAddress, ExecutionBlockHash, ExecutionBlockNumber, Gwei, Slot, + UnixSeconds, ValidatorIndex, H256, + }, + }, + preset::Preset, + traits::SignedBeaconBlock as _, +}; + +pub const DEFAULT_MAX_EVENTS: usize = 100; + +#[derive(Clone, Copy, AsRefStr, EnumString, DeserializeFromStr)] +#[strum(serialize_all = "snake_case")] +pub enum Topic { + Attestation, + AttesterSlashing, + BlobSidecar, + Block, + BlsToExecutionChange, + ChainReorg, + ContributionAndProof, + FinalizedCheckpoint, + Head, + PayloadAttributes, + ProposerSlashing, + VoluntaryExit, +} + +impl Topic { + pub fn build(self, data: impl Serialize) -> Result { + Event::default().event(self).json_data(data) + } +} + +pub struct EventChannels { + pub attestations: Sender, + pub attester_slashings: Sender, + pub blob_sidecars: Sender, + pub blocks: Sender, + pub bls_to_execution_changes: Sender, + pub chain_reorgs: Sender, + pub contribution_and_proofs: Sender, + pub finalized_checkpoints: Sender, + pub heads: Sender, + pub payload_attributes: Sender, + pub proposer_slashings: Sender, + pub voluntary_exits: Sender, +} + +impl Default for EventChannels { + fn default() -> Self { + Self::new(DEFAULT_MAX_EVENTS) + } +} + +impl EventChannels { + #[must_use] + pub fn new(max_events: usize) -> Self { + Self { + attestations: broadcast::channel(max_events).0, + attester_slashings: broadcast::channel(max_events).0, + blob_sidecars: broadcast::channel(max_events).0, + blocks: broadcast::channel(max_events).0, + bls_to_execution_changes: broadcast::channel(max_events).0, + chain_reorgs: broadcast::channel(max_events).0, + contribution_and_proofs: broadcast::channel(max_events).0, + finalized_checkpoints: broadcast::channel(max_events).0, + heads: broadcast::channel(max_events).0, + payload_attributes: broadcast::channel(max_events).0, + proposer_slashings: broadcast::channel(max_events).0, + voluntary_exits: broadcast::channel(max_events).0, + } + } + + #[must_use] + pub fn receiver_for(&self, topic: Topic) -> Receiver { + match topic { + Topic::Attestation => &self.attestations, + Topic::AttesterSlashing => &self.attester_slashings, + Topic::BlobSidecar => &self.blob_sidecars, + Topic::Block => &self.blocks, + Topic::BlsToExecutionChange => &self.bls_to_execution_changes, + Topic::ChainReorg => &self.chain_reorgs, + Topic::ContributionAndProof => &self.contribution_and_proofs, + Topic::FinalizedCheckpoint => &self.finalized_checkpoints, + Topic::Head => &self.heads, + Topic::PayloadAttributes => &self.payload_attributes, + Topic::ProposerSlashing => &self.proposer_slashings, + Topic::VoluntaryExit => &self.voluntary_exits, + } + .subscribe() + } + + pub fn send_attestation_event(&self, attestation: &Attestation

) { + if let Err(error) = self.send_attestation_event_internal(attestation) { + warn!("unable to send attestation event: {error}"); + } + } + + pub fn send_attester_slashing_event(&self, attester_slashing: &AttesterSlashing

) { + if let Err(error) = self.send_attester_slashing_event_internal(attester_slashing) { + warn!("unable to send attester slashing event: {error}"); + } + } + + pub fn send_blob_sidecar_event( + &self, + block_root: H256, + blob_sidecar: &BlobSidecar

, + ) { + if let Err(error) = self.send_blob_sidecar_event_internal(block_root, blob_sidecar) { + warn!("unable to send blob sidecar event: {error}"); + } + } + + pub fn send_block_event(&self, slot: Slot, block_root: H256, execution_optimistic: bool) { + if let Err(error) = self.send_block_event_internal(slot, block_root, execution_optimistic) { + warn!("unable to send block event: {error}"); + } + } + + pub fn send_bls_to_execution_change_event( + &self, + signed_bls_to_execution_change: &SignedBlsToExecutionChange, + ) { + if let Err(error) = + self.send_bls_to_execution_change_event_internal(signed_bls_to_execution_change) + { + warn!("unable to send bls to execution change event: {error}"); + } + } + + pub fn send_chain_reorg_event(&self, store: &Store

, old_head: &ChainLink

) { + if let Err(error) = self.send_chain_reorg_event_internal(store, old_head) { + warn!("unable to send chain reorg event: {error}"); + } + } + + pub fn send_contribution_and_proof_event( + &self, + signed_contribution_and_proof: &SignedContributionAndProof

, + ) { + if let Err(error) = + self.send_contribution_and_proof_event_internal(signed_contribution_and_proof) + { + warn!("unable to send contribution and proof event: {error}"); + } + } + + pub fn send_finalized_checkpoint_event( + &self, + block_root: H256, + finalized_checkpoint: Checkpoint, + execution_optimistic: bool, + ) { + if let Err(error) = self.send_finalized_checkpoint_event_internal( + block_root, + finalized_checkpoint, + execution_optimistic, + ) { + warn!("unable to send finalized checkpoint event: {error}"); + } + } + + pub fn send_head_event( + &self, + head: &ChainLink

, + calculate_dependent_roots: impl FnOnce(&ChainLink

) -> Result, + ) { + if let Err(error) = self.send_head_event_internal(head, calculate_dependent_roots) { + warn!("unable to send head event: {error}"); + } + } + + #[expect(clippy::too_many_arguments)] + pub fn send_payload_attributes_event( + &self, + phase: Phase, + proposal_slot: Slot, + proposer_index: ValidatorIndex, + parent_block_root: H256, + payload_attributes: &PayloadAttributes

, + parent_block_number: ExecutionBlockNumber, + parent_block_hash: ExecutionBlockHash, + ) { + if let Err(error) = self.send_payload_attributes_event_internal( + phase, + proposal_slot, + proposer_index, + parent_block_root, + payload_attributes, + parent_block_number, + parent_block_hash, + ) { + warn!("unable to send payload attributes event: {error}"); + } + } + + pub fn send_proposer_slashing_event(&self, proposer_slashing: &ProposerSlashing) { + if let Err(error) = self.send_proposer_slashing_event_internal(proposer_slashing) { + warn!("unable to send proposer slashing event: {error}"); + } + } + + pub fn send_voluntary_exit_event(&self, voluntary_exit: &SignedVoluntaryExit) { + if let Err(error) = self.send_voluntary_exit_event_internal(voluntary_exit) { + warn!("unable to send voluntary exit event: {error}"); + } + } + + fn send_attestation_event_internal( + &self, + attestation: &Attestation

, + ) -> Result<()> { + if self.attestations.receiver_count() > 0 { + let event = Topic::Attestation.build(attestation)?; + self.attestations.send(event)?; + } + + Ok(()) + } + + fn send_attester_slashing_event_internal( + &self, + attester_slashing: &AttesterSlashing

, + ) -> Result<()> { + if self.attester_slashings.receiver_count() > 0 { + let event = Topic::AttesterSlashing.build(attester_slashing)?; + self.attester_slashings.send(event)?; + } + + Ok(()) + } + + fn send_blob_sidecar_event_internal( + &self, + block_root: H256, + blob_sidecar: &BlobSidecar

, + ) -> Result<()> { + if self.blob_sidecars.receiver_count() > 0 { + let blob_sidecar_event = BlobSidecarEvent::new(block_root, blob_sidecar); + let event = Topic::BlobSidecar.build(blob_sidecar_event)?; + self.blob_sidecars.send(event)?; + } + + Ok(()) + } + + fn send_block_event_internal( + &self, + slot: Slot, + block_root: H256, + execution_optimistic: bool, + ) -> Result<()> { + if self.blocks.receiver_count() > 0 { + let block_event = BlockEvent { + slot, + block: block_root, + execution_optimistic, + }; + + let event = Topic::Block.build(block_event)?; + self.blocks.send(event)?; + } + + Ok(()) + } + + fn send_bls_to_execution_change_event_internal( + &self, + signed_bls_to_execution_change: &SignedBlsToExecutionChange, + ) -> Result<()> { + if self.bls_to_execution_changes.receiver_count() > 0 { + let event = Topic::BlsToExecutionChange.build(signed_bls_to_execution_change)?; + self.bls_to_execution_changes.send(event)?; + } + + Ok(()) + } + + fn send_chain_reorg_event_internal( + &self, + store: &Store

, + old_head: &ChainLink

, + ) -> Result<()> { + if self.chain_reorgs.receiver_count() > 0 { + let chain_reorg_event = ChainReorgEvent::new(store, old_head); + let event = Topic::ChainReorg.build(chain_reorg_event)?; + self.chain_reorgs.send(event)?; + } + + Ok(()) + } + + fn send_contribution_and_proof_event_internal( + &self, + signed_contribution_and_proof: &SignedContributionAndProof

, + ) -> Result<()> { + if self.contribution_and_proofs.receiver_count() > 0 { + let event = Topic::ContributionAndProof.build(signed_contribution_and_proof)?; + self.contribution_and_proofs.send(event)?; + } + + Ok(()) + } + + fn send_finalized_checkpoint_event_internal( + &self, + block_root: H256, + finalized_checkpoint: Checkpoint, + execution_optimistic: bool, + ) -> Result<()> { + if self.finalized_checkpoints.receiver_count() > 0 { + let Checkpoint { epoch, root } = finalized_checkpoint; + + let finalized_checkpoint_event = FinalizedCheckpointEvent { + block: block_root, + state: root, + epoch, + execution_optimistic, + }; + + let event = Topic::FinalizedCheckpoint.build(finalized_checkpoint_event)?; + self.finalized_checkpoints.send(event)?; + } + + Ok(()) + } + + fn send_head_event_internal( + &self, + head: &ChainLink

, + calculate_dependent_roots: impl FnOnce(&ChainLink

) -> Result, + ) -> Result<()> { + if self.heads.receiver_count() > 0 { + let head_event = HeadEvent::new(head, calculate_dependent_roots(head)?); + let event = Topic::Head.build(head_event)?; + self.heads.send(event)?; + } + + Ok(()) + } + + #[expect(clippy::too_many_arguments)] + fn send_payload_attributes_event_internal( + &self, + phase: Phase, + proposal_slot: Slot, + proposer_index: ValidatorIndex, + parent_block_root: H256, + payload_attributes: &PayloadAttributes

, + parent_block_number: ExecutionBlockNumber, + parent_block_hash: ExecutionBlockHash, + ) -> Result<()> { + if self.payload_attributes.receiver_count() > 0 { + let payload_attributes_event = PayloadAttributesEvent { + version: phase, + data: PayloadAttributesEventData { + proposal_slot, + proposer_index, + parent_block_root, + payload_attributes: payload_attributes.clone().into(), + parent_block_number, + parent_block_hash, + }, + }; + + let event = Topic::PayloadAttributes.build(payload_attributes_event)?; + self.payload_attributes.send(event)?; + } + + Ok(()) + } + + fn send_proposer_slashing_event_internal( + &self, + proposer_slashing: &ProposerSlashing, + ) -> Result<()> { + if self.proposer_slashings.receiver_count() > 0 { + let event = Topic::ProposerSlashing.build(proposer_slashing)?; + self.proposer_slashings.send(event)?; + } + + Ok(()) + } + + fn send_voluntary_exit_event_internal( + &self, + voluntary_exit: &SignedVoluntaryExit, + ) -> Result<()> { + if self.voluntary_exits.receiver_count() > 0 { + let event = Topic::VoluntaryExit.build(voluntary_exit)?; + self.voluntary_exits.send(event)?; + } + + Ok(()) + } +} + +#[derive(Clone, Copy)] +pub struct DependentRootsBundle { + pub current_duty_dependent_root: H256, + pub previous_duty_dependent_root: H256, +} + +#[derive(Debug, Serialize)] +struct BlobSidecarEvent { + block_root: H256, + #[serde(with = "serde_utils::string_or_native")] + index: BlobIndex, + #[serde(with = "serde_utils::string_or_native")] + slot: Slot, + kzg_commitment: KzgCommitment, + versioned_hash: VersionedHash, +} + +impl BlobSidecarEvent { + fn new(block_root: H256, blob_sidecar: &BlobSidecar

) -> Self { + let kzg_commitment = blob_sidecar.kzg_commitment; + + Self { + block_root, + index: blob_sidecar.index, + slot: blob_sidecar.slot(), + kzg_commitment, + versioned_hash: misc::kzg_commitment_to_versioned_hash(kzg_commitment), + } + } +} + +#[derive(Debug, Serialize)] +struct BlockEvent { + #[serde(with = "serde_utils::string_or_native")] + slot: Slot, + block: H256, + execution_optimistic: bool, +} + +#[derive(Debug, Serialize)] +struct ChainReorgEvent { + #[serde(with = "serde_utils::string_or_native")] + slot: Slot, + #[serde(with = "serde_utils::string_or_native")] + depth: u64, + old_head_block: H256, + new_head_block: H256, + old_head_state: H256, + new_head_state: H256, + #[serde(with = "serde_utils::string_or_native")] + epoch: Epoch, + execution_optimistic: bool, +} + +impl ChainReorgEvent { + // The [Eth Beacon Node API specification] does not make it clear how `slot`, `depth`, and + // `epoch` should be computed. We try to match the behavior of Lighthouse. + // + // [Eth Beacon Node API specification]: https://ethereum.github.io/beacon-APIs/ + #[must_use] + fn new(store: &Store

, old_head: &ChainLink

) -> Self { + let new_head = store.head(); + let old_slot = old_head.slot(); + let new_slot = new_head.slot(); + + let depth = store + .common_ancestor(old_head.block_root, new_head.block_root) + .map(ChainLink::slot) + .unwrap_or_else(|| { + // A reorganization may be triggered by an alternate chain being finalized. + // The old block will no longer be present in `store` if that happens. + // Default to the old finalized slot like Lighthouse does. + // A proper solution may require significant changes to `Mutator`. + old_head + .finalized_checkpoint + .epoch + .pipe(misc::compute_start_slot_at_epoch::

) + }) + .abs_diff(old_slot); + + Self { + slot: new_slot, + depth, + old_head_block: old_head.block_root, + new_head_block: new_head.block_root, + old_head_state: old_head.block.message().state_root(), + new_head_state: new_head.block.message().state_root(), + epoch: misc::compute_epoch_at_slot::

(new_slot), + execution_optimistic: new_head.is_optimistic(), + } + } +} + +#[derive(Debug, Serialize)] +struct FinalizedCheckpointEvent { + block: H256, + state: H256, + #[serde(with = "serde_utils::string_or_native")] + epoch: Epoch, + execution_optimistic: bool, +} + +#[derive(Debug, Serialize)] +struct HeadEvent { + #[serde(with = "serde_utils::string_or_native")] + slot: Slot, + block: H256, + state: H256, + epoch_transition: bool, + previous_duty_dependent_root: H256, + current_duty_dependent_root: H256, + execution_optimistic: bool, +} + +impl HeadEvent { + fn new(head: &ChainLink

, dependent_roots_bundle: DependentRootsBundle) -> Self { + let DependentRootsBundle { + current_duty_dependent_root, + previous_duty_dependent_root, + } = dependent_roots_bundle; + + let slot = head.slot(); + + Self { + slot, + block: head.block_root, + state: head.block.message().state_root(), + epoch_transition: misc::is_epoch_start::

(slot), + previous_duty_dependent_root, + current_duty_dependent_root, + execution_optimistic: head.is_optimistic(), + } + } +} + +#[derive(Debug, Serialize)] +struct PayloadAttributesEvent { + version: Phase, + data: PayloadAttributesEventData, +} + +#[derive(Debug, Serialize)] +struct PayloadAttributesEventData { + #[serde(with = "serde_utils::string_or_native")] + proposal_slot: Slot, + parent_block_root: H256, + #[serde(with = "serde_utils::string_or_native")] + parent_block_number: ExecutionBlockNumber, + parent_block_hash: ExecutionBlockHash, + #[serde(with = "serde_utils::string_or_native")] + proposer_index: ValidatorIndex, + payload_attributes: CombinedPayloadAttributesEventData, +} + +#[derive(Debug, Serialize)] +#[serde(untagged, bound = "")] +enum CombinedPayloadAttributesEventData { + Bellatrix(PayloadAttributesEventDataV1), + Capella(PayloadAttributesEventDataV2), + Deneb(PayloadAttributesEventDataV3), + Electra(PayloadAttributesEventDataV3), +} + +#[derive(Debug, Serialize)] +struct PayloadAttributesEventDataV1 { + #[serde(with = "serde_utils::string_or_native")] + timestamp: UnixSeconds, + prev_randao: H256, + suggested_fee_recipient: ExecutionAddress, +} + +#[derive(Debug, Serialize)] +struct PayloadAttributesEventDataV2 { + #[serde(with = "serde_utils::string_or_native")] + timestamp: UnixSeconds, + prev_randao: H256, + suggested_fee_recipient: ExecutionAddress, + withdrawals: Vec, +} + +#[derive(Debug, Serialize)] +struct PayloadAttributesEventDataV3 { + #[serde(with = "serde_utils::string_or_native")] + timestamp: UnixSeconds, + prev_randao: H256, + suggested_fee_recipient: ExecutionAddress, + withdrawals: Vec, + parent_beacon_block_root: H256, +} + +#[derive(Debug, Serialize)] +struct WithdrawalEventDataV1 { + #[serde(with = "serde_utils::string_or_native")] + index: WithdrawalIndex, + #[serde(with = "serde_utils::string_or_native")] + validator_index: ValidatorIndex, + address: ExecutionAddress, + #[serde(with = "serde_utils::string_or_native")] + amount: Gwei, +} + +impl From for WithdrawalEventDataV1 { + fn from(withdrawal: WithdrawalV1) -> Self { + let WithdrawalV1 { + index, + validator_index, + address, + amount, + } = withdrawal; + + Self { + index, + validator_index, + address, + amount, + } + } +} + +impl From for PayloadAttributesEventDataV1 { + fn from(payload_attributes: PayloadAttributesV1) -> Self { + let PayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + } = payload_attributes; + + Self { + timestamp, + prev_randao, + suggested_fee_recipient, + } + } +} + +impl From> for PayloadAttributesEventDataV2 { + fn from(payload_attributes: PayloadAttributesV2

) -> Self { + let PayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + } = payload_attributes; + + Self { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals: withdrawals.into_iter().map(Into::into).collect::>(), + } + } +} + +impl From> for PayloadAttributesEventDataV3 { + fn from(payload_attributes: PayloadAttributesV3

) -> Self { + let PayloadAttributesV3 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + parent_beacon_block_root, + } = payload_attributes; + + Self { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals: withdrawals.into_iter().map(Into::into).collect::>(), + parent_beacon_block_root, + } + } +} + +impl From> for CombinedPayloadAttributesEventData { + fn from(payload_attributes: PayloadAttributes

) -> Self { + match payload_attributes { + PayloadAttributes::Bellatrix(payload_attributes_v1) => { + Self::Bellatrix(payload_attributes_v1.into()) + } + PayloadAttributes::Capella(payload_attributes_v2) => { + Self::Capella(payload_attributes_v2.into()) + } + PayloadAttributes::Deneb(payload_attributes_v3) => { + Self::Deneb(payload_attributes_v3.into()) + } + PayloadAttributes::Electra(payload_attributes_v3) => { + Self::Electra(payload_attributes_v3.into()) + } + } + } +} diff --git a/http_api_utils/src/lib.rs b/http_api_utils/src/lib.rs index d14ed6dd..a8b4f482 100644 --- a/http_api_utils/src/lib.rs +++ b/http_api_utils/src/lib.rs @@ -1,4 +1,5 @@ pub use block_id::BlockId; +pub use events::{DependentRootsBundle, EventChannels, Topic, DEFAULT_MAX_EVENTS}; pub use helpers::extend_router_with_middleware; pub use misc::{ApiMetrics, Direction}; pub use state_id::StateId; @@ -9,6 +10,7 @@ pub mod middleware; mod block_id; mod error; +mod events; mod helpers; mod misc; mod state_id; diff --git a/operation_pools/Cargo.toml b/operation_pools/Cargo.toml index db7a550d..227b6507 100644 --- a/operation_pools/Cargo.toml +++ b/operation_pools/Cargo.toml @@ -20,6 +20,7 @@ fork_choice_control = { workspace = true } futures = { workspace = true } good_lp = { workspace = true } helper_functions = { workspace = true } +http_api_utils = { workspace = true } itertools = { workspace = true } log = { workspace = true } prometheus_metrics = { workspace = true } diff --git a/operation_pools/src/bls_to_execution_change_pool.rs b/operation_pools/src/bls_to_execution_change_pool.rs index 98cb7028..c8617f54 100644 --- a/operation_pools/src/bls_to_execution_change_pool.rs +++ b/operation_pools/src/bls_to_execution_change_pool.rs @@ -11,6 +11,7 @@ use futures::{ stream::StreamExt as _, }; use helper_functions::predicates; +use http_api_utils::EventChannels; use itertools::Itertools as _; use log::{debug, warn}; use prometheus_metrics::Metrics; @@ -21,7 +22,7 @@ use types::{ }; use crate::{ - messages::{PoolToApiMessage, PoolToP2pMessage}, + messages::PoolToP2pMessage, misc::{Origin, PoolAdditionOutcome, PoolRejectionReason}, }; @@ -33,7 +34,7 @@ impl BlsToExecutionChangePool { #[must_use] pub fn new( controller: ApiController, - pool_to_api_tx: UnboundedSender, + event_channels: Arc, pool_to_p2p_tx: UnboundedSender, metrics: Option>, ) -> (Arc, Service) { @@ -44,8 +45,8 @@ impl BlsToExecutionChangePool { let service = Service { controller, bls_to_execution_changes: HashMap::new(), + event_channels, metrics, - pool_to_api_tx, pool_to_p2p_tx, rx, }; @@ -97,8 +98,8 @@ impl BlsToExecutionChangePool { pub struct Service { controller: ApiController, bls_to_execution_changes: HashMap, + event_channels: Arc, metrics: Option>, - pool_to_api_tx: UnboundedSender, pool_to_p2p_tx: UnboundedSender, rx: UnboundedReceiver, } @@ -172,10 +173,8 @@ impl Service { } } - PoolToApiMessage::SignedBlsToExecutionChange(Box::new( - signed_bls_to_execution_change, - )) - .send(&self.pool_to_api_tx); + self.event_channels + .send_bls_to_execution_change_event(&signed_bls_to_execution_change); PoolAdditionOutcome::Accept } diff --git a/operation_pools/src/lib.rs b/operation_pools/src/lib.rs index d3a07e31..f77e782f 100644 --- a/operation_pools/src/lib.rs +++ b/operation_pools/src/lib.rs @@ -7,7 +7,7 @@ pub use crate::{ BlsToExecutionChangePool, Service as BlsToExecutionChangePoolService, }, manager::Manager, - messages::{PoolToApiMessage, PoolToLivenessMessage, PoolToP2pMessage}, + messages::{PoolToLivenessMessage, PoolToP2pMessage}, misc::{Origin, PoolAdditionOutcome, PoolRejectionReason}, sync_committee_agg_pool::Manager as SyncCommitteeAggPool, }; diff --git a/operation_pools/src/messages.rs b/operation_pools/src/messages.rs index aa82fd37..c03d0dbe 100644 --- a/operation_pools/src/messages.rs +++ b/operation_pools/src/messages.rs @@ -35,15 +35,3 @@ impl PoolToLivenessMessage { } } } - -pub enum PoolToApiMessage { - SignedBlsToExecutionChange(Box), -} - -impl PoolToApiMessage { - pub fn send(self, tx: &UnboundedSender) { - if let Err(message) = tx.unbounded_send(self) { - debug!("send to HTTP API failed because the receiver was dropped: {message:?}"); - } - } -} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 04a345bc..bd35a4ab 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -26,6 +26,7 @@ futures = { workspace = true } genesis = { workspace = true } grandine_version = { workspace = true } http_api = { workspace = true } +http_api_utils = { workspace = true } keymanager = { workspace = true } liveness_tracker = { workspace = true } log = { workspace = true } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 6c4c0f81..81e7d15a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -6,7 +6,7 @@ pub use crate::{ DEFAULT_TARGET_PEERS, DEFAULT_TARGET_SUBNET_PEERS, DEFAULT_TIMEOUT, }, misc::{MetricsConfig, StorageConfig}, - runtime::run_after_genesis, + runtime::{run_after_genesis, RuntimeConfig}, schema::initialize as initialize_schema, }; diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs index 6c26690f..3c486e9d 100644 --- a/runtime/src/runtime.rs +++ b/runtime/src/runtime.rs @@ -25,6 +25,7 @@ use futures::{ }; use genesis::AnchorCheckpointProvider; use http_api::{Channels as HttpApiChannels, HttpApi, HttpApiConfig}; +use http_api_utils::EventChannels; use keymanager::KeyManager; use liveness_tracker::LivenessTracker; use log::{info, warn}; @@ -50,11 +51,22 @@ use crate::misc::{MetricsConfig, StorageConfig}; #[cfg(unix)] use tokio::signal::unix::SignalKind; +#[expect(clippy::module_name_repetitions)] +#[expect(clippy::struct_excessive_bools)] +pub struct RuntimeConfig { + pub back_sync_enabled: bool, + pub detect_doppelgangers: bool, + pub max_events: usize, + pub slashing_protection_history_limit: u64, + pub track_liveness: bool, + pub validator_enabled: bool, +} + #[expect(clippy::too_many_arguments)] #[expect(clippy::too_many_lines)] -#[expect(clippy::fn_params_excessive_bools)] pub async fn run_after_genesis( chain_config: Arc, + runtime_config: RuntimeConfig, store_config: StoreConfig, validator_api_config: Option, validator_config: Arc, @@ -68,15 +80,19 @@ pub async fn run_after_genesis( signer: Arc, slasher_config: Option, http_api_config: HttpApiConfig, - back_sync_enabled: bool, metrics_config: MetricsConfig, - track_liveness: bool, - detect_doppelgangers: bool, eth1_api_to_metrics_tx: Option>, eth1_api_to_metrics_rx: Option>, - slashing_protection_history_limit: u64, - validator_enabled: bool, ) -> Result<()> { + let RuntimeConfig { + back_sync_enabled, + detect_doppelgangers, + max_events, + slashing_protection_history_limit, + track_liveness, + validator_enabled, + } = runtime_config; + let MetricsConfig { metrics, metrics_server_config, @@ -112,10 +128,7 @@ pub async fn run_after_genesis( let (validator_to_p2p_tx, validator_to_p2p_rx) = mpsc::unbounded(); let (api_to_p2p_tx, api_to_p2p_rx) = mpsc::unbounded(); let (sync_to_api_tx, sync_to_api_rx) = mpsc::unbounded(); - let (fc_to_api_tx, fc_to_api_rx) = mpsc::unbounded(); let (api_to_validator_tx, api_to_validator_rx) = mpsc::unbounded(); - let (validator_to_api_tx, validator_to_api_rx) = mpsc::unbounded(); - let (pool_to_api_tx, pool_to_api_rx) = mpsc::unbounded(); let (pool_to_p2p_tx, pool_to_p2p_rx) = mpsc::unbounded(); let (subnet_service_to_p2p_tx, subnet_service_to_p2p_rx) = mpsc::unbounded(); let (subnet_service_tx, subnet_service_rx) = mpsc::unbounded(); @@ -192,15 +205,17 @@ pub async fn run_after_genesis( let current_tick = Tick::current(&chain_config, anchor_state.genesis_time())?; + let event_channels = Arc::new(EventChannels::new(max_events)); + let (controller, mutator_handle) = Controller::new( chain_config.clone_arc(), store_config, anchor_block, anchor_state.clone_arc(), current_tick, + event_channels.clone_arc(), execution_engine.clone_arc(), metrics.clone(), - fc_to_api_tx, fork_choice_to_attestation_verifier_tx, fork_choice_to_p2p_tx, fork_choice_to_pool_tx, @@ -480,7 +495,7 @@ pub async fn run_after_genesis( let (bls_to_execution_change_pool, bls_to_execution_change_pool_service) = BlsToExecutionChangePool::new( controller.clone_arc(), - pool_to_api_tx, + event_channels.clone_arc(), pool_to_p2p_tx, metrics.clone(), ); @@ -513,7 +528,6 @@ pub async fn run_after_genesis( p2p_to_validator_rx, slasher_to_validator_rx, subnet_service_tx: subnet_service_tx.clone(), - validator_to_api_tx, validator_to_liveness_tx, validator_to_slasher_tx, }; @@ -525,6 +539,7 @@ pub async fn run_after_genesis( attestation_agg_pool.clone_arc(), builder_api, doppelganger_protection, + event_channels.clone_arc(), keymanager.proposer_configs().clone_arc(), signer.clone_arc(), slashing_protector, @@ -578,11 +593,8 @@ pub async fn run_after_genesis( api_to_metrics_tx, api_to_p2p_tx, api_to_validator_tx, - fc_to_api_rx, - pool_to_api_rx, subnet_service_tx, sync_to_api_rx, - validator_to_api_rx, }; let http_api = HttpApi { @@ -590,6 +602,7 @@ pub async fn run_after_genesis( controller: controller.clone_arc(), anchor_checkpoint_provider, eth1_api, + event_channels, validator_keys, validator_config, network_config, diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 30f5119a..70442aa7 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -21,7 +21,6 @@ directories = { workspace = true } doppelganger_protection = { workspace = true } eth1_api = { workspace = true } eth2_libp2p = { workspace = true } -execution_engine = { workspace = true } features = { workspace = true } fork_choice_control = { workspace = true } fork_choice_store = { workspace = true } diff --git a/validator/src/lib.rs b/validator/src/lib.rs index 49896825..fb722245 100644 --- a/validator/src/lib.rs +++ b/validator/src/lib.rs @@ -1,6 +1,6 @@ pub use crate::{ api::{run_validator_api, ValidatorApiConfig}, - messages::{ApiToValidator, ValidatorToApi}, + messages::ApiToValidator, validator::{Channels as ValidatorChannels, Validator}, validator_config::ValidatorConfig, }; diff --git a/validator/src/messages.rs b/validator/src/messages.rs index 4744168e..329e3d9e 100644 --- a/validator/src/messages.rs +++ b/validator/src/messages.rs @@ -3,15 +3,9 @@ use std::collections::HashSet; use anyhow::{Error, Result}; use bls::PublicKeyBytes; use builder_api::unphased::containers::SignedValidatorRegistrationV1; -use execution_engine::PayloadAttributesEvent; use futures::channel::{mpsc::UnboundedSender, oneshot::Sender}; use log::warn; -use types::{ - altair::containers::SignedContributionAndProof, - combined::AttesterSlashing, - phase0::containers::{ProposerSlashing, SignedVoluntaryExit}, - preset::Preset, -}; +use types::{altair::containers::SignedContributionAndProof, preset::Preset}; pub enum ApiToValidator { RegisteredValidators(Sender>), @@ -44,19 +38,3 @@ impl InternalMessage { } } } - -pub enum ValidatorToApi { - AttesterSlashing(Box>), - ContributionAndProof(Box>), - PayloadAttributes(Box), - ProposerSlashing(Box), - VoluntaryExit(Box), -} - -impl ValidatorToApi

{ - pub fn send(self, tx: &UnboundedSender) { - if tx.unbounded_send(self).is_err() { - warn!("send from validator to HTTP API failed because the receiver was dropped"); - } - } -} diff --git a/validator/src/validator.rs b/validator/src/validator.rs index c32545c9..b93cc11e 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -20,7 +20,6 @@ use derive_more::Display; use doppelganger_protection::DoppelgangerProtection; use eth1_api::ApiController; use eth2_libp2p::GossipId; -use execution_engine::{PayloadAttributesEvent, PayloadAttributesEventData}; use features::Feature; use fork_choice_control::{ValidatorMessage, Wait}; use fork_choice_store::{AttestationItem, AttestationOrigin, ChainLink, StateCacheError}; @@ -35,6 +34,7 @@ use helper_functions::{ accessors, misc, signing::{RandaoEpoch, SignForAllForks, SignForSingleFork}, }; +use http_api_utils::EventChannels; use itertools::Itertools as _; use keymanager::ProposerConfigs; use liveness_tracker::ValidatorToLiveness; @@ -78,7 +78,7 @@ use types::{ }; use crate::{ - messages::{ApiToValidator, InternalMessage, ValidatorToApi}, + messages::{ApiToValidator, InternalMessage}, misc::{Aggregator, SyncCommitteeMember}, own_beacon_committee_members::{BeaconCommitteeMember, OwnBeaconCommitteeMembers}, own_sync_committee_subscriptions::OwnSyncCommitteeSubscriptions, @@ -115,7 +115,6 @@ pub struct Channels { pub p2p_to_validator_rx: UnboundedReceiver>, pub slasher_to_validator_rx: Option>>, pub subnet_service_tx: UnboundedSender, - pub validator_to_api_tx: UnboundedSender>, pub validator_to_liveness_tx: Option>>, pub validator_to_slasher_tx: Option>, } @@ -142,6 +141,7 @@ pub struct Validator { validator_votes: HashMap>, builder_api: Option>, doppelganger_protection: Option>, + event_channels: Arc, last_registration_epoch: Option, proposer_configs: Arc, signer: Arc, @@ -154,7 +154,6 @@ pub struct Validator { metrics: Option>, internal_tx: UnboundedSender, internal_rx: UnboundedReceiver, - validator_to_api_tx: UnboundedSender>, validator_to_liveness_tx: Option>>, validator_to_slasher_tx: Option>, } @@ -169,6 +168,7 @@ impl Validator { attestation_agg_pool: Arc>, builder_api: Option>, doppelganger_protection: Option>, + event_channels: Arc, proposer_configs: Arc, signer: Arc, slashing_protector: Arc>, @@ -183,7 +183,6 @@ impl Validator { p2p_to_validator_rx, slasher_to_validator_rx, subnet_service_tx, - validator_to_api_tx, validator_to_liveness_tx, validator_to_slasher_tx, } = channels; @@ -216,6 +215,7 @@ impl Validator { validator_votes: HashMap::new(), builder_api, doppelganger_protection, + event_channels, last_registration_epoch: None, proposer_configs, signer, @@ -227,7 +227,6 @@ impl Validator { metrics, internal_rx, internal_tx, - validator_to_api_tx, validator_to_liveness_tx, validator_to_slasher_tx, } @@ -308,20 +307,15 @@ impl Validator { if let Some(state) = slot_head.beacon_state.post_bellatrix() { let payload = state.latest_execution_payload_header(); - let payload_attributes_event = PayloadAttributesEvent { - version: slot_head.beacon_state.phase(), - data: PayloadAttributesEventData { - proposal_slot: slot, - proposer_index, - parent_block_root: slot_head.beacon_block_root, - payload_attributes: payload_attributes.clone().into(), - parent_block_number: payload.block_number(), - parent_block_hash: payload.block_hash(), - }, - }; - - ValidatorToApi::PayloadAttributes(Box::new(payload_attributes_event)) - .send(&self.validator_to_api_tx); + self.event_channels.send_payload_attributes_event( + slot_head.beacon_state.phase(), + proposer_index, + slot, + slot_head.beacon_block_root, + &payload_attributes, + payload.block_number(), + payload.block_hash(), + ); } block_build_context.prepare_execution_payload_for_slot( @@ -351,7 +345,7 @@ impl Validator { .await?; if matches!(outcome, PoolAdditionOutcome::Accept) { - ValidatorToApi::AttesterSlashing(slashing).send(&self.validator_to_api_tx); + self.event_channels.send_attester_slashing_event(&slashing); } self.handle_pool_addition_outcome_for_p2p(outcome, gossip_id); @@ -363,7 +357,7 @@ impl Validator { .await?; if matches!(outcome, PoolAdditionOutcome::Accept) { - ValidatorToApi::ProposerSlashing(slashing).send(&self.validator_to_api_tx); + self.event_channels.send_proposer_slashing_event(&slashing); } self.handle_pool_addition_outcome_for_p2p(outcome, gossip_id); @@ -375,7 +369,7 @@ impl Validator { .await?; if matches!(outcome, PoolAdditionOutcome::Accept) { - ValidatorToApi::VoluntaryExit(voluntary_exit).send(&self.validator_to_api_tx); + self.event_channels.send_voluntary_exit_event(&voluntary_exit); } self.handle_pool_addition_outcome_for_p2p(outcome, gossip_id); @@ -1953,12 +1947,14 @@ impl Validator { .filter_map(|(index, contribution_and_proof, result)| async move { match result { Ok(_) => { - ValidatorToApi::ContributionAndProof(Box::new(contribution_and_proof)) - .send(&self.validator_to_api_tx); + self.event_channels + .send_contribution_and_proof_event(&contribution_and_proof); + ValidatorToP2p::PublishContributionAndProof(Box::new( contribution_and_proof, )) .send(&self.p2p_tx); + None } Err(error) => Some((index, error)),