From 881d6f77971422a5a8a611b7848b81cd8d072890 Mon Sep 17 00:00:00 2001 From: Tumas Date: Wed, 16 Oct 2024 12:53:48 +0300 Subject: [PATCH] Add `el_offline` field to beacon HTTP API (#49) Use custom Endpoints struct to manage urls instead of iterators in Eth1Api. Co-authored-by: weekday --- eth1_api/src/endpoints.rs | 198 ++++++++++++++++++++++++++++++++++++++ eth1_api/src/eth1_api.rs | 55 +++++++---- eth1_api/src/lib.rs | 1 + grandine-snapshot-tests | 2 +- http_api/src/context.rs | 8 +- http_api/src/routing.rs | 9 +- http_api/src/standard.rs | 9 +- http_api/src/task.rs | 5 +- runtime/src/runtime.rs | 8 +- 9 files changed, 265 insertions(+), 30 deletions(-) create mode 100644 eth1_api/src/endpoints.rs diff --git a/eth1_api/src/endpoints.rs b/eth1_api/src/endpoints.rs new file mode 100644 index 00000000..c5ef2be7 --- /dev/null +++ b/eth1_api/src/endpoints.rs @@ -0,0 +1,198 @@ +use reqwest::Url; + +#[derive(Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq, Debug))] +pub enum EndpointStatus { + Online, + Offline, +} + +impl EndpointStatus { + const fn is_offline(self) -> bool { + matches!(self, Self::Offline) + } +} + +#[derive(Clone)] +#[cfg_attr(test, derive(PartialEq, Eq, Debug))] +pub struct Endpoint { + index: usize, + status: EndpointStatus, + url: Url, +} + +impl Endpoint { + pub const fn url(&self) -> &Url { + &self.url + } + + pub const fn is_fallback(&self) -> bool { + self.index > 0 + } +} + +pub struct Endpoints { + current: usize, + endpoints: Vec, +} + +impl Endpoints { + pub fn new(urls: impl IntoIterator) -> Self { + let endpoints = urls + .into_iter() + .enumerate() + .map(|(index, url)| Endpoint { + index, + status: EndpointStatus::Online, + url, + }) + .collect(); + + Self { + current: 0, + endpoints, + } + } + + pub fn el_offline(&self) -> bool { + self.endpoints + .iter() + .all(|endpoint| endpoint.status.is_offline()) + } + + pub fn current(&self) -> Option<&Endpoint> { + self.endpoints.get(self.current) + } + + pub fn is_empty(&self) -> bool { + self.endpoints.is_empty() + } + + pub fn peek_next(&self) -> Option<&Endpoint> { + self.endpoints.get(self.next_index()) + } + + pub fn advance(&mut self) { + self.current = self.next_index(); + } + + pub fn set_status(&mut self, status: EndpointStatus) { + if let Some(current) = self.current_mut() { + current.status = status; + } + } + + pub fn reset(&mut self) { + self.current = 0; + } + + const fn next_index(&self) -> usize { + self.current.saturating_add(1) + } + + fn current_mut(&mut self) -> Option<&mut Endpoint> { + self.endpoints.get_mut(self.current) + } +} + +#[cfg(test)] +mod tests { + use anyhow::Result; + + use crate::endpoints::{Endpoint, EndpointStatus, Endpoints}; + + #[test] + fn test_empty_endpoints() { + let endpoints = Endpoints::new([]); + + assert!(endpoints.is_empty()); + assert!(endpoints.el_offline()); + + assert_eq!(endpoints.current(), None); + assert_eq!(endpoints.peek_next(), None); + } + + #[test] + fn test_endpoints() -> Result<()> { + let mut endpoints = Endpoints::new([ + "https://example1.net".parse()?, + "https://example2.net".parse()?, + ]); + + assert!(!endpoints.is_empty()); + assert!(!endpoints.el_offline(), "initially endpoints are online"); + + assert_eq!( + endpoints.current().cloned(), + Some(Endpoint { + index: 0, + status: EndpointStatus::Online, + url: "https://example1.net".parse()?, + }), + ); + + assert_eq!( + endpoints.peek_next().cloned(), + Some(Endpoint { + index: 1, + status: EndpointStatus::Online, + url: "https://example2.net".parse()?, + }), + ); + + endpoints.set_status(EndpointStatus::Offline); + + assert_eq!( + endpoints.current().map(|endpoint| endpoint.status), + Some(EndpointStatus::Offline), + ); + + endpoints.advance(); + + assert_eq!( + endpoints.current().cloned(), + Some(Endpoint { + index: 1, + status: EndpointStatus::Online, + url: "https://example2.net".parse()?, + }), + ); + + assert_eq!(endpoints.peek_next(), None); + assert!(!endpoints.el_offline()); + + endpoints.set_status(EndpointStatus::Offline); + endpoints.advance(); + + assert!(!endpoints.is_empty()); + assert!(endpoints.el_offline()); + + assert_eq!(endpoints.current(), None); + assert_eq!(endpoints.peek_next(), None); + + endpoints.reset(); + + // offline endpoints are still offline after reset + assert!(endpoints.el_offline()); + + assert_eq!( + endpoints.current().cloned(), + Some(Endpoint { + index: 0, + status: EndpointStatus::Offline, + url: "https://example1.net".parse()?, + }), + ); + + assert_eq!( + endpoints.peek_next().cloned(), + Some(Endpoint { + index: 1, + status: EndpointStatus::Offline, + url: "https://example2.net".parse()?, + }), + ); + + Ok(()) + } +} diff --git a/eth1_api/src/eth1_api.rs b/eth1_api/src/eth1_api.rs index d60f6b8b..6d4c8b13 100644 --- a/eth1_api/src/eth1_api.rs +++ b/eth1_api/src/eth1_api.rs @@ -1,5 +1,5 @@ use core::{ops::RangeInclusive, time::Duration}; -use std::{collections::BTreeMap, sync::Arc, vec::IntoIter}; +use std::{collections::BTreeMap, sync::Arc}; use anyhow::{bail, ensure, Result}; use either::Either; @@ -36,8 +36,11 @@ use web3::{ }; use crate::{ - auth::Auth, deposit_event::DepositEvent, eth1_block::Eth1Block, Eth1ApiToMetrics, - Eth1ConnectionData, + auth::Auth, + deposit_event::DepositEvent, + endpoints::{Endpoint, EndpointStatus, Endpoints}, + eth1_block::Eth1Block, + Eth1ApiToMetrics, Eth1ConnectionData, }; const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8); @@ -49,8 +52,7 @@ pub struct Eth1Api { config: Arc, client: Client, auth: Arc, - original: Vec, - endpoints: Mutex>, + endpoints: Mutex, eth1_api_to_metrics_tx: Option>, metrics: Option>, } @@ -69,8 +71,7 @@ impl Eth1Api { config, client, auth, - original: eth1_rpc_urls.clone(), - endpoints: Mutex::new(eth1_rpc_urls.into_iter()), + endpoints: Mutex::new(Endpoints::new(eth1_rpc_urls)), eth1_api_to_metrics_tx, metrics, } @@ -450,13 +451,18 @@ impl Eth1Api { .await } + pub async fn el_offline(&self) -> bool { + self.endpoints.lock().await.el_offline() + } + async fn request_with_fallback(&self, request_from_api: R) -> Result where R: Fn((Eth, Option)) -> Result> + Sync + Send, O: DeserializeOwned + Send, F: Future> + Send, { - while let Some(url) = self.current_endpoint().await { + while let Some(endpoint) = self.current_endpoint().await { + let url = endpoint.url(); let http = Http::with_client(self.client.clone(), url.clone()); let api = Web3::new(http).eth(); let headers = self.auth.headers()?; @@ -464,10 +470,12 @@ impl Eth1Api { match query { Ok(result) => { + self.set_endpoint_status(EndpointStatus::Online).await; + if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() { Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData { sync_eth1_connected: true, - sync_eth1_fallback_connected: self.original.first() != Some(&url), + sync_eth1_fallback_connected: endpoint.is_fallback(), }) .send(metrics_tx); } @@ -480,9 +488,10 @@ impl Eth1Api { } match self.peek_next_endpoint().await { - Some(next_eth) => warn!( + Some(next_endpoint) => warn!( "Eth1 RPC endpoint {url} returned an error: {error}; \ - switching to {next_eth}", + switching to {}", + next_endpoint.url(), ), None => warn!( "last available Eth1 RPC endpoint {url} returned an error: {error}", @@ -494,6 +503,7 @@ impl Eth1Api { .send(metrics_tx); } + self.set_endpoint_status(EndpointStatus::Offline).await; self.next_endpoint().await; } } @@ -508,25 +518,32 @@ impl Eth1Api { // Checking this in `Eth1Api::new` would be unnecessarily strict. // Syncing a predefined network without proposing blocks does not require an Eth1 RPC // (except during the Merge transition). - ensure!(!self.original.is_empty(), Error::NoEndpointsProvided); + ensure!( + self.endpoints.lock().await.is_empty(), + Error::NoEndpointsProvided + ); bail!(Error::EndpointsExhausted) } - async fn current_endpoint(&self) -> Option { - self.endpoints.lock().await.as_slice().first().cloned() + async fn current_endpoint(&self) -> Option { + (*self.endpoints.lock().await).current().cloned() + } + + async fn set_endpoint_status(&self, status: EndpointStatus) { + (*self.endpoints.lock().await).set_status(status) } - async fn next_endpoint(&self) -> Option { - self.endpoints.lock().await.next() + async fn next_endpoint(&self) { + self.endpoints.lock().await.advance(); } - async fn peek_next_endpoint(&self) -> Option { - self.endpoints.lock().await.as_slice().get(1).cloned() + async fn peek_next_endpoint(&self) -> Option { + self.endpoints.lock().await.peek_next().cloned() } async fn reset_endpoints(&self) { - *self.endpoints.lock().await = self.original.clone().into_iter(); + self.endpoints.lock().await.reset(); } } diff --git a/eth1_api/src/lib.rs b/eth1_api/src/lib.rs index 9d500838..5ed2ea4f 100644 --- a/eth1_api/src/lib.rs +++ b/eth1_api/src/lib.rs @@ -11,6 +11,7 @@ pub use crate::{ mod auth; mod deposit_event; +mod endpoints; mod eth1_api; mod eth1_block; mod eth1_execution_engine; diff --git a/grandine-snapshot-tests b/grandine-snapshot-tests index dc758f0b..7c926b47 160000 --- a/grandine-snapshot-tests +++ b/grandine-snapshot-tests @@ -1 +1 @@ -Subproject commit dc758f0b3bb7b7d57aa14dd3b8b94a7d22a3fc13 +Subproject commit 7c926b47073576508ff66200e55b6b25d58d2f61 diff --git a/http_api/src/context.rs b/http_api/src/context.rs index 09a1b737..d4a63b72 100644 --- a/http_api/src/context.rs +++ b/http_api/src/context.rs @@ -198,8 +198,11 @@ impl Context

{ controller.on_requested_block(block, None); } - let execution_service = - ExecutionService::new(eth1_api, controller.clone_arc(), execution_service_rx); + let execution_service = ExecutionService::new( + eth1_api.clone_arc(), + controller.clone_arc(), + execution_service_rx, + ); let signer = Arc::new(Signer::new( validator_keys, @@ -342,6 +345,7 @@ impl Context

{ block_producer, controller, anchor_checkpoint_provider, + eth1_api, validator_keys, validator_config, network_config, diff --git a/http_api/src/routing.rs b/http_api/src/routing.rs index 0273c23c..5787aae2 100644 --- a/http_api/src/routing.rs +++ b/http_api/src/routing.rs @@ -7,7 +7,7 @@ use axum::{ }; use block_producer::BlockProducer; use bls::PublicKeyBytes; -use eth1_api::ApiController; +use eth1_api::{ApiController, Eth1Api}; use features::Feature; use fork_choice_control::Wait; use futures::channel::mpsc::UnboundedSender; @@ -68,6 +68,7 @@ pub struct NormalState { pub block_producer: Arc>, pub controller: ApiController, pub anchor_checkpoint_provider: AnchorCheckpointProvider

, + pub eth1_api: Arc, pub validator_keys: Arc>, pub validator_config: Arc, pub metrics: Option>, @@ -111,6 +112,12 @@ impl FromRef> for AnchorCheckpointProvider } } +impl FromRef> for Arc { + fn from_ref(state: &NormalState) -> Self { + state.eth1_api.clone_arc() + } +} + impl FromRef> for Arc> { fn from_ref(state: &NormalState) -> Self { state.validator_keys.clone_arc() diff --git a/http_api/src/standard.rs b/http_api/src/standard.rs index 93f4b866..62d6961c 100644 --- a/http_api/src/standard.rs +++ b/http_api/src/standard.rs @@ -21,7 +21,7 @@ use block_producer::{BlockBuildOptions, BlockProducer, ProposerData, ValidatorBl use bls::{PublicKeyBytes, SignatureBytes}; use builder_api::unphased::containers::SignedValidatorRegistrationV1; use enum_iterator::Sequence as _; -use eth1_api::ApiController; +use eth1_api::{ApiController, Eth1Api}; use eth2_libp2p::PeerId; use fork_choice_control::{ForkChoiceContext, ForkTip, Wait}; use futures::{ @@ -351,9 +351,6 @@ struct NodeVersionResponse<'version> { version: Option<&'version str>, } -// TODO(Grandine Team): `NodeSyncingResponse` should have an `el_offline` field. -// It was added in Eth Beacon Node API version 2.4.0. -// See . #[derive(Serialize)] pub struct NodeSyncingResponse { #[serde(with = "serde_utils::string_or_native")] @@ -362,6 +359,7 @@ pub struct NodeSyncingResponse { sync_distance: Slot, is_syncing: bool, is_optimistic: bool, + el_offline: bool, } #[derive(Serialize)] @@ -1716,6 +1714,7 @@ pub async fn node_version(State(network_config): State>) -> R /// `GET /eth/v1/node/syncing` pub async fn node_syncing_status( State(controller): State>, + State(eth1_api): State>, State(is_synced): State>, State(is_back_synced): State>, ) -> EthResponse { @@ -1723,6 +1722,7 @@ pub async fn node_syncing_status( let head_slot = snapshot.head_slot(); let is_synced = is_synced.get(); let is_back_synced = is_back_synced.get(); + let el_offline = eth1_api.el_offline().await; EthResponse::json(NodeSyncingResponse { head_slot, @@ -1731,6 +1731,7 @@ pub async fn node_syncing_status( .unwrap_or_else(|| controller.slot() - head_slot), is_syncing: !(is_synced && is_back_synced), is_optimistic: snapshot.is_optimistic(), + el_offline, }) } diff --git a/http_api/src/task.rs b/http_api/src/task.rs index 8526afea..07b7a0c5 100644 --- a/http_api/src/task.rs +++ b/http_api/src/task.rs @@ -5,7 +5,7 @@ use anyhow::{Error as AnyhowError, Result}; use axum::Router; use block_producer::BlockProducer; use bls::PublicKeyBytes; -use eth1_api::ApiController; +use eth1_api::{ApiController, Eth1Api}; use fork_choice_control::{ApiMessage, Wait}; use futures::{ channel::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -53,6 +53,7 @@ pub struct HttpApi { pub block_producer: Arc>, pub controller: ApiController, pub anchor_checkpoint_provider: AnchorCheckpointProvider

, + pub eth1_api: Arc, pub validator_keys: Arc>, pub validator_config: Arc, pub network_config: Arc, @@ -84,6 +85,7 @@ impl HttpApi { block_producer, controller, anchor_checkpoint_provider, + eth1_api, validator_keys, validator_config, network_config, @@ -123,6 +125,7 @@ impl HttpApi { block_producer, controller, anchor_checkpoint_provider, + eth1_api, validator_keys, validator_config, metrics: metrics.clone(), diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs index d2bda659..4c9e36b6 100644 --- a/runtime/src/runtime.rs +++ b/runtime/src/runtime.rs @@ -211,8 +211,11 @@ pub async fn run_after_genesis( unfinalized_blocks, )?; - let execution_service = - ExecutionService::new(eth1_api, controller.clone_arc(), execution_service_rx); + let execution_service = ExecutionService::new( + eth1_api.clone_arc(), + controller.clone_arc(), + execution_service_rx, + ); let validator_keys = Arc::new(signer_snapshot.keys().copied().collect::>()); @@ -582,6 +585,7 @@ pub async fn run_after_genesis( block_producer, controller: controller.clone_arc(), anchor_checkpoint_provider, + eth1_api, validator_keys, validator_config, network_config: Arc::new(network_config),