Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: use swansong instead of stopper + clone counter #623

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions async-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() {
use std::future::Future;

use trillium::Handler;
pub use trillium_server_common::{Binding, CloneCounterObserver, Stopper};
pub use trillium_server_common::{Binding, Swansong};

mod client;
pub use client::ClientConfig;
Expand Down Expand Up @@ -92,15 +92,15 @@ The default configuration is as follows:
## Usage

```rust
let stopper = trillium_async_std::Stopper::new();
# stopper.stop(); // stoppping the server immediately for the test
let swansong = trillium_async_std::Swansong::new();
# swansong.shut_down(); // stoppping the server immediately for the test
trillium_async_std::config()
.with_port(0)
.with_host("127.0.0.1")
.without_signals()
.with_nodelay()
.with_acceptor(()) // see [`trillium_rustls`] and [`trillium_native_tls`]
.with_stopper(stopper)
.with_swansong(swansong)
.run(|conn: trillium::Conn| async move {
conn.ok("hello async-std")
});
Expand Down
4 changes: 2 additions & 2 deletions async-std/src/server/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::AsyncStdTransport;
use async_std::net::{TcpListener, TcpStream};
use async_std::task::{block_on, spawn};
use std::{convert::TryInto, env, future::Future, io::Result, pin::Pin};

Check warning on line 4 in async-std/src/server/tcp.rs

View workflow job for this annotation

GitHub Actions / Build and Test (windows-latest)

unused import: `pin::Pin`

Check warning on line 4 in async-std/src/server/tcp.rs

View workflow job for this annotation

GitHub Actions / Build and Test (windows-latest)

unused import: `pin::Pin`
use trillium::Info;
use trillium_server_common::Server;

Expand Down Expand Up @@ -29,8 +29,8 @@
")"
);

fn accept(&mut self) -> Pin<Box<dyn Future<Output = Result<Self::Transport>> + Send + '_>> {
Box::pin(async move { self.0.accept().await.map(|(t, _)| t.into()) })
async fn accept(&mut self) -> Result<Self::Transport> {
self.0.accept().await.map(|(t, _)| t.into())
}

fn listener_from_tcp(tcp: std::net::TcpListener) -> Self {
Expand Down
72 changes: 33 additions & 39 deletions async-std/src/server/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use async_std::{
stream::StreamExt,
task::{block_on, spawn},
};
use std::{env, future::Future, io::Result, pin::Pin};
use std::{env, future::Future, io::Result};
use trillium::{log_error, Info};
use trillium_server_common::{
Binding::{self, *},
Server, Stopper,
Server, Swansong,
};

/// Tcp/Unix Trillium server adapter for Async-Std
Expand Down Expand Up @@ -48,39 +48,35 @@ impl Server for AsyncStdServer {
")"
);

fn handle_signals(stop: Stopper) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(async move {
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;
async fn handle_signals(swansong: Swansong) {
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;

let signals = Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
let mut signals = signals.fuse();
while signals.next().await.is_some() {
if stop.is_stopped() {
eprintln!("\nSecond interrupt, shutting down harshly");
std::process::exit(1);
} else {
println!("\nShutting down gracefully.\nControl-C again to force.");
stop.stop();
}
let signals = Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
let mut signals = signals.fuse();
while signals.next().await.is_some() {
if swansong.state().is_shutting_down() {
eprintln!("\nSecond interrupt, shutting down harshly");
std::process::exit(1);
} else {
println!("\nShutting down gracefully.\nControl-C again to force.");
swansong.shut_down();
}
})
}
}

fn accept(&mut self) -> Pin<Box<dyn Future<Output = Result<Self::Transport>> + Send + '_>> {
Box::pin(async move {
match &self.0 {
Tcp(t) => t
.accept()
.await
.map(|(t, _)| Tcp(AsyncStdTransport::from(t))),
async fn accept(&mut self) -> Result<Self::Transport> {
match &self.0 {
Tcp(t) => t
.accept()
.await
.map(|(t, _)| Tcp(AsyncStdTransport::from(t))),

Unix(u) => u
.accept()
.await
.map(|(u, _)| Unix(AsyncStdTransport::from(u))),
}
})
Unix(u) => u
.accept()
.await
.map(|(u, _)| Unix(AsyncStdTransport::from(u))),
}
}

fn listener_from_tcp(tcp: std::net::TcpListener) -> Self {
Expand All @@ -106,16 +102,14 @@ impl Server for AsyncStdServer {
block_on(fut);
}

fn clean_up(self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(async move {
if let Unix(u) = &self.0 {
if let Ok(local) = u.local_addr() {
if let Some(path) = local.as_pathname() {
log::info!("deleting {:?}", &path);
log_error!(async_std::fs::remove_file(path).await);
}
async fn clean_up(self) {
if let Unix(u) = &self.0 {
if let Ok(local) = u.local_addr() {
if let Some(path) = local.as_pathname() {
log::info!("deleting {:?}", &path);
log_error!(async_std::fs::remove_file(path).await);
}
}
})
}
}
}
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ serde = { version = "1.0.193", features = ["derive"], optional = true }
smallvec = "1.11.2"
smartcow = "0.2.1"
smartstring = "1.0.1"
stopper = "0.2.3"
thiserror = "1.0.52"
trillium-macros = { version = "0.0.5", path = "../macros" }
swansong = "0.3.0"

[dev-dependencies]
async-compat = "0.2.3"
Expand Down
14 changes: 7 additions & 7 deletions http/examples/conn-example.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
fn main() -> trillium_http::Result<()> {
use async_net::{TcpListener, TcpStream};
use futures_lite::StreamExt;
use stopper::Stopper;
use swansong::Swansong;
use trillium_http::{Conn, Result};

smol::block_on(async {
let stopper = Stopper::new();
let swansong = Swansong::new();

let server_stopper = stopper.clone();
let server_swansong = swansong.clone();
let server = smol::spawn(async move {
let listener = TcpListener::bind("localhost:8001").await?;
let mut incoming = server_stopper.stop_stream(listener.incoming());
let mut incoming = server_swansong.interrupt(listener.incoming());

while let Some(Ok(stream)) = incoming.next().await {
let stopper = server_stopper.clone();
let swansong = server_swansong.clone();
smol::spawn(async move {
Conn::map(stream, stopper, |mut conn: Conn<TcpStream>| async move {
Conn::map(stream, swansong, |mut conn: Conn<TcpStream>| async move {
conn.set_response_body("hello world");
conn.set_status(200);
conn
Expand All @@ -42,7 +42,7 @@ fn main() -> trillium_http::Result<()> {
"hello world"
);

stopper.stop(); // stop the server after one request
swansong.shut_down(); // stop the server after one request
server.await?; // wait for the server to shut down

Result::Ok(())
Expand Down
10 changes: 5 additions & 5 deletions http/examples/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_net::{TcpListener, TcpStream};
use futures_lite::prelude::*;
use trillium_http::{Conn, Stopper};
use trillium_http::{Conn, Swansong};

async fn handler(mut conn: Conn<TcpStream>) -> Conn<TcpStream> {
conn.set_status(200);
Expand All @@ -12,18 +12,18 @@ pub fn main() {
env_logger::init();

smol::block_on(async move {
let stopper = Stopper::new();
let swansong = Swansong::new();
let port = std::env::var("PORT")
.unwrap_or("8080".into())
.parse::<u16>()
.unwrap();

let listener = TcpListener::bind(("0.0.0.0", port)).await.unwrap();
let mut incoming = stopper.stop_stream(listener.incoming());
let mut incoming = swansong.interrupt(listener.incoming());
while let Some(Ok(stream)) = incoming.next().await {
let stopper = stopper.clone();
let swansong = swansong.clone();
smol::spawn(async move {
match Conn::map(stream, stopper, handler).await {
match Conn::map(stream, swansong, handler).await {
Ok(Some(_)) => log::info!("upgrade"),
Ok(None) => log::info!("closing connection"),
Err(e) => log::error!("{:?}", e),
Expand Down
8 changes: 4 additions & 4 deletions http/examples/tokio-http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_compat::Compat;
use tokio::net::{TcpListener, TcpStream};
use trillium_http::{Conn, Stopper};
use trillium_http::{Conn, Swansong};

async fn handler(mut conn: Conn<Compat<TcpStream>>) -> Conn<Compat<TcpStream>> {
let body = conn.request_body().await.read_string().await.unwrap();
Expand All @@ -15,14 +15,14 @@ async fn handler(mut conn: Conn<Compat<TcpStream>>) -> Conn<Compat<TcpStream>> {
#[tokio::main]
pub async fn main() {
env_logger::init();
let stopper = Stopper::new();
let swansong = Swansong::new();
let listener = TcpListener::bind("127.0.0.1:8081").await.unwrap();
loop {
match listener.accept().await {
Ok((stream, _)) => {
let stopper = stopper.clone();
let swansong = swansong.clone();
tokio::spawn(async move {
match Conn::map(Compat::new(stream), stopper, handler).await {
match Conn::map(Compat::new(stream), swansong, handler).await {
Ok(Some(_)) => log::info!("upgrade"),
Ok(None) => log::info!("closing connection"),
Err(e) => log::error!("{:?}", e),
Expand Down
12 changes: 6 additions & 6 deletions http/examples/unsend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_net::{TcpListener, TcpStream};
use futures_lite::prelude::*;
use std::thread;
use trillium_http::{Conn, Stopper};
use trillium_http::{Conn, Swansong};

async fn handler(mut conn: Conn<TcpStream>) -> Conn<TcpStream> {
let rc = std::rc::Rc::new(());
Expand All @@ -14,13 +14,13 @@ async fn handler(mut conn: Conn<TcpStream>) -> Conn<TcpStream> {

pub fn main() {
env_logger::init();
let stopper = Stopper::new();
let swansong = Swansong::new();
let (send, receive) = async_channel::unbounded();
let core_ids = core_affinity::get_core_ids().unwrap();
let handles = core_ids
.into_iter()
.map(|id| {
let stopper = stopper.clone();
let swansong = swansong.clone();
let receive = receive.clone();
thread::spawn(move || {
if !core_affinity::set_for_current(id) {
Expand All @@ -30,10 +30,10 @@ pub fn main() {

futures_lite::future::block_on(executor.run(async {
while let Ok(transport) = receive.recv().await {
let stopper = stopper.clone();
let swansong = swansong.clone();

let future = async move {
match Conn::map(transport, stopper, handler).await {
match Conn::map(transport, swansong, handler).await {
Ok(_) => {}
Err(e) => log::error!("{e}"),
}
Expand All @@ -52,7 +52,7 @@ pub fn main() {
.unwrap();

let listener = TcpListener::bind(("0.0.0.0", port)).await.unwrap();
let mut incoming = stopper.stop_stream(listener.incoming());
let mut incoming = swansong.interrupt(listener.incoming());
while let Some(Ok(stream)) = incoming.next().await {
send.send(stream).await.unwrap();
}
Expand Down
Loading
Loading