Skip to content

Commit

Permalink
feat: dynamic routing mvp (#619)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamspofford-dfinity authored Dec 16, 2024
1 parent a53948e commit f76fecf
Show file tree
Hide file tree
Showing 15 changed files with 422 additions and 176 deletions.
19 changes: 7 additions & 12 deletions ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ keywords = ["internet-computer", "agent", "icp", "dfinity"]
include = ["src", "Cargo.toml", "../LICENSE", "README.md"]

[dependencies]
arc-swap = { version = "1.7", optional = true }
async-channel = { version = "1.9", optional = true }
arc-swap = "1.7"
async-channel = "1.9"
async-lock = "3.3"
async-trait = "0.1"
async-watch = { version = "0.3", optional = true }
async-watch = "0.3"
backoff = "0.4.0"
cached = { version = "0.52", features = ["ahash"], default-features = false }
candid = { workspace = true }
Expand Down Expand Up @@ -48,7 +48,7 @@ serde_cbor = { workspace = true }
serde_repr = { workspace = true }
sha2 = { workspace = true }
simple_asn1 = "0.6.1"
stop-token = { version = "0.7", optional = true }
stop-token = "0.7"
thiserror = { workspace = true }
time = { workspace = true }
tower-service = "0.3"
Expand Down Expand Up @@ -77,6 +77,7 @@ web-sys = { version = "0.3", features = ["Window"], optional = true }
[dev-dependencies]
serde_json.workspace = true
tracing-subscriber = "0.3"
tracing = "0.1"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
tokio = { workspace = true, features = ["full"] }
Expand Down Expand Up @@ -109,16 +110,10 @@ wasm-bindgen = [
"backoff/wasm-bindgen",
"cached/wasm",
]
_internal_dynamic-routing = [
"dep:arc-swap",
"dep:async-channel",
"dep:async-watch",
"dep:stop-token",
"tracing",
]
_internal_dynamic-routing = []
tracing = ["dep:tracing"] # Does very little right now.

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"]
rustdoc-args = ["--cfg=docsrs"]
features = ["_internal_dynamic-routing"]
features = []
7 changes: 7 additions & 0 deletions ic-agent/src/agent/agent_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use reqwest::Client;
use url::Url;

use crate::{
agent::{NonceFactory, NonceGenerator},
Expand Down Expand Up @@ -33,6 +34,10 @@ pub struct AgentConfig {
pub http_service: Option<Arc<dyn HttpService>>,
/// See [`with_max_polling_time`](super::AgentBuilder::with_max_polling_time).
pub max_polling_time: Duration,
/// See [`with_background_dynamic_routing`](super::AgentBuilder::with_background_dynamic_routing).
pub background_dynamic_routing: bool,
/// See [`with_url`](super::AgentBuilder::with_url).
pub url: Option<Url>,
}

impl Default for AgentConfig {
Expand All @@ -49,6 +54,8 @@ impl Default for AgentConfig {
max_response_body_size: None,
max_tcp_error_retries: 0,
max_polling_time: Duration::from_secs(60 * 5),
background_dynamic_routing: false,
url: None,
}
}
}
55 changes: 27 additions & 28 deletions ic-agent/src/agent/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use url::Url;

use crate::{
agent::{agent_config::AgentConfig, Agent},
AgentError, Identity, NonceFactory, NonceGenerator,
Expand All @@ -20,34 +18,27 @@ impl AgentBuilder {
Agent::new(self.config)
}

/// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes and routing traffic via them based on latency.
#[cfg(feature = "_internal_dynamic-routing")]
pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self {
use crate::agent::route_provider::dynamic_routing::{
dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
node::Node,
snapshot::latency_based_routing::LatencyRoutingSnapshot,
};
// TODO: This is a temporary solution to get the seed node.
let seed = Node::new(IC0_SEED_DOMAIN).unwrap();

let route_provider = DynamicRouteProviderBuilder::new(
LatencyRoutingSnapshot::new(),
vec![seed],
client.clone(),
)
.build()
.await;

let route_provider = Arc::new(route_provider) as Arc<dyn RouteProvider>;

self.with_arc_route_provider(route_provider)
.with_http_client(client)
/// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes
/// and routing traffic via them based on latency. Cannot be set together with `with_route_provider`.
///
/// See [`DynamicRouteProvider`](super::route_provider::DynamicRouteProvider) if more customization is needed such as polling intervals.
pub async fn with_background_dynamic_routing(mut self) -> Self {
assert!(
self.config.route_provider.is_none(),
"with_background_dynamic_routing cannot be called with with_route_provider"
);
self.config.background_dynamic_routing = true;
self
}

/// Set the URL of the [Agent].
pub fn with_url<S: Into<String>>(self, url: S) -> Self {
self.with_route_provider(url.into().parse::<Url>().unwrap())
/// Set the URL of the [`Agent`]. Either this or `with_route_provider` must be called (but not both).
pub fn with_url<S: Into<String>>(mut self, url: S) -> Self {
assert!(
self.config.route_provider.is_none(),
"with_url cannot be called with with_route_provider"
);
self.config.url = Some(url.into().parse().unwrap());
self
}

/// Add a `NonceFactory` to this Agent. By default, no nonce is produced.
Expand Down Expand Up @@ -125,6 +116,14 @@ impl AgentBuilder {

/// Same as [`Self::with_route_provider`], but reuses an existing `Arc`.
pub fn with_arc_route_provider(mut self, provider: Arc<dyn RouteProvider>) -> Self {
assert!(
!self.config.background_dynamic_routing,
"with_background_dynamic_routing cannot be called with with_route_provider"
);
assert!(
self.config.url.is_none(),
"with_url cannot be called with with_route_provider"
);
self.config.route_provider = Some(provider);
self
}
Expand Down
81 changes: 55 additions & 26 deletions ic-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ pub use ic_transport_types::{
pub use nonce::{NonceFactory, NonceGenerator};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
use reqwest::{Body, Client, Request, Response};
use route_provider::RouteProvider;
use route_provider::{
dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
snapshot::latency_based_routing::LatencyRoutingSnapshot,
},
RouteProvider, UrlUntilReady,
};
use time::OffsetDateTime;
use tower_service::Service;

Expand Down Expand Up @@ -58,7 +64,7 @@ use std::{
borrow::Cow,
collections::HashMap,
convert::TryFrom,
fmt,
fmt::{self, Debug},
future::{Future, IntoFuture},
pin::Pin,
sync::{Arc, Mutex, RwLock},
Expand Down Expand Up @@ -177,32 +183,54 @@ impl Agent {

/// Create an instance of an [`Agent`].
pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
let client = config.http_service.unwrap_or_else(|| {
Arc::new(Retry429Logic {
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
}),
})
});
Ok(Agent {
nonce_factory: config.nonce_factory,
identity: config.identity,
ingress_expiry: config.ingress_expiry,
root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
client: config.http_service.unwrap_or_else(|| {
Arc::new(Retry429Logic {
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
}),
})
}),
route_provider: config
.route_provider
.expect("missing `url` or `route_provider` in `AgentBuilder`"),
client: client.clone(),
route_provider: if let Some(route_provider) = config.route_provider {
route_provider
} else if let Some(url) = config.url {
if config.background_dynamic_routing {
assert!(
url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
"in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
);
let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
UrlUntilReady::new(url, async move {
DynamicRouteProviderBuilder::new(
LatencyRoutingSnapshot::new(),
seeds,
client,
)
.build()
.await
}) as Arc<dyn RouteProvider>
} else {
Arc::new(url)
}
} else {
panic!("either route_provider or url must be specified");
},
subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
verify_query_signatures: config.verify_query_signatures,
concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
Expand Down Expand Up @@ -1862,7 +1890,7 @@ impl<'agent> IntoFuture for UpdateBuilder<'agent> {
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync {
pub trait HttpService: Send + Sync + Debug {
/// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
async fn call<'a>(
&'a self,
Expand All @@ -1876,7 +1904,7 @@ impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
for<'a> <&'a Self as Service<Request>>::Future: Send,
T: Send + Sync + ?Sized,
T: Send + Sync + Debug + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
Expand Down Expand Up @@ -1907,7 +1935,7 @@ where
impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
T: Send + Sync + ?Sized,
T: Send + Sync + Debug + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
Expand All @@ -1919,6 +1947,7 @@ where
}
}

#[derive(Debug)]
struct Retry429Logic {
client: Client,
}
Expand Down
Loading

0 comments on commit f76fecf

Please sign in to comment.