Skip to content

Commit

Permalink
feat(http)!: support !Send handler functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Apr 6, 2024
1 parent 44b2095 commit e4b3a48
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
3 changes: 3 additions & 0 deletions http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ fastrand = "2.0.1"
test-harness = "0.2.0"
indoc = "2.0.4"
serde_json = "1.0.108"
async-channel = "2.2.0"
async-executor = "1.9.1"
core_affinity = "0.8.1"

[dev-dependencies.tokio]
version = "1.35.1"
Expand Down
64 changes: 64 additions & 0 deletions http/examples/unsend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use async_net::{TcpListener, TcpStream};
use futures_lite::prelude::*;
use std::thread;
use trillium_http::{Conn, Stopper};

async fn handler(mut conn: Conn<TcpStream>) -> Conn<TcpStream> {
let rc = std::rc::Rc::new(());
conn.set_status(200);
std::future::ready(()).await;
conn.set_response_body("ok");
let _ = rc.clone();
conn
}

pub fn main() {
env_logger::init();
let stopper = Stopper::new();
let (send, receive) = async_channel::unbounded();
let core_ids = core_affinity::get_core_ids().unwrap();
let handles = core_ids
.into_iter()
.map(|id| {
let stopper = stopper.clone();
let receive = receive.clone();
thread::spawn(move || {
if !core_affinity::set_for_current(id) {
log::warn!("unable to set core affinity");
}
let executor = async_executor::LocalExecutor::new();

futures_lite::future::block_on(executor.run(async {
while let Ok(transport) = receive.recv().await {
let stopper = stopper.clone();

let future = async move {
match Conn::map(transport, stopper, handler).await {
Ok(_) => {}
Err(e) => log::error!("{e}"),
}
};
executor.spawn(future).detach();
}
}));
})
})
.collect::<Vec<_>>();

async_io::block_on(async move {
let port = std::env::var("PORT")
.unwrap_or("8080".into())
.parse::<u16>()
.unwrap();

let listener = TcpListener::bind(("0.0.0.0", port)).await.unwrap();
let mut incoming = stopper.stop_stream(listener.incoming());
while let Some(Ok(stream)) = incoming.next().await {
send.send(stream).await.unwrap();
}
});

for handle in handles {
handle.join().unwrap();
}
}
6 changes: 3 additions & 3 deletions http/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where
) -> Result<Option<Upgrade<Transport>>>
where
F: FnMut(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
Fut: Future<Output = Conn<Transport>>,
{
Self::map_with_config(DEFAULT_CONFIG, transport, stopper, handler).await
}
Expand Down Expand Up @@ -146,7 +146,7 @@ where
) -> Result<Option<Upgrade<Transport>>>
where
F: FnMut(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
Fut: Future<Output = Conn<Transport>>,
{
Self::map_with_config_and_shared_state(http_config, transport, stopper, None, handler).await
}
Expand Down Expand Up @@ -182,7 +182,7 @@ where
) -> Result<Option<Upgrade<Transport>>>
where
F: FnMut(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
Fut: Future<Output = Conn<Transport>>,
{
let mut conn = Conn::new_internal(
http_config,
Expand Down

0 comments on commit e4b3a48

Please sign in to comment.