diff --git a/ic-agent/Cargo.toml b/ic-agent/Cargo.toml index e6b4e47c..00e1eb95 100644 --- a/ic-agent/Cargo.toml +++ b/ic-agent/Cargo.toml @@ -15,11 +15,11 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"] include = ["src", "Cargo.toml", "../LICENSE", "README.md"] [dependencies] -arc-swap = { version = "1.7", optional = true } -async-channel = { version = "1.9", optional = true } +arc-swap = "1.7" +async-channel = "1.9" async-lock = "3.3" async-trait = "0.1" -async-watch = { version = "0.3", optional = true } +async-watch = "0.3" backoff = "0.4.0" cached = { version = "0.52", features = ["ahash"], default-features = false } candid = { workspace = true } @@ -48,7 +48,7 @@ serde_cbor = { workspace = true } serde_repr = { workspace = true } sha2 = { workspace = true } simple_asn1 = "0.6.1" -stop-token = { version = "0.7", optional = true } +stop-token = "0.7" thiserror = { workspace = true } time = { workspace = true } tower-service = "0.3" @@ -77,6 +77,7 @@ web-sys = { version = "0.3", features = ["Window"], optional = true } [dev-dependencies] serde_json.workspace = true tracing-subscriber = "0.3" +tracing = "0.1" [target.'cfg(not(target_family = "wasm"))'.dev-dependencies] tokio = { workspace = true, features = ["full"] } @@ -109,16 +110,10 @@ wasm-bindgen = [ "backoff/wasm-bindgen", "cached/wasm", ] -_internal_dynamic-routing = [ - "dep:arc-swap", - "dep:async-channel", - "dep:async-watch", - "dep:stop-token", - "tracing", -] +_internal_dynamic-routing = [] tracing = ["dep:tracing"] # Does very little right now. [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"] rustdoc-args = ["--cfg=docsrs"] -features = ["_internal_dynamic-routing"] +features = [] diff --git a/ic-agent/src/agent/agent_config.rs b/ic-agent/src/agent/agent_config.rs index 6a637701..11ed2f35 100644 --- a/ic-agent/src/agent/agent_config.rs +++ b/ic-agent/src/agent/agent_config.rs @@ -1,4 +1,5 @@ use reqwest::Client; +use url::Url; use crate::{ agent::{NonceFactory, NonceGenerator}, @@ -33,6 +34,10 @@ pub struct AgentConfig { pub http_service: Option>, /// See [`with_max_polling_time`](super::AgentBuilder::with_max_polling_time). pub max_polling_time: Duration, + /// See [`with_background_dynamic_routing`](super::AgentBuilder::with_background_dynamic_routing). + pub background_dynamic_routing: bool, + /// See [`with_url`](super::AgentBuilder::with_url). + pub url: Option, } impl Default for AgentConfig { @@ -49,6 +54,8 @@ impl Default for AgentConfig { max_response_body_size: None, max_tcp_error_retries: 0, max_polling_time: Duration::from_secs(60 * 5), + background_dynamic_routing: false, + url: None, } } } diff --git a/ic-agent/src/agent/builder.rs b/ic-agent/src/agent/builder.rs index cc42752f..388cd0aa 100644 --- a/ic-agent/src/agent/builder.rs +++ b/ic-agent/src/agent/builder.rs @@ -1,5 +1,3 @@ -use url::Url; - use crate::{ agent::{agent_config::AgentConfig, Agent}, AgentError, Identity, NonceFactory, NonceGenerator, @@ -20,34 +18,27 @@ impl AgentBuilder { Agent::new(self.config) } - /// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes and routing traffic via them based on latency. - #[cfg(feature = "_internal_dynamic-routing")] - pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self { - use crate::agent::route_provider::dynamic_routing::{ - dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN}, - node::Node, - snapshot::latency_based_routing::LatencyRoutingSnapshot, - }; - // TODO: This is a temporary solution to get the seed node. - let seed = Node::new(IC0_SEED_DOMAIN).unwrap(); - - let route_provider = DynamicRouteProviderBuilder::new( - LatencyRoutingSnapshot::new(), - vec![seed], - client.clone(), - ) - .build() - .await; - - let route_provider = Arc::new(route_provider) as Arc; - - self.with_arc_route_provider(route_provider) - .with_http_client(client) + /// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes + /// and routing traffic via them based on latency. Cannot be set together with `with_route_provider`. + /// + /// See [`DynamicRouteProvider`](super::route_provider::DynamicRouteProvider) if more customization is needed such as polling intervals. + pub async fn with_background_dynamic_routing(mut self) -> Self { + assert!( + self.config.route_provider.is_none(), + "with_background_dynamic_routing cannot be called with with_route_provider" + ); + self.config.background_dynamic_routing = true; + self } - /// Set the URL of the [Agent]. - pub fn with_url>(self, url: S) -> Self { - self.with_route_provider(url.into().parse::().unwrap()) + /// Set the URL of the [`Agent`]. Either this or `with_route_provider` must be called (but not both). + pub fn with_url>(mut self, url: S) -> Self { + assert!( + self.config.route_provider.is_none(), + "with_url cannot be called with with_route_provider" + ); + self.config.url = Some(url.into().parse().unwrap()); + self } /// Add a `NonceFactory` to this Agent. By default, no nonce is produced. @@ -125,6 +116,14 @@ impl AgentBuilder { /// Same as [`Self::with_route_provider`], but reuses an existing `Arc`. pub fn with_arc_route_provider(mut self, provider: Arc) -> Self { + assert!( + !self.config.background_dynamic_routing, + "with_background_dynamic_routing cannot be called with with_route_provider" + ); + assert!( + self.config.url.is_none(), + "with_url cannot be called with with_route_provider" + ); self.config.route_provider = Some(provider); self } diff --git a/ic-agent/src/agent/mod.rs b/ic-agent/src/agent/mod.rs index 57254eef..d760e201 100644 --- a/ic-agent/src/agent/mod.rs +++ b/ic-agent/src/agent/mod.rs @@ -29,7 +29,13 @@ pub use ic_transport_types::{ pub use nonce::{NonceFactory, NonceGenerator}; use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns}; use reqwest::{Body, Client, Request, Response}; -use route_provider::RouteProvider; +use route_provider::{ + dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderBuilder, node::Node, + snapshot::latency_based_routing::LatencyRoutingSnapshot, + }, + RouteProvider, UrlUntilReady, +}; use time::OffsetDateTime; use tower_service::Service; @@ -58,7 +64,7 @@ use std::{ borrow::Cow, collections::HashMap, convert::TryFrom, - fmt, + fmt::{self, Debug}, future::{Future, IntoFuture}, pin::Pin, sync::{Arc, Mutex, RwLock}, @@ -177,32 +183,54 @@ impl Agent { /// Create an instance of an [`Agent`]. pub fn new(config: agent_config::AgentConfig) -> Result { + let client = config.http_service.unwrap_or_else(|| { + Arc::new(Retry429Logic { + client: config.client.unwrap_or_else(|| { + #[cfg(not(target_family = "wasm"))] + { + Client::builder() + .use_rustls_tls() + .timeout(Duration::from_secs(360)) + .build() + .expect("Could not create HTTP client.") + } + #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))] + { + Client::new() + } + }), + }) + }); Ok(Agent { nonce_factory: config.nonce_factory, identity: config.identity, ingress_expiry: config.ingress_expiry, root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())), - client: config.http_service.unwrap_or_else(|| { - Arc::new(Retry429Logic { - client: config.client.unwrap_or_else(|| { - #[cfg(not(target_family = "wasm"))] - { - Client::builder() - .use_rustls_tls() - .timeout(Duration::from_secs(360)) - .build() - .expect("Could not create HTTP client.") - } - #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))] - { - Client::new() - } - }), - }) - }), - route_provider: config - .route_provider - .expect("missing `url` or `route_provider` in `AgentBuilder`"), + client: client.clone(), + route_provider: if let Some(route_provider) = config.route_provider { + route_provider + } else if let Some(url) = config.url { + if config.background_dynamic_routing { + assert!( + url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(), + "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme" + ); + let seeds = vec![Node::new(url.domain().unwrap()).unwrap()]; + UrlUntilReady::new(url, async move { + DynamicRouteProviderBuilder::new( + LatencyRoutingSnapshot::new(), + seeds, + client, + ) + .build() + .await + }) as Arc + } else { + Arc::new(url) + } + } else { + panic!("either route_provider or url must be specified"); + }, subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())), verify_query_signatures: config.verify_query_signatures, concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)), @@ -1862,7 +1890,7 @@ impl<'agent> IntoFuture for UpdateBuilder<'agent> { /// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`. #[cfg_attr(target_family = "wasm", async_trait(?Send))] #[cfg_attr(not(target_family = "wasm"), async_trait)] -pub trait HttpService: Send + Sync { +pub trait HttpService: Send + Sync + Debug { /// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`. async fn call<'a>( &'a self, @@ -1876,7 +1904,7 @@ impl HttpService for T where for<'a> &'a T: Service, for<'a> <&'a Self as Service>::Future: Send, - T: Send + Sync + ?Sized, + T: Send + Sync + Debug + ?Sized, { #[allow(clippy::needless_arbitrary_self_type)] async fn call<'a>( @@ -1907,7 +1935,7 @@ where impl HttpService for T where for<'a> &'a T: Service, - T: Send + Sync + ?Sized, + T: Send + Sync + Debug + ?Sized, { #[allow(clippy::needless_arbitrary_self_type)] async fn call<'a>( @@ -1919,6 +1947,7 @@ where } } +#[derive(Debug)] struct Retry429Logic { client: Client, } diff --git a/ic-agent/src/agent/route_provider.rs b/ic-agent/src/agent/route_provider.rs index 77344b8b..8720c05a 100644 --- a/ic-agent/src/agent/route_provider.rs +++ b/ic-agent/src/agent/route_provider.rs @@ -1,12 +1,29 @@ //! A [`RouteProvider`] for dynamic generation of routing urls. +use arc_swap::ArcSwapOption; +use dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderBuilder, + node::Node, + snapshot::{ + latency_based_routing::LatencyRoutingSnapshot, + round_robin_routing::RoundRobinRoutingSnapshot, + }, +}; use std::{ + future::Future, str::FromStr, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, }; use url::Url; use crate::agent::AgentError; +use super::HttpService; +#[cfg(not(feature = "_internal_dynamic-routing"))] +pub(crate) mod dynamic_routing; #[cfg(feature = "_internal_dynamic-routing")] pub mod dynamic_routing; @@ -118,6 +135,143 @@ impl RouteProvider for Url { } } +/// A [`RouteProvider`] that will attempt to discover new boundary nodes and cycle through them, optionally prioritizing those with low latency. +#[derive(Debug)] +pub struct DynamicRouteProvider { + inner: Box, +} + +impl DynamicRouteProvider { + /// Create a new `DynamicRouter` from a list of seed domains and a routing strategy. + pub async fn run_in_background( + seed_domains: Vec, + client: Arc, + strategy: DynamicRoutingStrategy, + ) -> Result { + let seed_nodes: Result, _> = seed_domains.into_iter().map(Node::new).collect(); + let boxed = match strategy { + DynamicRoutingStrategy::ByLatency => Box::new( + DynamicRouteProviderBuilder::new( + LatencyRoutingSnapshot::new(), + seed_nodes?, + client, + ) + .build() + .await, + ) as Box, + DynamicRoutingStrategy::RoundRobin => Box::new( + DynamicRouteProviderBuilder::new( + RoundRobinRoutingSnapshot::new(), + seed_nodes?, + client, + ) + .build() + .await, + ), + }; + Ok(Self { inner: boxed }) + } + /// Same as [`run_in_background`](Self::run_in_background), but with custom intervals for refreshing the routing list and health-checking nodes. + pub async fn run_in_background_with_intervals( + seed_domains: Vec, + client: Arc, + strategy: DynamicRoutingStrategy, + list_update_interval: Duration, + health_check_interval: Duration, + ) -> Result { + let seed_nodes: Result, _> = seed_domains.into_iter().map(Node::new).collect(); + let boxed = match strategy { + DynamicRoutingStrategy::ByLatency => Box::new( + DynamicRouteProviderBuilder::new( + LatencyRoutingSnapshot::new(), + seed_nodes?, + client, + ) + .with_fetch_period(list_update_interval) + .with_check_period(health_check_interval) + .build() + .await, + ) as Box, + DynamicRoutingStrategy::RoundRobin => Box::new( + DynamicRouteProviderBuilder::new( + RoundRobinRoutingSnapshot::new(), + seed_nodes?, + client, + ) + .with_fetch_period(list_update_interval) + .with_check_period(health_check_interval) + .build() + .await, + ), + }; + Ok(Self { inner: boxed }) + } +} + +impl RouteProvider for DynamicRouteProvider { + fn route(&self) -> Result { + self.inner.route() + } + fn n_ordered_routes(&self, n: usize) -> Result, AgentError> { + self.inner.n_ordered_routes(n) + } +} + +/// Strategy for [`DynamicRouteProvider`]'s routing mechanism. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum DynamicRoutingStrategy { + /// Prefer nodes with low latency. + ByLatency, + /// Cycle through discovered nodes with no regard for latency. + RoundRobin, +} + +#[derive(Debug)] +pub(crate) struct UrlUntilReady { + url: Url, + router: ArcSwapOption, +} + +impl UrlUntilReady { + pub(crate) fn new< + #[cfg(not(target_family = "wasm"))] F: Future + Send + 'static, + #[cfg(target_family = "wasm")] F: Future + 'static, + >( + url: Url, + fut: F, + ) -> Arc { + let s = Arc::new(Self { + url, + router: ArcSwapOption::empty(), + }); + let weak = Arc::downgrade(&s); + crate::util::spawn(async move { + let router = fut.await; + if let Some(outer) = weak.upgrade() { + outer.router.store(Some(Arc::new(router))) + } + }); + s + } +} + +impl RouteProvider for UrlUntilReady { + fn n_ordered_routes(&self, n: usize) -> Result, AgentError> { + if let Some(r) = &*self.router.load() { + r.n_ordered_routes(n) + } else { + self.url.n_ordered_routes(n) + } + } + fn route(&self) -> Result { + if let Some(r) = &*self.router.load() { + r.route() + } else { + self.url.route() + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs index 92b18d4a..ad9b7b40 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs @@ -8,28 +8,30 @@ use std::{ use arc_swap::ArcSwap; use candid::Principal; use futures_util::FutureExt; -use reqwest::Client; use stop_token::StopSource; use thiserror::Error; -use tracing::{error, info, warn}; use url::Url; use crate::{ - agent::route_provider::{ - dynamic_routing::{ - health_check::{HealthCheck, HealthChecker, HealthManagerActor}, - messages::FetchedNodes, - node::Node, - nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher}, - snapshot::routing_snapshot::RoutingSnapshot, - type_aliases::AtomicSwap, + agent::{ + route_provider::{ + dynamic_routing::{ + health_check::{HealthCheck, HealthChecker, HealthManagerActor}, + messages::FetchedNodes, + node::Node, + nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher}, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::AtomicSwap, + }, + RouteProvider, }, - RouteProvider, + HttpService, }, AgentError, }; /// The default seed domain for boundary node discovery. +#[allow(unused)] pub const IC0_SEED_DOMAIN: &str = "ic0.app"; const MAINNET_ROOT_SUBNET_ID: &str = @@ -38,9 +40,10 @@ const MAINNET_ROOT_SUBNET_ID: &str = const FETCH_PERIOD: Duration = Duration::from_secs(5); const FETCH_RETRY_INTERVAL: Duration = Duration::from_millis(250); const TIMEOUT_AWAIT_HEALTHY_SEED: Duration = Duration::from_millis(1000); +#[allow(unused)] const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(1); const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1); - +#[allow(unused)] const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider"; /// A dynamic route provider. @@ -75,9 +78,6 @@ pub enum DynamicRouteProviderError { /// An error when checking API node's health. #[error("An error when checking API node's health: {0}")] HealthCheckError(String), - /// An invalid domain name provided. - #[error("Provided domain name is invalid: {0}")] - InvalidDomainName(String), } /// A builder for the `DynamicRouteProvider`. @@ -93,13 +93,17 @@ pub struct DynamicRouteProviderBuilder { impl DynamicRouteProviderBuilder { /// Creates a new instance of the builder. - pub fn new(snapshot: S, seeds: Vec, http_client: Client) -> Self { + pub fn new(snapshot: S, seeds: Vec, http_client: Arc) -> Self { let fetcher = Arc::new(NodesFetcher::new( http_client.clone(), Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(), None, )); - let checker = Arc::new(HealthChecker::new(http_client, HEALTH_CHECK_TIMEOUT)); + let checker = Arc::new(HealthChecker::new( + http_client, + #[cfg(not(target_family = "wasm"))] + HEALTH_CHECK_TIMEOUT, + )); Self { fetcher, fetch_period: FETCH_PERIOD, @@ -112,6 +116,7 @@ impl DynamicRouteProviderBuilder { } /// Sets the fetcher of the nodes in the topology. + #[allow(unused)] pub fn with_fetcher(mut self, fetcher: Arc) -> Self { self.fetcher = fetcher; self @@ -124,6 +129,7 @@ impl DynamicRouteProviderBuilder { } /// Sets the node health checker. + #[allow(unused)] pub fn with_checker(mut self, checker: Arc) -> Self { self.checker = checker; self @@ -194,7 +200,7 @@ where /// - Starts/stops health check tasks (HealthCheckActors) based on the newly added/removed nodes. /// - These spawned health check tasks periodically update the snapshot with the latest node health info. pub async fn run(&self) { - info!("{DYNAMIC_ROUTE_PROVIDER}: started ..."); + log!(info, "{DYNAMIC_ROUTE_PROVIDER}: started ..."); // Communication channel between NodesFetchActor and HealthManagerActor. let (fetch_sender, fetch_receiver) = async_watch::channel(None); @@ -213,25 +219,30 @@ where crate::util::spawn(async move { health_manager_actor.run().await }); // Dispatch all seed nodes for initial health checks - if let Err(err) = fetch_sender.send(Some(FetchedNodes { + if let Err(_err) = fetch_sender.send(Some(FetchedNodes { nodes: self.seeds.clone(), })) { - error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}"); + log!( + error, + "{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {_err:?}" + ); } // Try await for healthy seeds. - let start = Instant::now(); + let _start = Instant::now(); futures_util::select! { _ = crate::util::sleep(TIMEOUT_AWAIT_HEALTHY_SEED).fuse() => { - warn!( + log!( + warn, "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}", - start.elapsed() + _start.elapsed() ); } _ = init_receiver.recv().fuse() => { - info!( + log!( + info, "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}", - start.elapsed() + _start.elapsed() ); } } @@ -247,13 +258,14 @@ where self.token.token(), ); crate::util::spawn(async move { fetch_actor.run().await }); - info!( + log!( + info, "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully" ); } } -#[cfg(test)] +#[cfg(all(test, not(target_family = "wasm")))] mod tests { use candid::Principal; use reqwest::Client; @@ -288,7 +300,10 @@ mod tests { pub fn setup_tracing() { TRACING_INIT.call_once(|| { - FmtSubscriber::builder().with_max_level(Level::TRACE).init(); + FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .with_test_writer() + .init(); }); } @@ -339,7 +354,7 @@ mod tests { let route_provider = DynamicRouteProviderBuilder::new( LatencyRoutingSnapshot::new(), vec![seed], - client.clone(), + Arc::new(client.clone()), ) .build() .await; @@ -385,7 +400,7 @@ mod tests { let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); let route_provider = - DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], Arc::new(client)) .with_fetcher(fetcher.clone()) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) @@ -483,7 +498,7 @@ mod tests { let route_provider = DynamicRouteProviderBuilder::new( snapshot, vec![node_1.clone(), node_2.clone()], - client, + Arc::new(client), ) .with_fetcher(fetcher) .with_checker(checker.clone()) @@ -528,7 +543,7 @@ mod tests { let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); let route_provider = - DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], Arc::new(client)) .with_fetcher(fetcher) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) @@ -572,7 +587,7 @@ mod tests { let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); let route_provider = - DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], Arc::new(client)) .with_fetcher(fetcher) .with_checker(checker) .with_fetch_period(fetch_interval) @@ -612,7 +627,7 @@ mod tests { let route_provider = DynamicRouteProviderBuilder::new( snapshot, vec![node_1.clone(), node_2.clone()], - client, + Arc::new(client), ) .with_fetcher(fetcher) .with_checker(checker.clone()) @@ -652,7 +667,7 @@ mod tests { let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); let route_provider = - DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], Arc::new(client)) .with_fetcher(fetcher.clone()) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs index daa00e1a..da15c3d4 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs @@ -1,28 +1,31 @@ use async_trait::async_trait; use futures_util::FutureExt; use http::{Method, StatusCode}; -use reqwest::{Client, Request}; +use reqwest::Request; use std::{ fmt::Debug, sync::Arc, time::{Duration, Instant}, }; use stop_token::{StopSource, StopToken}; -use tracing::{debug, error, info, warn}; use url::Url; -use crate::agent::route_provider::dynamic_routing::{ - dynamic_route_provider::DynamicRouteProviderError, - messages::{FetchedNodes, NodeHealthState}, - node::Node, - snapshot::routing_snapshot::RoutingSnapshot, - type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc}, +use crate::agent::{ + route_provider::dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderError, + messages::{FetchedNodes, NodeHealthState}, + node::Node, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc}, + }, + HttpService, }; const CHANNEL_BUFFER: usize = 128; /// A trait representing a health check of the node. -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] pub trait HealthCheck: Send + Sync + Debug { /// Checks the health of the node. async fn check(&self, node: &Node) -> Result; @@ -54,15 +57,20 @@ impl HealthCheckStatus { /// A struct implementing the `HealthCheck` for the nodes. #[derive(Debug)] pub struct HealthChecker { - http_client: Client, + http_client: Arc, + #[cfg(not(target_family = "wasm"))] timeout: Duration, } impl HealthChecker { /// Creates a new `HealthChecker` instance. - pub fn new(http_client: Client, timeout: Duration) -> Self { + pub fn new( + http_client: Arc, + #[cfg(not(target_family = "wasm"))] timeout: Duration, + ) -> Self { Self { http_client, + #[cfg(not(target_family = "wasm"))] timeout, } } @@ -70,21 +78,30 @@ impl HealthChecker { const HEALTH_CHECKER: &str = "HealthChecker"; -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] impl HealthCheck for HealthChecker { + #[allow(unused_mut)] async fn check(&self, node: &Node) -> Result { // API boundary node exposes /health endpoint and should respond with 204 (No Content) if it's healthy. let url = Url::parse(&format!("https://{}/health", node.domain())).unwrap(); let mut request = Request::new(Method::GET, url.clone()); - *request.timeout_mut() = Some(self.timeout); + #[cfg(not(target_family = "wasm"))] + { + *request.timeout_mut() = Some(self.timeout); + } let start = Instant::now(); - let response = self.http_client.execute(request).await.map_err(|err| { - DynamicRouteProviderError::HealthCheckError(format!( - "Failed to execute GET request to {url}: {err}" - )) - })?; + let response = self + .http_client + .call(&|| Ok(request.try_clone().unwrap()), 1) + .await + .map_err(|err| { + DynamicRouteProviderError::HealthCheckError(format!( + "Failed to execute GET request to {url}: {err}" + )) + })?; let latency = start.elapsed(); if response.status() != StatusCode::NO_CONTENT { @@ -92,7 +109,7 @@ impl HealthCheck for HealthChecker { "{HEALTH_CHECKER}: Unexpected http status code {} for url={url} received", response.status() ); - error!(err_msg); + log!(error, err_msg); return Err(DynamicRouteProviderError::HealthCheckError(err_msg)); } @@ -100,6 +117,7 @@ impl HealthCheck for HealthChecker { } } +#[allow(unused)] const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor"; /// A struct performing the health check of the node and sending the health status to the listener. @@ -151,7 +169,7 @@ impl HealthCheckActor { continue; } _ = self.token.clone().fuse() => { - info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node); + log!(info, "{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node); break; } } @@ -160,6 +178,7 @@ impl HealthCheckActor { } /// The name of the health manager actor. +#[allow(unused)] pub(super) const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor"; /// A struct managing the health checks of the nodes. @@ -225,8 +244,8 @@ where result = self.fetch_receiver.recv().fuse() => { let value = match result { Ok(value) => value, - Err(err) => { - error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}"); + Err(_err) => { + log!(error, "{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {_err:?}"); continue; } }; @@ -243,7 +262,7 @@ where _ = self.token.clone().fuse() => { self.stop_all_checks().await; self.check_receiver.close(); - warn!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped"); + log!(warn, "{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped"); break; } } @@ -267,10 +286,17 @@ where // This is a bug in the IC registry. There should be at least one API Boundary Node in the registry. // Updating nodes snapshot with an empty array, would lead to an irrecoverable error, as new nodes couldn't be fetched. // We avoid such updates and just wait for a non-empty list. - error!("{HEALTH_MANAGER_ACTOR}: list of fetched nodes is empty"); + log!( + error, + "{HEALTH_MANAGER_ACTOR}: list of fetched nodes is empty" + ); return; } - debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes); + log!( + debug, + "{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", + nodes + ); let current_snapshot = self.routing_snapshot.load_full(); let mut new_snapshot = (*current_snapshot).clone(); // If the snapshot has changed, store it and restart all node's health checks. @@ -285,7 +311,10 @@ where // Create a single cancellation token for all started health checks. self.nodes_token = StopSource::new(); for node in nodes { - debug!("{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}"); + log!( + debug, + "{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}" + ); let actor = HealthCheckActor::new( Arc::clone(&self.checker), self.period, @@ -298,7 +327,10 @@ where } async fn stop_all_checks(&mut self) { - warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks"); + log!( + warn, + "{HEALTH_MANAGER_ACTOR}: stopping all running health checks" + ); self.nodes_token = StopSource::new(); } } diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/mod.rs b/ic-agent/src/agent/route_provider/dynamic_routing/mod.rs index ccb29064..5bfc2af1 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/mod.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/mod.rs @@ -1,6 +1,6 @@ //! Dynamic routing implementation. //! -//! This is an internal unstable feature. It works, but it's still in the oven; its design will go through drastic changes before it is released. +//! This is an internal unstable feature. The guts of this system are not subject to semver and are likely to change. pub mod dynamic_route_provider; /// Health check implementation. @@ -14,6 +14,7 @@ pub mod nodes_fetch; /// Routing snapshot implementation. pub mod snapshot; #[cfg(test)] +#[cfg_attr(target_family = "wasm", allow(unused))] pub(super) mod test_utils; /// Type aliases used in dynamic routing. pub(super) mod type_aliases; diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/node.rs b/ic-agent/src/agent/route_provider/dynamic_routing/node.rs index 3aa7ca7f..cd02ddce 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/node.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/node.rs @@ -1,9 +1,6 @@ use url::Url; -use crate::agent::{ - route_provider::dynamic_routing::dynamic_route_provider::DynamicRouteProviderError, - ApiBoundaryNode, -}; +use crate::agent::ApiBoundaryNode; /// Represents a node in the dynamic routing. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -13,20 +10,15 @@ pub struct Node { impl Node { /// Creates a new `Node` instance from the domain name. - pub fn new(domain: &str) -> Result { - if !is_valid_domain(domain) { - return Err(DynamicRouteProviderError::InvalidDomainName( - domain.to_string(), - )); - } - Ok(Self { - domain: domain.to_string(), - }) + pub fn new(domain: impl Into) -> Result { + let domain = domain.into(); + check_valid_domain(&domain)?; + Ok(Self { domain }) } /// Returns the domain name of the node. - pub fn domain(&self) -> String { - self.domain.clone() + pub fn domain(&self) -> &str { + &self.domain } } @@ -44,17 +36,18 @@ impl From<&Node> for Url { } } -impl TryFrom<&ApiBoundaryNode> for Node { - type Error = DynamicRouteProviderError; +impl TryFrom for Node { + type Error = url::ParseError; - fn try_from(value: &ApiBoundaryNode) -> Result { - Node::new(&value.domain) + fn try_from(value: ApiBoundaryNode) -> Result { + Node::new(value.domain) } } /// Checks if the given domain is a valid URL. -fn is_valid_domain>(domain: S) -> bool { +fn check_valid_domain>(domain: S) -> Result<(), url::ParseError> { // Prepend scheme to make it a valid URL let url_string = format!("http://{}", domain.as_ref()); - Url::parse(&url_string).is_ok() + Url::parse(&url_string)?; + Ok(()) } diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs index 9a47cab3..3318f03d 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs @@ -1,28 +1,28 @@ use async_trait::async_trait; use candid::Principal; use futures_util::FutureExt; -use reqwest::Client; use std::{fmt::Debug, sync::Arc, time::Duration}; use stop_token::StopToken; -use tracing::{error, warn}; use url::Url; +#[allow(unused)] +use crate::agent::route_provider::dynamic_routing::health_check::HEALTH_MANAGER_ACTOR; use crate::agent::{ route_provider::dynamic_routing::{ dynamic_route_provider::DynamicRouteProviderError, - health_check::HEALTH_MANAGER_ACTOR, messages::FetchedNodes, node::Node, snapshot::routing_snapshot::RoutingSnapshot, type_aliases::{AtomicSwap, SenderWatch}, }, - Agent, + Agent, HttpService, }; - +#[allow(unused)] const NODES_FETCH_ACTOR: &str = "NodesFetchActor"; /// Fetcher of nodes in the topology. -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] pub trait Fetch: Sync + Send + Debug { /// Fetches the nodes from the topology. async fn fetch(&self, url: Url) -> Result, DynamicRouteProviderError>; @@ -31,7 +31,7 @@ pub trait Fetch: Sync + Send + Debug { /// A struct representing the fetcher of the nodes from the topology. #[derive(Debug)] pub struct NodesFetcher { - http_client: Client, + http_client: Arc, subnet_id: Principal, // By default, the nodes fetcher is configured to talk to the mainnet of Internet Computer, and verifies responses using a hard-coded public key. // However, for testnets one can set up a custom public key. @@ -40,7 +40,11 @@ pub struct NodesFetcher { impl NodesFetcher { /// Creates a new `NodesFetcher` instance. - pub fn new(http_client: Client, subnet_id: Principal, root_key: Option>) -> Self { + pub fn new( + http_client: Arc, + subnet_id: Principal, + root_key: Option>, + ) -> Self { Self { http_client, subnet_id, @@ -49,12 +53,13 @@ impl NodesFetcher { } } -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] impl Fetch for NodesFetcher { async fn fetch(&self, url: Url) -> Result, DynamicRouteProviderError> { let agent = Agent::builder() .with_url(url) - .with_http_client(self.http_client.clone()) + .with_arc_http_middleware(self.http_client.clone()) .build() .map_err(|err| { DynamicRouteProviderError::NodesFetchError(format!( @@ -74,7 +79,7 @@ impl Fetch for NodesFetcher { })?; // If some API BNs have invalid domain names, they are discarded. let nodes = api_bns - .iter() + .into_iter() .filter_map(|api_node| api_node.try_into().ok()) .collect(); return Ok(nodes); @@ -138,21 +143,25 @@ where let msg = Some(FetchedNodes { nodes }); match self.fetch_sender.send(msg) { Ok(()) => break, // message sent successfully, exist the loop - Err(err) => { - error!("{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {err:?}"); + Err(_err) => { + log!(error, "{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {_err:?}"); } } } - Err(err) => { - error!("{NODES_FETCH_ACTOR}: failed to fetch nodes: {err:?}"); + Err(_err) => { + log!( + error, + "{NODES_FETCH_ACTOR}: failed to fetch nodes: {_err:?}" + ); } }; } else { // No healthy nodes in the snapshot, break the cycle and wait for the next fetch cycle - error!("{NODES_FETCH_ACTOR}: no nodes in the snapshot"); + log!(error, "{NODES_FETCH_ACTOR}: no nodes in the snapshot"); break; }; - warn!( + log!( + warn, "Retrying to fetch the nodes in {:?}", self.fetch_retry_interval ); @@ -163,7 +172,7 @@ where continue; } _ = self.token.clone().fuse() => { - warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled"); + log!(warn, "{NODES_FETCH_ACTOR}: was gracefully cancelled"); break; } } diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs index 2aa3fd90..028bbb2c 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs @@ -181,6 +181,7 @@ impl LatencyRoutingSnapshot { } /// Sets whether to use availability penalty in the score computation. + #[allow(unused)] pub fn set_availability_penalty(mut self, use_penalty: bool) -> Self { self.use_availability_penalty = use_penalty; self @@ -188,6 +189,7 @@ impl LatencyRoutingSnapshot { /// Sets the weights for the sliding window. /// The weights are ordered from left to right, where the leftmost weight is for the most recent health check. + #[allow(unused)] pub fn set_window_weights(mut self, weights: &[f64]) -> Self { self.window_weights_sum = weights.iter().sum(); self.window_weights = weights.to_vec(); diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs index 9f9331a2..ef1e10af 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/routing_snapshot.rs @@ -5,6 +5,7 @@ use crate::agent::route_provider::dynamic_routing::{health_check::HealthCheckSta /// A trait for interacting with the snapshot of nodes (routing table). pub trait RoutingSnapshot: Send + Sync + Clone + Debug { /// Returns `true` if the snapshot has nodes. + #[allow(unused)] fn has_nodes(&self) -> bool; /// Get next node from the snapshot. fn next_node(&self) -> Option; diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs index fea43caf..1ae39c09 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs @@ -25,17 +25,17 @@ pub(super) fn route_n_times(n: usize, f: Arc) -> Ve pub(super) fn assert_routed_domains( actual: Vec, - expected: Vec, + expected: Vec<&str>, expected_repetitions: usize, ) where T: AsRef + Eq + Hash + Debug + Ord, { - fn build_count_map(items: &[T]) -> HashMap<&T, usize> + fn build_count_map(items: &[T]) -> HashMap<&str, usize> where - T: Eq + Hash, + T: AsRef, { items.iter().fold(HashMap::new(), |mut map, item| { - *map.entry(item).or_insert(0) += 1; + *map.entry(item.as_ref()).or_insert(0) += 1; map }) } @@ -62,7 +62,8 @@ pub(super) struct NodesFetcherMock { pub nodes: AtomicSwap>, } -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] impl Fetch for NodesFetcherMock { async fn fetch(&self, _url: Url) -> Result, DynamicRouteProviderError> { let nodes = (*self.nodes.load_full()).clone(); @@ -99,7 +100,8 @@ impl Default for NodeHealthCheckerMock { } } -#[async_trait] +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] impl HealthCheck for NodeHealthCheckerMock { async fn check(&self, node: &Node) -> Result { let nodes = self.healthy_nodes.load_full(); diff --git a/ic-agent/src/lib.rs b/ic-agent/src/lib.rs index 882f5e70..40254364 100644 --- a/ic-agent/src/lib.rs +++ b/ic-agent/src/lib.rs @@ -103,10 +103,13 @@ )] #![cfg_attr(not(target_family = "wasm"), warn(clippy::future_not_send))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +#[macro_use] +mod util; + pub mod agent; pub mod export; pub mod identity; -mod util; use agent::response_authentication::LookupPath; #[doc(inline)] diff --git a/ic-agent/src/util.rs b/ic-agent/src/util.rs index a61dfda0..232b2eca 100644 --- a/ic-agent/src/util.rs +++ b/ic-agent/src/util.rs @@ -32,3 +32,7 @@ pub fn spawn(f: impl Future + 'static) { pub fn spawn(f: impl Future + Send + 'static) { tokio::spawn(f); } + +macro_rules! log { + ($name:ident, $($t:tt)*) => { #[cfg(feature = "tracing")] { tracing::$name!($($t)*) } }; +}