Skip to content

Commit

Permalink
Flush AsyncPoolSink on the current thread if tearing down
Browse files Browse the repository at this point in the history
  • Loading branch information
SpriteOvO committed Mar 21, 2024
1 parent 281f1aa commit ad2384a
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 3 deletions.
5 changes: 5 additions & 0 deletions spdlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ tracing-appender = "=0.2.2"
[build-dependencies]
rustc_version = "0.4.0"

[[test]]
name = "global_async_pool_sink"
harness = false
required-features = ["multi-thread"]

[[bench]]
name = "compare_with_cpp_spdlog"
harness = false
Expand Down
3 changes: 3 additions & 0 deletions spdlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,13 +647,16 @@ pub fn log_crate_proxy() -> &'static LogCrateProxy {
&PROXY
}

static IS_TEARING_DOWN: AtomicBool = AtomicBool::new(false);

fn flush_default_logger_at_exit() {
// Rust never calls `drop` for static variables.
//
// Setting up an exit handler gives us a chance to flush the default logger
// once at the program exit, thus we don't lose the last logs.

extern "C" fn handler() {
IS_TEARING_DOWN.store(true, Ordering::SeqCst);
if let Some(default_logger) = DEFAULT_LOGGER.get() {
default_logger.load().flush()
}
Expand Down
16 changes: 13 additions & 3 deletions spdlog/src/sink/async_sink/async_pool_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,19 @@ impl Sink for AsyncPoolSink {
}

fn flush(&self) -> Result<()> {
self.assign_task(Task::Flush {
backend: self.clone_backend(),
})
if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
// https://github.com/SpriteOvO/spdlog-rs/issues/64
//
// `crossbeam` uses thread-local internally, which is not supported in `atexit`
// callback. Let's directly flush the sinks on the current thread if the program
// is tearing down.
self.backend.flush();
Ok(())
} else {
self.assign_task(Task::Flush {
backend: self.clone_backend(),
})
}
}

/// For [`AsyncPoolSink`], the function performs the same call to all
Expand Down
133 changes: 133 additions & 0 deletions spdlog/tests/global_async_pool_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::{
env,
fmt::Write,
os::raw::c_int,
process::{self, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use spdlog::{
formatter::Formatter,
prelude::*,
sink::{AsyncPoolSink, Sink},
ErrorHandler,
};

static IS_FLUSHED: AtomicBool = AtomicBool::new(false);

struct SetFlagSink;

impl Sink for SetFlagSink {
fn log(&self, _record: &spdlog::Record) -> error::Result<()> {
Ok(())
}

fn flush(&self) -> error::Result<()> {
IS_FLUSHED.store(true, Ordering::SeqCst);
Ok(())
}

fn level_filter(&self) -> LevelFilter {
LevelFilter::All
}

fn set_level_filter(&self, _level_filter: LevelFilter) {
unimplemented!()
}

fn set_formatter(&self, _formatter: Box<dyn Formatter>) {
unimplemented!()
}

fn set_error_handler(&self, _handler: Option<ErrorHandler>) {
unimplemented!()
}
}

fn run_test() {
{
extern "C" fn check() {
// Assert that `AsyncPoolSink` in the default logger will be flushed correctly
// and will not panic.
assert!(IS_FLUSHED.load(Ordering::SeqCst));
}
// Setup `atexit` to check the flag at the end of the program
extern "C" {
fn atexit(cb: extern "C" fn()) -> c_int;
}
assert_eq!(unsafe { atexit(check) }, 0);

let async_pool_sink = Arc::new(
AsyncPoolSink::builder()
.sink(Arc::new(SetFlagSink))
.build()
.unwrap(),
);
let logger = Arc::new(
Logger::builder()
.sink(async_pool_sink)
.level_filter(LevelFilter::All)
.flush_level_filter(LevelFilter::Off)
.build()
.unwrap(),
);
spdlog::set_default_logger(logger);
}

info!("hello async_pool_sink");
}

fn main() {
// https://github.com/SpriteOvO/spdlog-rs/issues/64

// This is a flaky test, it only has a certain probability of failing, so we run
// it multiple times to make sure it's really working properly.
{
let mut captured_output = String::new();
let args = env::args().collect::<Vec<_>>();
// If this is the parent process (no additional arguments)
if args.len() == 1 {
for i in 0..1000 {
let output = process::Command::new(&args[0])
.arg("child")
.stderr(Stdio::piped())
.output()
.unwrap();
let success = output.status.success();

writeln!(
captured_output,
"Attempt #{i} = {}",
if success { "ok" } else { "failed!" }
)
.unwrap();

if !success {
eprintln!("{captured_output}");

let stderr = String::from_utf8_lossy(&output.stderr).lines().fold(
String::new(),
|mut contents, line| {
writeln!(&mut contents, "> {line}").unwrap();
contents
},
);
eprintln!("Stderr of the failed child:\n{stderr}");

panic!("Test failed");
}
}
return;
} else {
assert_eq!(args[1], "child");
}

// Run the test after leaving the scope, so the main function ends
// without dropping additional variables, thus exiting faster. This
// should increase the probability of reproducing the error.
}
run_test();
}

0 comments on commit ad2384a

Please sign in to comment.