Skip to content

Commit

Permalink
feat: introduce http request retries on tcp failures (#562)
Browse files Browse the repository at this point in the history
* chore: introduce request retries

* docs: add comment

* fix: wasm build

* docs: change changelog.md

* add retries to HyperTransport

* check error

* add: tests

* add cfg feature

* Stronger error handling

---------

Co-authored-by: Adam Spofford <adam.spofford@dfinity.org>
Co-authored-by: Adam Spofford <93943719+adamspofford-dfinity@users.noreply.github.com>
  • Loading branch information
3 people authored May 31, 2024
1 parent 608a3f4 commit c934d17
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
* Introduced transparent HTTP request retry logic for network-related failures, configurable via `ReqwestTransport::with_max_tcp_errors_retries()` and `HyperTransport::with_max_tcp_errors_retries()`.

* Changed the SyncCall and AsyncCall traits to use an associated type for their output instead of a generic parameter.
* Call builders now generally implement `IntoFuture`, allowing `.call_and_wait().await` to be shortened to `.await`.
Expand Down
120 changes: 120 additions & 0 deletions ic-agent/src/agent/agent_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ use candid::{Encode, Nat};
use futures_util::FutureExt;
use ic_certification::{Delegation, Label};
use ic_transport_types::{NodeSignature, QueryResponse, RejectCode, RejectResponse, ReplyResponse};
use reqwest::Client;
use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
use wasm_bindgen_test::wasm_bindgen_test;

use crate::agent::http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider};
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

Expand All @@ -28,6 +31,56 @@ fn make_agent(url: &str) -> Agent {
.unwrap()
}

/// Builds a test [`Agent`] whose `ReqwestTransport` takes its URLs from `route_provider`
/// and is configured with `tcp_retries` as the maximum number of TCP-error retries.
///
/// Query signature verification is switched off. Panics if the HTTP client, the
/// transport, or the agent cannot be constructed.
fn make_agent_with_route_provider(
    route_provider: Arc<dyn RouteProvider>,
    tcp_retries: usize,
) -> Agent {
    let http_client = Client::builder()
        .build()
        .expect("Could not create HTTP client.");
    let transport = ReqwestTransport::create_with_client_route(route_provider, http_client)
        .unwrap()
        .with_max_tcp_errors_retries(tcp_retries);

    Agent::builder()
        .with_transport(transport)
        .with_verify_query_signatures(false)
        .build()
        .unwrap()
}

/// Builds a test [`Agent`] whose `HyperTransport` takes its URLs from `route_provider`
/// and is configured with `tcp_retries` as the maximum number of TCP-error retries.
///
/// The underlying hyper client accepts both `http` and `https` URLs over HTTP/1 and
/// HTTP/2. Query signature verification is switched off. Panics if the transport or
/// the agent cannot be constructed.
#[cfg(feature = "hyper")]
fn make_agent_with_hyper_transport_route_provider(
    route_provider: Arc<dyn RouteProvider>,
    tcp_retries: usize,
) -> Agent {
    use super::http_transport::HyperTransport;
    use http_body_util::Full;
    use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
    use hyper_util::{
        client::legacy::{connect::HttpConnector, Client as LegacyClient},
        rt::TokioExecutor,
    };
    use std::collections::VecDeque;

    // Spell out the connector and request-body types the client is built with.
    type HyperClient = LegacyClient<HttpsConnector<HttpConnector>, Full<VecDeque<u8>>>;

    let https = HttpsConnectorBuilder::new()
        .with_webpki_roots()
        .https_or_http()
        .enable_http1()
        .enable_http2()
        .build();
    let hyper_client: HyperClient = LegacyClient::builder(TokioExecutor::new()).build(https);

    Agent::builder()
        .with_transport(
            HyperTransport::create_with_service_route(route_provider, hyper_client)
                .unwrap()
                .with_max_tcp_errors_retries(tcp_retries),
        )
        .with_verify_query_signatures(false)
        .build()
        .unwrap()
}

