Skip to content

Commit

Permalink
Add el_offline field to beacon HTTP API (#49)
Browse files Browse the repository at this point in the history
Use custom Endpoints struct to manage urls instead of iterators in Eth1Api.

Co-authored-by: weekday <weekday@grandine.io>
  • Loading branch information
Tumas and weekday-grandine-io committed Oct 16, 2024
1 parent c4950fa commit 881d6f7
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 30 deletions.
198 changes: 198 additions & 0 deletions eth1_api/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
use reqwest::Url;

#[derive(Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq, Debug))]
pub enum EndpointStatus {
Online,
Offline,
}

impl EndpointStatus {
const fn is_offline(self) -> bool {
matches!(self, Self::Offline)
}
}

#[derive(Clone)]
#[cfg_attr(test, derive(PartialEq, Eq, Debug))]
pub struct Endpoint {
index: usize,
status: EndpointStatus,
url: Url,
}

impl Endpoint {
pub const fn url(&self) -> &Url {
&self.url
}

pub const fn is_fallback(&self) -> bool {
self.index > 0
}
}

pub struct Endpoints {
current: usize,
endpoints: Vec<Endpoint>,
}

impl Endpoints {
pub fn new(urls: impl IntoIterator<Item = Url>) -> Self {
let endpoints = urls
.into_iter()
.enumerate()
.map(|(index, url)| Endpoint {
index,
status: EndpointStatus::Online,
url,
})
.collect();

Self {
current: 0,
endpoints,
}
}

pub fn el_offline(&self) -> bool {
self.endpoints
.iter()
.all(|endpoint| endpoint.status.is_offline())
}

pub fn current(&self) -> Option<&Endpoint> {
self.endpoints.get(self.current)
}

pub fn is_empty(&self) -> bool {
self.endpoints.is_empty()
}

pub fn peek_next(&self) -> Option<&Endpoint> {
self.endpoints.get(self.next_index())
}

pub fn advance(&mut self) {
self.current = self.next_index();
}

pub fn set_status(&mut self, status: EndpointStatus) {
if let Some(current) = self.current_mut() {
current.status = status;
}
}

pub fn reset(&mut self) {
self.current = 0;
}

const fn next_index(&self) -> usize {
self.current.saturating_add(1)
}

fn current_mut(&mut self) -> Option<&mut Endpoint> {
self.endpoints.get_mut(self.current)
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;

use crate::endpoints::{Endpoint, EndpointStatus, Endpoints};

#[test]
fn test_empty_endpoints() {
let endpoints = Endpoints::new([]);

assert!(endpoints.is_empty());
assert!(endpoints.el_offline());

assert_eq!(endpoints.current(), None);
assert_eq!(endpoints.peek_next(), None);
}

#[test]
fn test_endpoints() -> Result<()> {
let mut endpoints = Endpoints::new([
"https://example1.net".parse()?,
"https://example2.net".parse()?,
]);

assert!(!endpoints.is_empty());
assert!(!endpoints.el_offline(), "initially endpoints are online");

assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 0,
status: EndpointStatus::Online,
url: "https://example1.net".parse()?,
}),
);

assert_eq!(
endpoints.peek_next().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Online,
url: "https://example2.net".parse()?,
}),
);

endpoints.set_status(EndpointStatus::Offline);

assert_eq!(
endpoints.current().map(|endpoint| endpoint.status),
Some(EndpointStatus::Offline),
);

endpoints.advance();

assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Online,
url: "https://example2.net".parse()?,
}),
);

assert_eq!(endpoints.peek_next(), None);
assert!(!endpoints.el_offline());

endpoints.set_status(EndpointStatus::Offline);
endpoints.advance();

assert!(!endpoints.is_empty());
assert!(endpoints.el_offline());

assert_eq!(endpoints.current(), None);
assert_eq!(endpoints.peek_next(), None);

endpoints.reset();

// offline endpoints are still offline after reset
assert!(endpoints.el_offline());

assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 0,
status: EndpointStatus::Offline,
url: "https://example1.net".parse()?,
}),
);

assert_eq!(
endpoints.peek_next().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Offline,
url: "https://example2.net".parse()?,
}),
);

Ok(())
}
}
55 changes: 36 additions & 19 deletions eth1_api/src/eth1_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::{ops::RangeInclusive, time::Duration};
use std::{collections::BTreeMap, sync::Arc, vec::IntoIter};
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{bail, ensure, Result};
use either::Either;
Expand Down Expand Up @@ -36,8 +36,11 @@ use web3::{
};

use crate::{
auth::Auth, deposit_event::DepositEvent, eth1_block::Eth1Block, Eth1ApiToMetrics,
Eth1ConnectionData,
auth::Auth,
deposit_event::DepositEvent,
endpoints::{Endpoint, EndpointStatus, Endpoints},
eth1_block::Eth1Block,
Eth1ApiToMetrics, Eth1ConnectionData,
};

const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8);
Expand All @@ -49,8 +52,7 @@ pub struct Eth1Api {
config: Arc<Config>,
client: Client,
auth: Arc<Auth>,
original: Vec<Url>,
endpoints: Mutex<IntoIter<Url>>,
endpoints: Mutex<Endpoints>,
eth1_api_to_metrics_tx: Option<UnboundedSender<Eth1ApiToMetrics>>,
metrics: Option<Arc<Metrics>>,
}
Expand All @@ -69,8 +71,7 @@ impl Eth1Api {
config,
client,
auth,
original: eth1_rpc_urls.clone(),
endpoints: Mutex::new(eth1_rpc_urls.into_iter()),
endpoints: Mutex::new(Endpoints::new(eth1_rpc_urls)),
eth1_api_to_metrics_tx,
metrics,
}
Expand Down Expand Up @@ -450,24 +451,31 @@ impl Eth1Api {
.await
}

