Skip to content

Commit

Permalink
Simplify broadcast event handling by removing unnecessary indirection:
Browse files Browse the repository at this point in the history
- Remove an extra layer of message passing, as `tokio::broadcast::Sender::send` already handles message distribution.
- Delete message types: `PoolToApiMessage`, `ValidatorToApi`, and `fork_choice_control::ApiMessage`.
- Consolidate SSE event handling into a single file: http_api_utils/src/events.rs
- Build event messages only when receivers are present.
  • Loading branch information
Tumas committed Dec 12, 2024
1 parent 58a2730 commit e48fd9e
Show file tree
Hide file tree
Showing 34 changed files with 912 additions and 728 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions execution_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ pub use crate::{
types::{
EngineGetPayloadV1Response, EngineGetPayloadV2Response, EngineGetPayloadV3Response,
EngineGetPayloadV4Response, ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3,
ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadAttributesEvent,
PayloadAttributesEventData, PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3,
PayloadId, PayloadStatus, PayloadStatusV1, PayloadStatusWithBlockHash,
PayloadValidationStatus, RawExecutionRequests,
ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadAttributesV1,
PayloadAttributesV2, PayloadAttributesV3, PayloadId, PayloadStatus, PayloadStatusV1,
PayloadStatusWithBlockHash, PayloadValidationStatus, RawExecutionRequests, WithdrawalV1,
},
};

Expand Down
158 changes: 1 addition & 157 deletions execution_engine/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use types::{
},
nonstandard::{Phase, WithBlobsAndMev},
phase0::primitives::{
ExecutionAddress, ExecutionBlockHash, ExecutionBlockNumber, Gwei, Slot, UnixSeconds,
ExecutionAddress, ExecutionBlockHash, ExecutionBlockNumber, Gwei, UnixSeconds,
ValidatorIndex, H256,
},
preset::Preset,
Expand Down Expand Up @@ -413,162 +413,6 @@ pub struct PayloadAttributesV3<P: Preset> {
pub parent_beacon_block_root: H256,
}

#[derive(Debug, Serialize)]
pub struct PayloadAttributesEvent {
pub version: Phase,
pub data: PayloadAttributesEventData,
}

#[derive(Debug, Serialize)]
pub struct PayloadAttributesEventData {
#[serde(with = "serde_utils::string_or_native")]
pub proposal_slot: Slot,
pub parent_block_root: H256,
#[serde(with = "serde_utils::string_or_native")]
pub parent_block_number: ExecutionBlockNumber,
pub parent_block_hash: ExecutionBlockHash,
#[serde(with = "serde_utils::string_or_native")]
pub proposer_index: ValidatorIndex,
pub payload_attributes: CombinedPayloadAttributesEventData,
}

#[derive(Debug, Serialize)]
#[serde(untagged, bound = "")]
pub enum CombinedPayloadAttributesEventData {
Bellatrix(PayloadAttributesEventDataV1),
Capella(PayloadAttributesEventDataV2),
Deneb(PayloadAttributesEventDataV3),
Electra(PayloadAttributesEventDataV3),
}

#[derive(Debug, Serialize)]
pub struct PayloadAttributesEventDataV1 {
#[serde(with = "serde_utils::string_or_native")]
pub timestamp: UnixSeconds,
pub prev_randao: H256,
pub suggested_fee_recipient: ExecutionAddress,
}

#[derive(Debug, Serialize)]
pub struct PayloadAttributesEventDataV2 {
#[serde(with = "serde_utils::string_or_native")]
pub timestamp: UnixSeconds,
pub prev_randao: H256,
pub suggested_fee_recipient: ExecutionAddress,
pub withdrawals: Vec<WithdrawalEventDataV1>,
}

#[derive(Debug, Serialize)]
pub struct PayloadAttributesEventDataV3 {
#[serde(with = "serde_utils::string_or_native")]
pub timestamp: UnixSeconds,
pub prev_randao: H256,
pub suggested_fee_recipient: ExecutionAddress,
pub withdrawals: Vec<WithdrawalEventDataV1>,
pub parent_beacon_block_root: H256,
}

#[derive(Debug, Serialize)]
pub struct WithdrawalEventDataV1 {
#[serde(with = "serde_utils::string_or_native")]
pub index: WithdrawalIndex,
#[serde(with = "serde_utils::string_or_native")]
pub validator_index: ValidatorIndex,
pub address: ExecutionAddress,
#[serde(with = "serde_utils::string_or_native")]
pub amount: Gwei,
}

impl From<WithdrawalV1> for WithdrawalEventDataV1 {
fn from(withdrawal: WithdrawalV1) -> Self {
let WithdrawalV1 {
index,
validator_index,
address,
amount,
} = withdrawal;

Self {
index,
validator_index,
address,
amount,
}
}
}