fn make_untimed_agent(url: &str) -> Agent {
Agent::builder()
.with_transport(ReqwestTransport::create(url).unwrap())
Expand Down Expand Up @@ -297,6 +350,73 @@ async fn status_okay() -> Result<(), AgentError> {
Ok(())
}

/// Checks `Agent::status` over `ReqwestTransport` with a route list that places a
/// non-working URL ahead of the mock server: the call must fail when no TCP retries
/// are allowed and succeed once a single retry is allowed.
#[cfg_attr(not(target_family = "wasm"), tokio::test)]
async fn reqwest_client_status_okay_when_request_retried() -> Result<(), AgentError> {
    // The mock server answers the status endpoint with an empty CBOR map.
    let status_body = serde_cbor::Value::Map(BTreeMap::new());
    let (status_mock, healthy_url) = mock(
        "GET",
        "/api/v2/status",
        200,
        serde_cbor::to_vec(&status_body)?,
        Some("application/cbor"),
    )
    .await;

    // Presumably nothing listens on this port, so connecting to it fails.
    let dead_url = "http://127.0.0.1:4444";
    // Each agent gets its own freshly built provider over the same two URLs.
    let fresh_routes = || RoundRobinRouteProvider::new(vec![dead_url, &healthy_url]).unwrap();

    // Zero retries: the request is expected to fail.
    let agent = make_agent_with_route_provider(Arc::new(fresh_routes()), 0);
    assert!(agent.status().await.is_err());

    // One retry: the request is expected to reach the mock server.
    let agent = make_agent_with_route_provider(Arc::new(fresh_routes()), 1);
    let outcome = agent.status().await;

    assert_mock(status_mock).await;

    assert!(outcome.is_ok());
    Ok(())
}

/// Checks `Agent::status` over `HyperTransport` with a route list that places a
/// non-working URL ahead of the mock server: the call must fail when no TCP retries
/// are allowed and succeed once a single retry is allowed.
#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg(feature = "hyper")]
async fn hyper_client_status_okay_when_request_retried() -> Result<(), AgentError> {
    // The mock server answers the status endpoint with an empty CBOR map.
    let status_body = serde_cbor::Value::Map(BTreeMap::new());
    let (status_mock, healthy_url) = mock(
        "GET",
        "/api/v2/status",
        200,
        serde_cbor::to_vec(&status_body)?,
        Some("application/cbor"),
    )
    .await;

    // Presumably nothing listens on this port, so connecting to it fails.
    let dead_url = "http://127.0.0.1:4444";
    // Each agent gets its own freshly built provider over the same two URLs.
    let fresh_routes = || RoundRobinRouteProvider::new(vec![dead_url, &healthy_url]).unwrap();

    // Zero retries: the request is expected to fail.
    let agent = make_agent_with_hyper_transport_route_provider(Arc::new(fresh_routes()), 0);
    assert!(agent.status().await.is_err());

    // One retry: the request is expected to reach the mock server.
    let agent = make_agent_with_hyper_transport_route_provider(Arc::new(fresh_routes()), 1);
    let outcome = agent.status().await;

    assert_mock(status_mock).await;

    assert!(outcome.is_ok());
    Ok(())
}

#[cfg_attr(not(target_family = "wasm"), tokio::test)]
#[cfg_attr(target_family = "wasm", wasm_bindgen_test)]
// test that the agent (re)tries to reach the server.
Expand Down
100 changes: 69 additions & 31 deletions ic-agent/src/agent/http_transport/hyper_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct HyperTransport<B1, S = Client<HttpsConnector<HttpConnector>, B1>> {
_marker: PhantomData<AtomicPtr<B1>>,
route_provider: Arc<dyn RouteProvider>,
max_response_body_size: Option<usize>,
#[allow(dead_code)]
max_tcp_error_retries: usize,
service: S,
}

Expand Down Expand Up @@ -125,6 +127,7 @@ where
route_provider,
service,
max_response_body_size: None,
max_tcp_error_retries: 0,
})
}

Expand All @@ -136,10 +139,18 @@ where
}
}

/// Sets a max number of retries for tcp connection errors.
pub fn with_max_tcp_errors_retries(self, retries: usize) -> Self {
HyperTransport {
max_tcp_error_retries: retries,
..self
}
}

