Skip to content

Commit

Permalink
chore(deps): bump jsonrpsee from 0.22.5 to v0.23.1 (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 authored Jun 25, 2024
1 parent 6f8036f commit 580993b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 51 deletions.
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"

[dependencies]
futures = { version = "0.3", default-features = false }
jsonrpsee = { version = "0.22.4" }
jsonrpsee = { version = "0.23.1" }
serde_json = { version = "1", features = ["raw_value"], default-features = false }
tokio = { version = "1.37", features = ["sync"], default-features = false }
tracing = { version = "0.1", default-features = false }
Expand All @@ -36,8 +36,11 @@ native = [
web = ["jsonrpsee/wasm-client", "wasm-bindgen-futures", "finito/wasm-bindgen"]

[dev-dependencies]
jsonrpsee = { version = "0.22.4", features = ["server"] }
hyper = { version = "0.14", features = ["server", "tcp"] }
jsonrpsee-server = { git = "https://github.com/paritytech/jsonrpsee", rev = "34b7d37" }
hyper = { version = "1.3.1", features = ["server"] }
hyper-util = { version = "0.1", features = ["tokio", "service", "tokio", "server-auto"] }
http-body = "1"
tower = "0.4"
anyhow = "1"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

Expand Down
119 changes: 71 additions & 48 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,23 +960,17 @@ where

#[cfg(test)]
mod tests {
use std::{convert::Infallible, net::TcpListener};

use super::*;
use futures::{
future::{self, Either},
stream::FuturesUnordered,
TryStreamExt,
};
use hyper::server::conn::AddrStream;
use jsonrpsee::{
rpc_params,
server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder,
ServerConfig,
},
RpcModule, SubscriptionMessage,
use jsonrpsee_server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse,
RpcModule, RpcServiceBuilder, ServerConfig, SubscriptionMessage,
};
use tower::BoxError;
use tracing_subscriber::util::SubscriberInitExt;

fn init_logger() {
Expand Down Expand Up @@ -1235,7 +1229,7 @@ mod tests {
url: Option<&str>,
dont_respond_to_method_calls: bool,
) -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> {
use hyper::service::{make_service_fn, service_fn};
use jsonrpsee_server::HttpRequest;

let sockaddr = match url {
Some(url) => url.strip_prefix("ws://").unwrap(),
Expand All @@ -1245,7 +1239,7 @@ mod tests {
let mut i = 0;

let listener = loop {
if let Ok(l) = TcpListener::bind(sockaddr) {
if let Ok(l) = tokio::net::TcpListener::bind(sockaddr).await {
break l;
}
tokio::time::sleep(Duration::from_millis(100)).await;
Expand All @@ -1260,19 +1254,19 @@ mod tests {
let mut module = RpcModule::new(());

if dont_respond_to_method_calls {
module.register_async_method("say_hello", |_, _| async {
module.register_async_method("say_hello", |_, _, _| async {
futures::future::pending::<()>().await;
"timeout"
})?;
} else {
module.register_async_method("say_hello", |_, _| async { "lo" })?;
module.register_async_method("say_hello", |_, _, _| async { "lo" })?;
}

module.register_subscription(
"subscribe_lo",
"subscribe_lo",
"unsubscribe_lo",
|_params, pending, _ctx| async move {
|_params, pending, _ctx, _| async move {
let sink = pending.accept().await.unwrap();
let i = 0;

Expand All @@ -1289,20 +1283,37 @@ mod tests {
},
)?;

let (tx, _) = tokio::sync::broadcast::channel(4);
let (tx, mut rx) = tokio::sync::broadcast::channel(4);
let tx2 = tx.clone();
let (stop_handle, server_handle) = stop_channel();
let addr = listener.local_addr().expect("Could not find local addr");

tokio::spawn(async move {
loop {
let sock = tokio::select! {
res = listener.accept() => {
match res {
Ok((stream, _remote_addr)) => stream,
Err(e) => {
tracing::error!("Failed to accept connection: {:?}", e);
continue;
}
}
}
_ = rx.recv() => {
break
}
};

let make_service = make_service_fn(move |_: &AddrStream| {
let module = module.clone();
let tx = tx2.clone();
let stop_handle = stop_handle.clone();
let module = module.clone();
let rx2 = tx2.subscribe();
let tx2 = tx2.clone();
let stop_handle2 = stop_handle.clone();

async move {
Ok::<_, Infallible>(service_fn(move |req| {
let svc = tower::service_fn(move |req: HttpRequest<hyper::body::Incoming>| {
let module = module.clone();
let tx = tx.clone();
let stop_handle = stop_handle.clone();
let tx = tx2.clone();
let stop_handle = stop_handle2.clone();

let conn_permit = ConnectionGuard::new(1).try_acquire().unwrap();

Expand All @@ -1313,48 +1324,60 @@ mod tests {
async move {
let mut rx = tx.subscribe();

match ws::connect(
let (rp, conn_fut) = ws::connect(
req,
ServerConfig::default(),
module,
conn,
rpc_service,
)
.await
{
Ok((rp, conn_fut)) => {
tokio::spawn(async move {
tokio::select! {
_ = conn_fut => (),
_ = rx.recv() => {},
}
});

Ok::<_, Infallible>(rp)
.unwrap();

tokio::spawn(async move {
tokio::select! {
_ = conn_fut => (),
_ = rx.recv() => {},
}
Err(rp) => Ok(rp),
}
});

Ok::<_, BoxError>(rp)
}
.boxed()
} else {
async { Ok(http::response::denied()) }.boxed()
}
}))
}
});
});

let addr = listener.local_addr()?;
let server = hyper::Server::from_tcp(listener)?.serve(make_service);
tokio::spawn(serve_with_graceful_shutdown(sock, svc, rx2));
}

let mut rx = tx.subscribe();
tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move {
_ = rx.recv().await;
});
graceful.await.unwrap();
drop(server_handle);
});

Ok((tx, format!("ws://{}", addr)))
}

async fn serve_with_graceful_shutdown<S, B, I>(
io: I,
service: S,
mut rx: tokio::sync::broadcast::Receiver<()>,
) where
S: tower::Service<HttpRequest<hyper::body::Incoming>, Response = HttpResponse<B>>
+ Clone
+ Send
+ 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
if let Err(e) =
jsonrpsee_server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await
{
tracing::error!("Error while serving: {:?}", e);
}
}
}

0 comments on commit 580993b

Please sign in to comment.