impl From<PayloadAttributesV1> for PayloadAttributesEventDataV1 {
fn from(payload_attributes: PayloadAttributesV1) -> Self {
let PayloadAttributesV1 {
timestamp,
prev_randao,
suggested_fee_recipient,
} = payload_attributes;

Self {
timestamp,
prev_randao,
suggested_fee_recipient,
}
}
}

impl<P: Preset> From<PayloadAttributesV2<P>> for PayloadAttributesEventDataV2 {
fn from(payload_attributes: PayloadAttributesV2<P>) -> Self {
let PayloadAttributesV2 {
timestamp,
prev_randao,
suggested_fee_recipient,
withdrawals,
} = payload_attributes;

Self {
timestamp,
prev_randao,
suggested_fee_recipient,
withdrawals: withdrawals.into_iter().map(Into::into).collect::<Vec<_>>(),
}
}
}

impl<P: Preset> From<PayloadAttributesV3<P>> for PayloadAttributesEventDataV3 {
fn from(payload_attributes: PayloadAttributesV3<P>) -> Self {
let PayloadAttributesV3 {
timestamp,
prev_randao,
suggested_fee_recipient,
withdrawals,
parent_beacon_block_root,
} = payload_attributes;

Self {
timestamp,
prev_randao,
suggested_fee_recipient,
withdrawals: withdrawals.into_iter().map(Into::into).collect::<Vec<_>>(),
parent_beacon_block_root,
}
}
}
impl<P: Preset> From<PayloadAttributes<P>> for CombinedPayloadAttributesEventData {
fn from(payload_attributes: PayloadAttributes<P>) -> Self {
match payload_attributes {
PayloadAttributes::Bellatrix(payload_attributes_v1) => {
Self::Bellatrix(payload_attributes_v1.into())
}
PayloadAttributes::Capella(payload_attributes_v2) => {
Self::Capella(payload_attributes_v2.into())
}
PayloadAttributes::Deneb(payload_attributes_v3) => {
Self::Deneb(payload_attributes_v3.into())
}
PayloadAttributes::Electra(payload_attributes_v3) => {
Self::Electra(payload_attributes_v3.into())
}
}
}
}

/// [`engine_getPayloadV1` response](https://github.com/ethereum/execution-apis/blob/b7c5d3420e00648f456744d121ffbd929862924d/src/engine/paris.md#response-2).
pub type EngineGetPayloadV1Response<P> = ExecutionPayloadV1<P>;

Expand Down
9 changes: 5 additions & 4 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use fork_choice_store::{
};
use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSender};
use genesis::AnchorCheckpointProvider;
use http_api_utils::EventChannels;
use prometheus_metrics::Metrics;
use std_ext::ArcExt as _;
use thiserror::Error;
Expand All @@ -44,8 +45,8 @@ use types::{
use crate::{
block_processor::BlockProcessor,
messages::{
ApiMessage, AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage,
SubnetMessage, SyncMessage, ValidatorMessage,
AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage,
SyncMessage, ValidatorMessage,
},
misc::{VerifyAggregateAndProofResult, VerifyAttestationResult},
mutator::Mutator,
Expand Down Expand Up @@ -94,9 +95,9 @@ where
anchor_block: Arc<SignedBeaconBlock<P>>,
anchor_state: Arc<BeaconState<P>>,
tick: Tick,
event_channels: Arc<EventChannels>,
execution_engine: E,
metrics: Option<Arc<Metrics>>,
api_tx: impl UnboundedSink<ApiMessage<P>>,
attestation_verifier_tx: A, // impl UnboundedSink<AttestationVerifierMessage<P, W>>,
p2p_tx: impl UnboundedSink<P2pMessage<P>>,
pool_tx: impl UnboundedSink<PoolMessage>,
Expand Down Expand Up @@ -129,13 +130,13 @@ where
store_snapshot.clone_arc(),
state_cache.clone_arc(),
block_processor.clone_arc(),
event_channels,
execution_engine.clone(),
storage.clone_arc(),
thread_pool.clone(),
metrics.clone(),
mutator_tx.clone(),
mutator_rx,
api_tx,
attestation_verifier_tx.clone(),
p2p_tx,
pool_tx,
Expand Down
3 changes: 1 addition & 2 deletions fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
pub use crate::{
controller::Controller,
messages::{
ApiMessage, AttestationVerifierMessage, BlockEvent, ChainReorgEvent,
FinalizedCheckpointEvent, HeadEvent, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
ValidatorMessage,
},
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
Expand Down
Loading

0 comments on commit e48fd9e

Please sign in to comment.