diff --git a/http/Cargo.toml b/http/Cargo.toml index 3041a0fd45..3d2f8481d9 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -50,6 +50,9 @@ fastrand = "2.0.1" test-harness = "0.2.0" indoc = "2.0.4" serde_json = "1.0.108" +async-channel = "2.2.0" +async-executor = "1.9.1" +core_affinity = "0.8.1" [dev-dependencies.tokio] version = "1.35.1" diff --git a/http/examples/unsend.rs b/http/examples/unsend.rs new file mode 100644 index 0000000000..2dcf8b37a9 --- /dev/null +++ b/http/examples/unsend.rs @@ -0,0 +1,64 @@ +use async_net::{TcpListener, TcpStream}; +use futures_lite::prelude::*; +use std::thread; +use trillium_http::{Conn, Stopper}; + +async fn handler(mut conn: Conn) -> Conn { + let rc = std::rc::Rc::new(()); + conn.set_status(200); + std::future::ready(()).await; + conn.set_response_body("ok"); + let _ = rc.clone(); + conn +} + +pub fn main() { + env_logger::init(); + let stopper = Stopper::new(); + let (send, receive) = async_channel::unbounded(); + let core_ids = core_affinity::get_core_ids().unwrap(); + let handles = core_ids + .into_iter() + .map(|id| { + let stopper = stopper.clone(); + let receive = receive.clone(); + thread::spawn(move || { + if !core_affinity::set_for_current(id) { + log::warn!("unable to set core affinity"); + } + let executor = async_executor::LocalExecutor::new(); + + futures_lite::future::block_on(executor.run(async { + while let Ok(transport) = receive.recv().await { + let stopper = stopper.clone(); + + let future = async move { + match Conn::map(transport, stopper, handler).await { + Ok(_) => {} + Err(e) => log::error!("{e}"), + } + }; + executor.spawn(future).detach(); + } + })); + }) + }) + .collect::>(); + + async_io::block_on(async move { + let port = std::env::var("PORT") + .unwrap_or("8080".into()) + .parse::() + .unwrap(); + + let listener = TcpListener::bind(("0.0.0.0", port)).await.unwrap(); + let mut incoming = stopper.stop_stream(listener.incoming()); + while let Some(Ok(stream)) = incoming.next().await { + send.send(stream).await.unwrap(); + } + }); + + for handle in handles { + handle.join().unwrap(); + } +} diff --git a/http/src/conn.rs b/http/src/conn.rs index 45470a4378..d2e3fa9fac 100644 --- a/http/src/conn.rs +++ b/http/src/conn.rs @@ -114,7 +114,7 @@ where ) -> Result>> where F: FnMut(Conn) -> Fut, - Fut: Future> + Send, + Fut: Future>, { Self::map_with_config(DEFAULT_CONFIG, transport, stopper, handler).await } @@ -146,7 +146,7 @@ where ) -> Result>> where F: FnMut(Conn) -> Fut, - Fut: Future> + Send, + Fut: Future>, { Self::map_with_config_and_shared_state(http_config, transport, stopper, None, handler).await } @@ -182,7 +182,7 @@ where ) -> Result>> where F: FnMut(Conn) -> Fut, - Fut: Future> + Send, + Fut: Future>, { let mut conn = Conn::new_internal( http_config,