async fn request(
&self,
method: Method,
url: String,
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<Vec<u8>, AgentError> {
let body = body.unwrap_or_default();
Expand All @@ -161,19 +172,58 @@ where
}
AgentError::TransportError(Box::new(err))
}
let response = loop {

let create_request_with_generated_url = || -> Result<Request<_>, AgentError> {
let url = self.route_provider.route()?.join(endpoint)?;
println!("{url}");
let http_request = Request::builder()
.method(&method)
.uri(&url)
.uri(url.as_str())
.header(CONTENT_TYPE, "application/cbor")
.body(body.clone().into())
.map_err(|err| AgentError::TransportError(Box::new(err)))?;
let response = self
.service
.clone()
.call(http_request)
.await
.map_err(map_error)?;
Ok(http_request)
};

let response = loop {
let response = {
#[cfg(target_family = "wasm")]
{
let http_request = create_request_with_generated_url()?;
match self.client.execute(http_request).await {
Ok(response) => response,
Err(err) => return Err(AgentError::TransportError(Box::new(err))),
}
}
#[cfg(not(target_family = "wasm"))]
{
// RouteProvider generates urls dynamically. Some of these urls can be potentially unhealthy.
// TCP related errors (host unreachable, connection refused, connection timed out, connection reset) can be safely retried with a newly generated url.

let mut retry_count = 0;
loop {
let http_request = create_request_with_generated_url()?;

match self.service.clone().call(http_request).await {
Ok(response) => break response,
Err(err) => {
if (&err as &dyn Error)
.downcast_ref::<hyper_util::client::legacy::Error>()
.is_some_and(|e| e.is_connect())
{
if retry_count >= self.max_tcp_error_retries {
return Err(map_error(err));
}
retry_count += 1;
continue;
}
return Err(map_error(err));
}
}
}
}
};

if response.status() != StatusCode::TOO_MANY_REQUESTS {
break response;
}
Expand Down Expand Up @@ -224,11 +274,8 @@ where
_request_id: RequestId,
) -> AgentFuture<()> {
Box::pin(async move {
let url = format!(
"{}canister/{effective_canister_id}/call",
self.route_provider.route()?
);
self.request(Method::POST, url, Some(envelope)).await?;
let endpoint = &format!("canister/{effective_canister_id}/call");
self.request(Method::POST, endpoint, Some(envelope)).await?;
Ok(())
})
}
Expand All @@ -239,38 +286,29 @@ where
envelope: Vec<u8>,
) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!(
"{}canister/{effective_canister_id}/read_state",
self.route_provider.route()?
);
self.request(Method::POST, url, Some(envelope)).await
let endpoint = &format!("canister/{effective_canister_id}/read_state");
self.request(Method::POST, endpoint, Some(envelope)).await
})
}

/// Sends `envelope` with `POST` to the `subnet/{subnet_id}/read_state` endpoint through
/// `request` and resolves to the bytes that call returns.
// NOTE(review): this hunk shows the removed `url`-based lines next to the added
// `endpoint`-based lines; with the new lines the base URL is picked inside `request`.
fn read_subnet_state(&self, subnet_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!(
"{}subnet/{subnet_id}/read_state",
self.route_provider.route()?
);
self.request(Method::POST, url, Some(envelope)).await
// Only the endpoint path is built here; it is joined onto a routed base URL later.
let endpoint = &format!("subnet/{subnet_id}/read_state");
self.request(Method::POST, endpoint, Some(envelope)).await
})
}

/// Sends `envelope` with `POST` to the `canister/{effective_canister_id}/query` endpoint
/// through `request` and resolves to the bytes that call returns.
// NOTE(review): this hunk shows the removed `url`-based lines next to the added
// `endpoint`-based lines; with the new lines the base URL is picked inside `request`.
fn query(&self, effective_canister_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!(
"{}canister/{effective_canister_id}/query",
self.route_provider.route()?
);
self.request(Method::POST, url, Some(envelope)).await
// Only the endpoint path is built here; it is joined onto a routed base URL later.
let endpoint = &format!("canister/{effective_canister_id}/query");
self.request(Method::POST, endpoint, Some(envelope)).await
})
}

/// Issues a body-less `GET` to the `status` endpoint through `request` and resolves to
/// the bytes that call returns.
// NOTE(review): this hunk shows the removed `url`-based lines next to the added
// `endpoint`-based lines; with the new lines the base URL is picked inside `request`.
fn status(&self) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!("{}status", self.route_provider.route()?);
self.request(Method::GET, url, None).await
// NOTE(review): the `String` is only borrowed as `&str`; passing the literal
// "status" directly looks sufficient — confirm and drop the allocation.
let endpoint = "status".to_string();
self.request(Method::GET, &endpoint, None).await
})
}
}
Expand Down
Loading

0 comments on commit c934d17

Please sign in to comment.