Skip to content

Commit

Permalink
Pass optional timeout alongside headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed Jul 22, 2024
1 parent 7397650 commit 9d791a0
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 27 deletions.
11 changes: 7 additions & 4 deletions src/api/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ impl<T: Transport> Eth<T> {

/// Get current block number
pub fn block_number(&self, headers: Option<HeaderMap>) -> CallFuture<U64, T::Out> {
CallFuture::new(self.transport.execute_with_headers("eth_blockNumber", vec![], headers))
CallFuture::new(
self.transport
.execute_with_headers("eth_blockNumber", vec![], headers, None),
)
}

/// Call a constant method of contract without changing the state of the blockchain.
Expand Down Expand Up @@ -122,7 +125,7 @@ impl<T: Transport> Eth<T> {
let filter = helpers::serialize(&filter);
CallFuture::new(
self.transport
.execute_with_headers("eth_getLogs", vec![filter], headers),
.execute_with_headers("eth_getLogs", vec![filter], headers, None),
)
}

Expand All @@ -134,12 +137,12 @@ impl<T: Transport> Eth<T> {
BlockId::Hash(hash) => {
let hash = helpers::serialize(&hash);
self.transport
.execute_with_headers("eth_getBlockByHash", vec![hash, include_txs], headers)
.execute_with_headers("eth_getBlockByHash", vec![hash, include_txs], headers, None)
}
BlockId::Number(num) => {
let num = helpers::serialize(&num);
self.transport
.execute_with_headers("eth_getBlockByNumber", vec![num, include_txs], headers)
.execute_with_headers("eth_getBlockByNumber", vec![num, include_txs], headers, None)
}
};

Expand Down
42 changes: 34 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// select! in WS transport
#![recursion_limit = "256"]

use std::time::Duration;

use headers::HeaderMap;

Check failure on line 16 in src/lib.rs

View workflow job for this annotation

GitHub Actions / Check Transports

unresolved import `headers`

Check failure on line 16 in src/lib.rs

View workflow job for this annotation

GitHub Actions / Check WASM

unresolved import `headers`
use jsonrpc_core as rpc;

Expand Down Expand Up @@ -53,18 +55,30 @@ pub trait Transport: std::fmt::Debug + Clone {
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call);

/// Execute prepared RPC call.
fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out;
fn send(
&self,
id: RequestId,
request: rpc::Call,
headers: Option<HeaderMap>,
timeout: Option<Duration>,
) -> Self::Out;

/// Execute remote method with given parameters.
fn execute(&self, method: &str, params: Vec<rpc::Value>) -> Self::Out {
let (id, request) = self.prepare(method, params);
self.send(id, request, None)
self.send(id, request, None, None)
}

/// Execute remote method with given parameters and headers.
fn execute_with_headers(&self, method: &str, params: Vec<rpc::Value>, headers: Option<HeaderMap>) -> Self::Out {
fn execute_with_headers(
&self,
method: &str,
params: Vec<rpc::Value>,
headers: Option<HeaderMap>,
timeout: Option<Duration>,
) -> Self::Out {
let (id, request) = self.prepare(method, params);
self.send(id, request, headers)
self.send(id, request, headers, timeout)
}
}

Expand Down Expand Up @@ -104,8 +118,14 @@ where
(**self).prepare(method, params)
}

fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out {
(**self).send(id, request, headers)
fn send(
&self,
id: RequestId,
request: rpc::Call,
headers: Option<HeaderMap>,
timeout: Option<Duration>,
) -> Self::Out {
(**self).send(id, request, headers, timeout)
}
}