pub async fn el_offline(&self) -> bool {
self.endpoints.lock().await.el_offline()
}

async fn request_with_fallback<R, O, F>(&self, request_from_api: R) -> Result<O>
where
R: Fn((Eth<Http>, Option<HeaderMap>)) -> Result<CallFuture<O, F>> + Sync + Send,
O: DeserializeOwned + Send,
F: Future<Output = Result<Value, Web3Error>> + Send,
{
while let Some(url) = self.current_endpoint().await {
while let Some(endpoint) = self.current_endpoint().await {
let url = endpoint.url();
let http = Http::with_client(self.client.clone(), url.clone());
let api = Web3::new(http).eth();
let headers = self.auth.headers()?;
let query = request_from_api((api, headers))?.await;

match query {
Ok(result) => {
self.set_endpoint_status(EndpointStatus::Online).await;

if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() {
Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData {
sync_eth1_connected: true,
sync_eth1_fallback_connected: self.original.first() != Some(&url),
sync_eth1_fallback_connected: endpoint.is_fallback(),
})
.send(metrics_tx);
}
Expand All @@ -480,9 +488,10 @@ impl Eth1Api {
}

match self.peek_next_endpoint().await {
Some(next_eth) => warn!(
Some(next_endpoint) => warn!(
"Eth1 RPC endpoint {url} returned an error: {error}; \
switching to {next_eth}",
switching to {}",
next_endpoint.url(),
),
None => warn!(
"last available Eth1 RPC endpoint {url} returned an error: {error}",
Expand All @@ -494,6 +503,7 @@ impl Eth1Api {
.send(metrics_tx);
}

self.set_endpoint_status(EndpointStatus::Offline).await;
self.next_endpoint().await;
}
}
Expand All @@ -508,25 +518,32 @@ impl Eth1Api {
// Checking this in `Eth1Api::new` would be unnecessarily strict.
// Syncing a predefined network without proposing blocks does not require an Eth1 RPC
// (except during the Merge transition).
ensure!(!self.original.is_empty(), Error::NoEndpointsProvided);
ensure!(
self.endpoints.lock().await.is_empty(),
Error::NoEndpointsProvided
);

bail!(Error::EndpointsExhausted)
}

async fn current_endpoint(&self) -> Option<Url> {
self.endpoints.lock().await.as_slice().first().cloned()
async fn current_endpoint(&self) -> Option<Endpoint> {
(*self.endpoints.lock().await).current().cloned()
}

async fn set_endpoint_status(&self, status: EndpointStatus) {
(*self.endpoints.lock().await).set_status(status)
}

async fn next_endpoint(&self) -> Option<Url> {
self.endpoints.lock().await.next()
async fn next_endpoint(&self) {
self.endpoints.lock().await.advance();
}

async fn peek_next_endpoint(&self) -> Option<Url> {
self.endpoints.lock().await.as_slice().get(1).cloned()
async fn peek_next_endpoint(&self) -> Option<Endpoint> {
self.endpoints.lock().await.peek_next().cloned()
}

async fn reset_endpoints(&self) {
*self.endpoints.lock().await = self.original.clone().into_iter();
self.endpoints.lock().await.reset();
}
}

Expand Down
1 change: 1 addition & 0 deletions eth1_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use crate::{

mod auth;
mod deposit_event;
mod endpoints;
mod eth1_api;
mod eth1_block;
mod eth1_execution_engine;
Expand Down
8 changes: 6 additions & 2 deletions http_api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ impl<P: Preset> Context<P> {
controller.on_requested_block(block, None);
}

let execution_service =
ExecutionService::new(eth1_api, controller.clone_arc(), execution_service_rx);
let execution_service = ExecutionService::new(
eth1_api.clone_arc(),
controller.clone_arc(),
execution_service_rx,
);

let signer = Arc::new(Signer::new(
validator_keys,
Expand Down Expand Up @@ -342,6 +345,7 @@ impl<P: Preset> Context<P> {
block_producer,
controller,
anchor_checkpoint_provider,
eth1_api,
validator_keys,
validator_config,
network_config,
Expand Down
9 changes: 8 additions & 1 deletion http_api/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::{
};
use block_producer::BlockProducer;
use bls::PublicKeyBytes;
use eth1_api::ApiController;
use eth1_api::{ApiController, Eth1Api};
use features::Feature;
use fork_choice_control::Wait;
use futures::channel::mpsc::UnboundedSender;
Expand Down Expand Up @@ -68,6 +68,7 @@ pub struct NormalState<P: Preset, W: Wait> {
pub block_producer: Arc<BlockProducer<P, W>>,
pub controller: ApiController<P, W>,
pub anchor_checkpoint_provider: AnchorCheckpointProvider<P>,
pub eth1_api: Arc<Eth1Api>,
pub validator_keys: Arc<HashSet<PublicKeyBytes>>,
pub validator_config: Arc<ValidatorConfig>,
pub metrics: Option<Arc<Metrics>>,
Expand Down Expand Up @@ -111,6 +112,12 @@ impl<P: Preset, W: Wait> FromRef<NormalState<P, W>> for AnchorCheckpointProvider
}
}

impl<P: Preset, W: Wait> FromRef<NormalState<P, W>> for Arc<Eth1Api> {
fn from_ref(state: &NormalState<P, W>) -> Self {
state.eth1_api.clone_arc()
}
}

impl<P: Preset, W: Wait> FromRef<NormalState<P, W>> for Arc<HashSet<PublicKeyBytes>> {
fn from_ref(state: &NormalState<P, W>) -> Self {
state.validator_keys.clone_arc()
Expand Down
Loading

0 comments on commit 881d6f7

Please sign in to comment.