From 0a8e9fb1068e177d858dbd8812f3de07b8356447 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 20 Apr 2024 17:07:47 +0200 Subject: [PATCH] chore: release v0.4.0 (#31) --- CHANGELOG.md | 14 ++++ Cargo.toml | 23 ++++--- README.md | 59 +++++++++++++++-- build.rs | 10 +++ src/lib.rs | 171 ++++++++++++++++++++++++++++++------------------ src/platform.rs | 19 ++---- 6 files changed, 203 insertions(+), 93 deletions(-) create mode 100644 build.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 75f6b63..3f7fdf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ The format is based on [Keep a Changelog]. [Keep a Changelog]: http://keepachangelog.com/en/1.0.0/ +## [v0.4.0] - 2023-04-20 + +A new release that makes the library support WASM and two feature flags, `native` and `web` are introduced +to select whether to compile the library for `native` or `web` but the default is still `native`. + +Some other fixes were: +- Re-export all types in the public API +- Support other retry strategies for calls and subscriptions. +- Improve the reconnect API to know when the reconnection started and was completed. +- Provide an error type. +- Improve internal feature flag handling. + +Thanks to [@seunlanlege](https://github.com/seunlanlege) who did the majority of the work +to get the library working for WASM. ## [v0.3.0] - 2023-02-08 diff --git a/Cargo.toml b/Cargo.toml index 0acb752..c05d9ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,27 +1,29 @@ [package] name = "reconnecting-jsonrpsee-ws-client" authors = ["Niklas Adolfsson "] -version = "0.3.0" +version = "0.4.0" edition = "2021" rust-version = "1.70.0" license = "MIT" repository = "https://github.com/niklasad1/reconnecting-jsonrpsee-ws-client" description = "jsonrpc-ws-client that reconnects automatically. Warning: It may lose subscription messages when reconnecting." documentation = "https://docs.rs/reconnecting-jsonrpsee-ws-client" -keywords = ["jsonrpc", "json", "websocket"] +keywords = ["jsonrpc", "json", "websocket", "WASM"] readme = "README.md" [dependencies] futures = { version = "0.3", default-features = false } -jsonrpsee = { version = "0.22.4", default-features = false } +jsonrpsee = { version = "0.22.4" } serde_json = { version = "1", features = ["raw_value"], default-features = false } tokio = { version = "1.37", features = ["sync"], default-features = false } tracing = { version = "0.1", default-features = false } -thiserror = "1" -finito = "0.1" +# WASM wasm-bindgen-futures = { version = "0.4.41", optional = true } +thiserror = "1" +finito = "0.1" + [features] default = ["native"] native = [ @@ -32,9 +34,7 @@ native = [ "jsonrpsee/ws-client", ] web = [ - "jsonrpsee/jsonrpsee-types", - "jsonrpsee/async-wasm-client", - "jsonrpsee/client-web-transport", + "jsonrpsee/wasm-client", "wasm-bindgen-futures", "finito/wasm-bindgen" ] @@ -45,9 +45,12 @@ hyper = { version = "0.14", features = ["server", "tcp"] } anyhow = "1" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +[build-dependencies] +cfg_aliases = "0.2.0" + [package.metadata.docs.rs] -all-features = true +features = ["default"] rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] -all-features = true +features = ["default"] diff --git a/README.md b/README.md index ee1dd47..e482be6 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,58 @@ # reconnecting-jsonrpsee-ws-client Wrapper crate over the jsonrpsee ws client, which automatically reconnects -under the hood; without that, the user has to restart it manually by -re-transmitting pending calls and re-establish subscriptions that -were closed when the connection was terminated. +under the hood; without that, one has to restart it. +It supports a few retry strategies, such as exponential backoff, but it's also possible +to use custom strategies as long as it implements `Iterator`. + + +By default, the library is re-transmitting pending calls and re-establishing subscriptions that +were closed when the connection was terminated, but it's also possible to disable that +and manage it yourself. + + +For instance, you may not want to re-subscribe to a subscription +that has side effects or retries at all. Then the library exposes +`request_with_policy` and `subscribe_with_policy` to support that + + +```text + let mut sub = client + .subscribe_with_policy( + "subscribe_lo".to_string(), + rpc_params![], + "unsubscribe_lo".to_string(), + // Do not re-subscribe if the connection is closed. + CallRetryPolicy::Retry, + ) + .await + .unwrap(); +``` + The tricky part is subscriptions, which may lose a few notifications -when it's re-connecting where it's not possible to know which ones. +when it's re-connecting, it's not possible to know which ones. + Lost subscription notifications may be very important to know in some cases, and then this library is not recommended to use. + +There is one way to determine how long a reconnection takes: + +```text + // Print when the RPC client starts to reconnect. + loop { + rpc.reconnect_started().await; + let now = std::time::Instant::now(); + rpc.reconnected().await; + println!( + "RPC client reconnection took `{} seconds`", + now.elapsed().as_secs() + ); + } +``` + ## Example ```rust @@ -20,13 +62,16 @@ use std::time::Duration; async fn run() { // Create a new client with with a reconnecting RPC client. let client = Client::builder() - // Reconnect with exponential backoff. - .retry_policy(ExponentialBackoff::from_millis(100)) + // Reconnect with exponential backoff and if fails more then + // 10 retries we give up and terminate. + .retry_policy(ExponentialBackoff::from_millis(100).take(10)) // Send period WebSocket pings/pongs every 6th second and // if ACK:ed in 30 seconds then disconnect. // // This is just a way to ensure that the connection isn't // idle if no message is sent that often + // + // This only works for native. .enable_ws_ping( PingConfig::new() .ping_interval(Duration::from_secs(6)) @@ -53,4 +98,4 @@ async fn run() { .unwrap(); let notif = sub.next().await.unwrap(); } -``` +``` \ No newline at end of file diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..b7d5629 --- /dev/null +++ b/build.rs @@ -0,0 +1,10 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + native : { all(feature = "native", not(feature = "web"), not(target_arch = "wasm32")) }, + web : { all(feature = "web", target_arch = "wasm32", not(feature = "native")) }, + not_supported: { not(any(native, web)) }, + } +} diff --git a/src/lib.rs b/src/lib.rs index 508a987..043d313 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,55 +1,78 @@ +//! # reconnecting-jsonrpsee-ws-client +//! //! Wrapper crate over the jsonrpsee ws client, which automatically reconnects -//! under the hood; without that, the user has to restart it manually by -//! re-transmitting pending calls and re-establish subscriptions that -//! were closed when the connection was terminated. +//! under the hood; without that, one has to restart it. +//! It supports a few retry strategies, such as exponential backoff, but it's also possible +//! to use custom strategies as long as it implements `Iterator`. +//! +//! +//! By default, the library is re-transmitting pending calls and re-establishing subscriptions that +//! were closed until it's successful when the connection was terminated, but it's also possible to disable that +//! and manage it yourself. +//! +//! For instance, you may not want to re-subscribe to a subscription +//! that has side effects or retries at all. Then the library exposes +//! `request_with_policy` and `subscribe_with_policy` to support that +//! +//! ```no_run +//! async fn run() { +//! use reconnecting_jsonrpsee_ws_client::{Client, CallRetryPolicy, rpc_params}; +//! +//! let client = Client::builder().build("ws://127.0.0.1:9944".to_string()).await.unwrap(); +//! let mut sub = client +//! .subscribe_with_policy( +//! "subscribe_lo".to_string(), +//! rpc_params![], +//! "unsubscribe_lo".to_string(), +//! // Do not re-subscribe if the connection is closed. +//! CallRetryPolicy::Retry, +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` //! -//! The tricky part is subscription, which may lose a few notifications, -//! then re-connect where it's not possible to know which ones. +//! +//! The tricky part is subscriptions, which may lose a few notifications +//! when it's re-connecting, it's not possible to know which ones. //! //! Lost subscription notifications may be very important to know in some cases, //! and then this library is not recommended to use. //! -//! # Examples //! -//! ```rust -//! use std::time::Duration; -//! use reconnecting_jsonrpsee_ws_client::{Client, ExponentialBackoff, PingConfig, rpc_params}; +//! There is one way to determine how long a reconnection takes: //! -//! async fn run() { -//! // Create a new client with with a reconnecting RPC client. -//! let client = Client::builder() -//! // Reconnect with exponential backoff. -//! .retry_policy(ExponentialBackoff::from_millis(100)) -//! // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds -//! // then disconnect. -//! // -//! // This is just a way to ensure that the connection isn't idle if no message is sent that often -//! .enable_ws_ping( -//! PingConfig::new() -//! .ping_interval(Duration::from_secs(6)) -//! .inactive_limit(Duration::from_secs(30)), -//! ) -//! // There are other configurations as well that can be found here: -//! // -//! .build("ws://localhost:9944".to_string()) -//! .await.unwrap(); +//! ```no_run +//! async fn run() { +//! use reconnecting_jsonrpsee_ws_client::{Client, CallRetryPolicy, rpc_params}; //! -//! // make a JSON-RPC call -//! let json = client.request("say_hello".to_string(), rpc_params![]).await.unwrap(); +//! let client = Client::builder().build("ws://127.0.0.1:9944".to_string()).await.unwrap(); +//! +//! // Print when the RPC client starts to reconnect. +//! tokio::spawn(async move { +//! loop { +//! client.reconnect_started().await; +//! let now = std::time::Instant::now(); +//! client.reconnected().await; +//! println!( +//! "RPC client reconnection took `{} seconds`", +//! now.elapsed().as_secs() +//! ); +//! } +//! }); +//! } //! -//! // make JSON-RPC subscription. -//! let mut sub = client.subscribe("subscribe_lo".to_string(), rpc_params![], "unsubscribe_lo".to_string()).await.unwrap(); -//! let notif = sub.next().await.unwrap(); -//! } //! ``` -#![warn(missing_docs)] -#[cfg(any( - all(feature = "web", feature = "native"), - not(any(feature = "web", feature = "native")) -))] +#![warn( + missing_docs, + missing_debug_implementations, + missing_copy_implementations +)] + +#[cfg(not_supported)] compile_error!( - "reconnecting-jsonrpsee-client: exactly one of the 'web' and 'native' features should be used." + "reconnecting-jsonrpsee-client: exactly one of the 'web' and 'native' features most be used." ); mod platform; @@ -84,11 +107,8 @@ pub use finito::{ExponentialBackoff, FibonacciBackoff, FixedInterval}; pub use jsonrpsee::core::client::IdKind; pub use jsonrpsee::{core::client::error::Error as RpcError, rpc_params, types::SubscriptionId}; -#[cfg(all(feature = "native", not(feature = "web")))] -pub use jsonrpsee::ws_client::HeaderMap; - -#[cfg(all(feature = "native", not(feature = "web")))] -pub use jsonrpsee::core::client::async_client::PingConfig; +#[cfg(native)] +pub use jsonrpsee::ws_client::{HeaderMap, PingConfig}; const LOG_TARGET: &str = "reconnecting_jsonrpsee_ws_client"; @@ -131,13 +151,13 @@ pub enum CallRetryPolicy { } /// An error that indicates the subscription -/// was disconnnected and may reconnect. +/// was disconnected and may reconnect. #[derive(Debug, thiserror::Error)] pub enum Disconnect { - /// The connection was closed, reconnect initiated and the subscriptin was re-subscribed to. + /// The connection was closed, reconnect initiated and the subscription was re-subscribed to. #[error("The client was disconnected `{0}`, reconnect and re-subscribe initiated")] Retry(RpcError), - /// The connection was closed, reconnect initated and the subscription was dropped. + /// The connection was closed, reconnect initiated and the subscription was dropped. #[error("The client was disconnected `{0}`, reconnect initiated and subscription dropped")] Dropped(RpcError), } @@ -204,21 +224,21 @@ impl std::fmt::Debug for Subscription { /// JSON-RPC client that reconnects automatically and may loose /// subscription notifications when it reconnects. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Client { tx: mpsc::UnboundedSender, reconnect: ReconnectRx, } /// Builder for [`Client`]. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ClientBuilder

{ max_request_size: u32, max_response_size: u32, retry_policy: P, - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] ping_config: Option, - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] // web doesn't support custom headers // https://stackoverflow.com/a/4361358/6394734 headers: HeaderMap, @@ -236,9 +256,9 @@ impl Default for ClientBuilder { max_request_size: 10 * 1024 * 1024, max_response_size: 10 * 1024 * 1024, retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)), - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] ping_config: Some(PingConfig::new()), - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] headers: HeaderMap::new(), max_redirections: 5, id_kind: IdKind::Number, @@ -326,7 +346,8 @@ where self } - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] + #[cfg_attr(docsrs, cfg(native))] /// Configure custom headers to use in the WebSocket handshake. pub fn set_headers(mut self, headers: HeaderMap) -> Self { self.headers = headers; @@ -341,9 +362,9 @@ where max_request_size: self.max_request_size, max_response_size: self.max_response_size, retry_policy, - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] ping_config: self.ping_config, - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] headers: self.headers, max_redirections: self.max_redirections, max_log_len: self.max_log_len, @@ -354,7 +375,8 @@ where } } - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] + #[cfg_attr(docsrs, cfg(native))] /// Configure the WebSocket ping/pong interval. /// /// Default: 30 seconds. @@ -363,7 +385,8 @@ where self } - #[cfg(all(feature = "native", not(feature = "web")))] + #[cfg(native)] + #[cfg_attr(docsrs, cfg(native))] /// Disable WebSocket ping/pongs. /// /// Default: 30 seconds. @@ -530,7 +553,7 @@ impl Client { pub async fn reconnected(&self) { self.reconnect.reconnected().await } - /// Get how many times the client has reconnected succesfully. + /// Get how many times the client has reconnected successfully. pub fn reconnect_count(&self) -> usize { self.reconnect.count() } @@ -631,7 +654,7 @@ async fn background_task