Expand Down Expand Up @@ -151,7 +171,7 @@ mod tests {
use crate::api::Web3;
use futures::future::BoxFuture;
use headers::HeaderMap;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

#[derive(Debug, Clone)]
struct FakeTransport;
Expand All @@ -163,7 +183,13 @@ mod tests {
unimplemented!()
}

fn send(&self, _id: RequestId, _request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
fn send(
&self,
_id: RequestId,
_request: rpc::Call,
_headers: Option<HeaderMap>,
_timeout: Option<Duration>,
) -> Self::Out {
unimplemented!()
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/transports/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::{
};
use headers::HeaderMap;

Check failure on line 12 in src/transports/batch.rs

View workflow job for this annotation

GitHub Actions / Check Transports

unresolved import `headers`

Check failure on line 12 in src/transports/batch.rs

View workflow job for this annotation

GitHub Actions / Check WASM

unresolved import `headers`
use parking_lot::Mutex;
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

type Pending = oneshot::Sender<error::Result<rpc::Value>>;
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
Expand Down Expand Up @@ -76,7 +76,13 @@ where
self.transport.prepare(method, params)
}

fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
fn send(
&self,
id: RequestId,
request: rpc::Call,
_headers: Option<HeaderMap>,
_timeout: Option<Duration>,
) -> Self::Out {
let (tx, rx) = oneshot::channel();
self.pending.lock().insert(id, tx);
self.batch.lock().push((id, request));
Expand Down
14 changes: 11 additions & 3 deletions src/transports/either.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A strongly-typed transport alternative.
use std::time::Duration;

use crate::{api, error, rpc, BatchTransport, DuplexTransport, RequestId, Transport};
use futures::{
future::{BoxFuture, FutureExt},
Expand Down Expand Up @@ -37,10 +39,16 @@ where
}
}

fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out {
fn send(
&self,
id: RequestId,
request: rpc::Call,
headers: Option<HeaderMap>,
timeout: Option<Duration>,
) -> Self::Out {
match *self {
Self::Left(ref a) => a.send(id, request, headers).boxed(),
Self::Right(ref b) => b.send(id, request, headers).boxed(),
Self::Left(ref a) => a.send(id, request, headers, timeout).boxed(),
Self::Right(ref b) => b.send(id, request, headers, timeout).boxed(),
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

/// HTTP Transport
Expand Down Expand Up @@ -80,6 +81,7 @@ async fn execute_rpc<T: DeserializeOwned>(
request: &Request,
id: RequestId,
headers: Option<HeaderMap>,
timeout: Option<Duration>,
) -> Result<T> {
log::debug!("[id:{}] sending request: {:?}", id, serde_json::to_string(&request)?);
let mut request_builder = client.post(url).json(request);
Expand All @@ -88,6 +90,10 @@ async fn execute_rpc<T: DeserializeOwned>(
request_builder = request_builder.headers(headers);
}

if let Some(timeout) = timeout {
request_builder = request_builder.timeout(timeout);
}

let response = request_builder
.send()
.await
Expand Down Expand Up @@ -127,10 +133,10 @@ impl Transport for Http {
(id, request)
}

fn send(&self, id: RequestId, call: Call, headers: Option<HeaderMap>) -> Self::Out {
fn send(&self, id: RequestId, call: Call, headers: Option<HeaderMap>, timeout: Option<Duration>) -> Self::Out {
let (client, url) = self.new_request();
Box::pin(async move {
let output: Output = execute_rpc(&client, url, &Request::Single(call), id, headers).await?;
let output: Output = execute_rpc(&client, url, &Request::Single(call), id, headers, timeout).await?;
helpers::to_result_from_output(output)
})
}
Expand All @@ -148,7 +154,7 @@ impl BatchTransport for Http {
let (client, url) = self.new_request();
let (ids, calls): (Vec<_>, Vec<_>) = requests.into_iter().unzip();
Box::pin(async move {
let value = execute_rpc(&client, url, &Request::Batch(calls), id, None).await?;
let value = execute_rpc(&client, url, &Request::Batch(calls), id, None, None).await?;
let outputs = handle_possible_error_object_for_batched_request(value)?;
handle_batch_response(&ids, outputs)
})
Expand Down
13 changes: 10 additions & 3 deletions src/transports/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
pin::Pin,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll},
time::Duration,
};
use tokio::{
io::AsyncWriteExt,
Expand Down Expand Up @@ -64,7 +65,13 @@ impl Transport for Ipc {
(id, request)
}

fn send(&self, id: RequestId, call: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
fn send(
&self,
id: RequestId,
call: rpc::Call,
_headers: Option<HeaderMap>,
_timeout: Option<Duration>,
) -> Self::Out {
let (response_tx, response_rx) = oneshot::channel();
let message = TransportMessage::Single((id, call, response_tx));

Expand Down Expand Up @@ -357,7 +364,7 @@ mod test {
"test": -1,
})],
);
let response = ipc.send(req_id, request, None).await;
let response = ipc.send(req_id, request, None, None).await;
let expected_response_json: serde_json::Value = json!({
"test": 1,
});
Expand All @@ -369,7 +376,7 @@ mod test {
"test": 3,
})],
);
let response = ipc.send(req_id, request, None).await;
let response = ipc.send(req_id, request, None, None).await;
let expected_response_json: serde_json::Value = json!({
"test": "string1",
});
Expand Down
10 changes: 8 additions & 2 deletions src/transports/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use futures::future::{self, BoxFuture, FutureExt};
use headers::HeaderMap;
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use std::{cell::RefCell, collections::VecDeque, rc::Rc, time::Duration};

type Result<T> = BoxFuture<'static, error::Result<T>>;

Expand All @@ -27,7 +27,13 @@ impl Transport for TestTransport {
(self.requests.borrow().len(), request)
}

fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Result<rpc::Value> {
fn send(
&self,
id: RequestId,
request: rpc::Call,
_headers: Option<HeaderMap>,
_timeout: Option<Duration>,
) -> Result<rpc::Value> {
future::ready(match self.responses.borrow_mut().pop_front() {
Some(response) => Ok(response),
None => {
Expand Down
4 changes: 2 additions & 2 deletions src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
fmt,
marker::Unpin,
pin::Pin,
sync::{atomic, Arc},
sync::{atomic, Arc}, time::Duration,
};
use url::Url;

Expand Down Expand Up @@ -474,7 +474,7 @@ impl Transport for WebSocket {
(id, request)
}

fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>, _timeout: Option<Duration>) -> Self::Out {
let response = self.send_request(id, rpc::Request::Single(request));
Response::new(response, batch_to_single)
}
Expand Down

0 comments on commit 9d791a0

Please sign in to comment.