( client = match reconnect(params).await { Ok(client) => client, Err(e) => { - tracing::error!(target: LOG_TARGET, "Failed to reconnect/re-establish subscriptions: {e}; terminating the connection"); + tracing::debug!(target: LOG_TARGET, "Failed to reconnect: {e}; terminating the connection"); break; } }; @@ -658,7 +681,7 @@ async fn background_task

( client = match reconnect(params).await { Ok(client) => client, Err(e) => { - tracing::error!(target: LOG_TARGET, "Failed to reconnect/re-establish subscriptions: {e}; terminating the connection"); + tracing::debug!(target: LOG_TARGET, "Failed to reconnect: {e}; terminating the connection"); break; } }; @@ -839,7 +862,7 @@ async fn subscription_handler( _ = sub_tx.closed() => { break true } - // This channel indices wheter the main task has been closed. + // This channel indices whether the main task has been closed. // at this point no further messages are processed. _ = remove_sub.closed() => { break true @@ -901,7 +924,7 @@ where .await?; reconnect.reconnected(); - tracing::debug!(target: LOG_TARGET, "Connection to {url} was succesfully re-established"); + tracing::debug!(target: LOG_TARGET, "Connection to {url} was successfully re-established"); for (id, op) in dispatch { pending_calls.push(dispatch_call(client.clone(), op, id, sub_tx.clone()).boxed()); @@ -1172,7 +1195,7 @@ mod tests { futs.try_for_each(|_| future::ready(Ok(()))) .await - .expect("Requests should be succesful"); + .expect("Requests should be successful"); }); // Restart the server and allow the call to complete. @@ -1183,6 +1206,26 @@ mod tests { assert_eq!(client.reconnect_count(), 1); } + #[tokio::test] + async fn gives_up_after_ten_retries() { + init_logger(); + + let (handle, addr) = run_server_with_settings(None, true).await.unwrap(); + let client = Client::builder() + .retry_policy(FixedInterval::from_millis(10).take(10)) + .build(addr.clone()) + .await + .unwrap(); + + let _ = handle.send(()); + client.reconnect_started().await; + + assert!(matches!( + client.request("say_hello".to_string(), rpc_params![]).await, + Err(Error::Closed) + )); + } + async fn run_server() -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> { run_server_with_settings(None, false).await } diff --git a/src/platform.rs b/src/platform.rs index 422b247..e6ecd4c 100644 --- a/src/platform.rs +++ b/src/platform.rs @@ -2,13 +2,13 @@ use crate::{ClientBuilder, RpcError}; use jsonrpsee::core::client::Client; use std::sync::Arc; -#[cfg(all(feature = "native", not(feature = "web")))] +#[cfg(native)] pub use tokio::spawn; -#[cfg(all(feature = "web", target_arch = "wasm32", not(feature = "native")))] +#[cfg(web)] pub use wasm_bindgen_futures::spawn_local as spawn; -#[cfg(all(feature = "native", not(feature = "web")))] +#[cfg(native)] pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { use jsonrpsee::ws_client::WsClientBuilder; @@ -48,9 +48,9 @@ pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { - use jsonrpsee::client_transport::web; + use jsonrpsee::wasm_client::WasmClientBuilder; let ClientBuilder { id_kind, @@ -60,19 +60,14 @@ pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result