From 58f98e3bc0fe1559ebaa50dcbc5d41dcd91fdf83 Mon Sep 17 00:00:00 2001 From: Asuna Date: Tue, 19 Dec 2023 12:35:43 +0800 Subject: [PATCH] Implement `DedupSink` --- spdlog/src/error.rs | 52 ++++++ spdlog/src/record.rs | 9 + spdlog/src/sink/dedup_sink.rs | 314 ++++++++++++++++++++++++++++++++ spdlog/src/sink/mod.rs | 2 + spdlog/src/test_utils/common.rs | 27 ++- 5 files changed, 394 insertions(+), 10 deletions(-) create mode 100644 spdlog/src/sink/dedup_sink.rs diff --git a/spdlog/src/error.rs b/spdlog/src/error.rs index 9ede2159..41316dd0 100644 --- a/spdlog/src/error.rs +++ b/spdlog/src/error.rs @@ -91,6 +91,14 @@ pub enum Error { #[cfg(feature = "multi-thread")] #[error("failed to send message to channel: {0}")] SendToChannel(SendToChannelError, SendToChannelErrorDropped), + + /// This variant returned when multiple errors occurred. + #[error("{0:?}")] + Multiple(Vec), + + #[cfg(test)] + #[error("{0}")] + __ForInternalTestsUseOnly(i32), } /// This error type contains a variety of possible invalid arguments. @@ -180,6 +188,26 @@ pub enum SendToChannelErrorDropped { Flush, } +impl Error { + pub(crate) fn push_err(result: Result, new: Self) -> Result { + match result { + Ok(_) => Err(new), + Err(Self::Multiple(mut errors)) => { + errors.push(new); + Err(Self::Multiple(errors)) + } + Err(prev) => Err(Error::Multiple(vec![prev, new])), + } + } + + pub(crate) fn push_result(result: Result, new: Result) -> Result { + match new { + Ok(_) => result, + Err(err) => Self::push_err(result, err), + } + } +} + #[cfg(feature = "multi-thread")] impl Error { #[must_use] @@ -222,3 +250,27 @@ pub type ErrorHandler = fn(Error); const_assert!(Atomic::::is_lock_free()); const_assert!(Atomic::>::is_lock_free()); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_err() { + macro_rules! make_err { + ( $($inputs:tt)+ ) => { + Error::__ForInternalTestsUseOnly($($inputs)*) + }; + } + + assert!(matches!( + Error::push_err(Ok(()), make_err!(1)), + Err(make_err!(1)) + )); + + assert!(matches!( + Error::push_err::<()>(Err(make_err!(1)), make_err!(2)), + Err(Error::Multiple(v)) if matches!(v[..], [make_err!(1), make_err!(2)]) + )); + } +} diff --git a/spdlog/src/record.rs b/spdlog/src/record.rs index 66561e7e..568835fe 100644 --- a/spdlog/src/record.rs +++ b/spdlog/src/record.rs @@ -118,6 +118,15 @@ impl<'a> Record<'a> { // When adding more getters, also add to `RecordOwned` + #[must_use] + pub(crate) fn replace_payload(&'a self, new: impl Into>) -> Self { + Self { + logger_name: self.logger_name, + payload: new.into(), + inner: Cow::Borrowed(&self.inner), + } + } + #[cfg(feature = "log")] #[must_use] pub(crate) fn from_log_crate_record( diff --git a/spdlog/src/sink/dedup_sink.rs b/spdlog/src/sink/dedup_sink.rs new file mode 100644 index 00000000..f4395602 --- /dev/null +++ b/spdlog/src/sink/dedup_sink.rs @@ -0,0 +1,314 @@ +use std::{cmp::Ordering, convert::Infallible, sync::Arc, time::Duration}; + +use crate::{ + sink::{helper, Sink, Sinks}, + sync::*, + Error, Record, RecordOwned, Result, +}; + +struct DedupSinkState { + last_record: Option, + skipped_count: usize, +} + +/// A [combined sink], skip consecutive repeated records. +/// +/// More than 2 consecutive repeated records, the records after the first one +/// will be replaced with a single record `"(skipped {count} duplicates)"`. +/// +/// The skip will stop if the incoming record compares to the last skipped +/// records: +/// - content changed, or +/// - logging level changed, or +/// - interval exceeded the skip duration +/// +/// # Example +/// +/// ``` +/// use std::time::Duration; +/// +/// use spdlog::{prelude::*, sink::DedupSink}; +/// # use std::sync::Arc; +/// # use spdlog::{ +/// # formatter::{pattern, PatternFormatter}, +/// # sink::WriteSink, +/// # }; +/// # +/// # fn main() -> Result<(), spdlog::Error> { +/// # let underlying_sink = Arc::new( +/// # WriteSink::builder() +/// # .formatter(Box::new(PatternFormatter::new(pattern!("{payload}\n")))) +/// # .target(Vec::new()) +/// # .build()? +/// # ); +/// +/// # let sink = { +/// # let underlying_sink = underlying_sink.clone(); +/// let sink = Arc::new( +/// DedupSink::builder() +/// .sink(underlying_sink) +/// .skip_duration(Duration::from_secs(1)) +/// .build()? +/// ); +/// # sink +/// # }; +/// # let doctest = Logger::builder().sink(sink).build()?; +/// +/// // ... Add the `sink` to a logger +/// +/// info!(logger: doctest, "I wish I was a cat"); +/// info!(logger: doctest, "I wish I was a cat"); +/// info!(logger: doctest, "I wish I was a cat"); +/// // The skip will stop since the content changed. +/// info!(logger: doctest, "No school"); +/// info!(logger: doctest, "No works"); +/// info!(logger: doctest, "Just meow meow"); +/// +/// # assert_eq!( +/// # String::from_utf8(underlying_sink.clone_target()).unwrap(), +/// /* Output of `underlying_sink` */ +/// r#"I wish I was a cat +/// (skipped 2 duplicates) +/// No school +/// No works +/// Just meow meow +/// "# +/// # ); +/// # Ok(()) } +/// ``` +/// +/// [combined sink]: index.html#combined-sink +pub struct DedupSink { + common_impl: helper::CommonImpl, + sinks: Sinks, + skip_duration: Duration, + state: Mutex, +} + +impl DedupSink { + /// Constructs a builder of `DedupSink`. + #[must_use] + pub fn builder() -> DedupSinkBuilder<()> { + DedupSinkBuilder { + common_builder_impl: helper::CommonBuilderImpl::new(), + sinks: vec![], + skip_duration: (), + } + } + + /// Gets a reference to internal sinks in the combined sink. + #[must_use] + pub fn sinks(&self) -> &[Arc] { + &self.sinks + } + + #[must_use] + fn is_dup_record(&self, last_record: Option, other: &Record) -> bool { + if let Some(last_record) = last_record { + last_record.payload() == other.payload() + && last_record.level() == other.level() + && other.time().duration_since(last_record.time()).unwrap() < self.skip_duration + } else { + false + } + } + + fn log_record(&self, record: &Record) -> Result<()> { + self.sinks.iter().fold(Ok(()), |result, sink| { + Error::push_result(result, sink.log(record)) + }) + } + + fn flush_sinks(&self) -> Result<()> { + self.sinks.iter().fold(Ok(()), |result, sink| { + Error::push_result(result, sink.flush()) + }) + } +} + +impl Sink for DedupSink { + fn log(&self, record: &Record) -> Result<()> { + let mut state = self.state.lock_expect(); + + if self.is_dup_record(state.last_record.as_ref().map(|r| r.as_ref()), record) { + state.skipped_count += 1; + return Ok(()); + } + + if state.skipped_count != 0 { + let last_record = state.last_record.as_ref().unwrap().as_ref(); + match state.skipped_count.cmp(&1) { + Ordering::Equal => self.log_record(&last_record)?, + Ordering::Greater => self.log_record( + &last_record + .replace_payload(format!("(skipped {} duplicates)", state.skipped_count)), + )?, + Ordering::Less => unreachable!(), // We have checked if `state.skipped_count != 0` + } + } + + self.log_record(record)?; + state.skipped_count = 0; + state.last_record = Some(record.to_owned()); + + Ok(()) + } + + fn flush(&self) -> Result<()> { + self.flush_sinks() + } + + helper::common_impl!(@Sink: common_impl); +} + +/// The builder of [`DedupSink`]. +pub struct DedupSinkBuilder { + common_builder_impl: helper::CommonBuilderImpl, + sinks: Sinks, + skip_duration: ArgS, +} + +impl DedupSinkBuilder { + /// Add a [`Sink`]. + #[must_use] + pub fn sink(mut self, sink: Arc) -> Self { + self.sinks.push(sink); + self + } + + /// Add multiple [`Sink`]s. + #[must_use] + pub fn sinks(mut self, sinks: I) -> Self + where + I: IntoIterator>, + { + self.sinks.append(&mut sinks.into_iter().collect()); + self + } + + /// Only consecutive repeated records within the given duration will be + /// skipped. + /// + /// This parameter is **required**. + #[must_use] + pub fn skip_duration(self, duration: Duration) -> DedupSinkBuilder { + DedupSinkBuilder { + common_builder_impl: self.common_builder_impl, + sinks: self.sinks, + skip_duration: duration, + } + } + + helper::common_impl!(@SinkBuilder: common_builder_impl); +} + +impl DedupSinkBuilder<()> { + #[doc(hidden)] + #[deprecated(note = "\n\n\ + builder compile-time error:\n\ + - missing required field `skip_duration`\n\n\ + ")] + pub fn build(self, _: Infallible) {} +} + +impl DedupSinkBuilder { + /// Builds a [`DedupSink`]. + pub fn build(self) -> Result { + Ok(DedupSink { + common_impl: helper::CommonImpl::from_builder(self.common_builder_impl), + sinks: self.sinks, + skip_duration: self.skip_duration, + state: Mutex::new(DedupSinkState { + last_record: None, + skipped_count: 0, + }), + }) + } +} + +#[cfg(test)] +mod tests { + use std::thread::sleep; + + use super::*; + use crate::{prelude::*, test_utils::*}; + + #[test] + fn dedup() { + let test_sink = Arc::new(CounterSink::new()); + let dedup_sink = Arc::new( + DedupSink::builder() + .skip_duration(Duration::from_secs(1)) + .sink(test_sink.clone()) + .build() + .unwrap(), + ); + let test = build_test_logger(|b| b.sink(dedup_sink)); + + info!(logger: test, "I wish I was a cat"); + info!(logger: test, "I wish I was a cat"); + info!(logger: test, "I wish I was a cat"); + + warn!(logger: test, "I wish I was a cat"); + warn!(logger: test, "I wish I was a cat"); + sleep(Duration::from_millis(1250)); + warn!(logger: test, "I wish I was a cat"); + + warn!(logger: test, "No school"); + warn!(logger: test, "No works"); + info!(logger: test, "Just meow meow"); + + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow~"); + sleep(Duration::from_millis(1250)); + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow~"); + info!(logger: test, "Meow~ Meow..."); + + let records = test_sink.records(); + + assert_eq!(records.len(), 13); + + assert_eq!(records[0].payload(), "I wish I was a cat"); + assert_eq!(records[0].level(), Level::Info); + + assert_eq!(records[1].payload(), "(skipped 2 duplicates)"); + assert_eq!(records[1].level(), Level::Info); + + assert_eq!(records[2].payload(), "I wish I was a cat"); + assert_eq!(records[2].level(), Level::Warn); + + assert_eq!(records[3].payload(), "I wish I was a cat"); + assert_eq!(records[3].level(), Level::Warn); + + assert_eq!(records[4].payload(), "I wish I was a cat"); + assert_eq!(records[4].level(), Level::Warn); + + assert_eq!(records[5].payload(), "No school"); + assert_eq!(records[5].level(), Level::Warn); + + assert_eq!(records[6].payload(), "No works"); + assert_eq!(records[6].level(), Level::Warn); + + assert_eq!(records[7].payload(), "Just meow meow"); + assert_eq!(records[7].level(), Level::Info); + + assert_eq!(records[8].payload(), "Meow~ Meow~"); + assert_eq!(records[8].level(), Level::Info); + + assert_eq!(records[9].payload(), "(skipped 3 duplicates)"); + assert_eq!(records[9].level(), Level::Info); + + assert_eq!(records[10].payload(), "Meow~ Meow~"); + assert_eq!(records[10].level(), Level::Info); + + assert_eq!(records[11].payload(), "(skipped 2 duplicates)"); + assert_eq!(records[11].level(), Level::Info); + + assert_eq!(records[12].payload(), "Meow~ Meow..."); + assert_eq!(records[12].level(), Level::Info); + } +} diff --git a/spdlog/src/sink/mod.rs b/spdlog/src/sink/mod.rs index b16458de..d5049e21 100644 --- a/spdlog/src/sink/mod.rs +++ b/spdlog/src/sink/mod.rs @@ -47,6 +47,7 @@ #[cfg(feature = "multi-thread")] pub(crate) mod async_sink; +mod dedup_sink; mod file_sink; mod helper; #[cfg(any( @@ -62,6 +63,7 @@ mod write_sink; #[cfg(feature = "multi-thread")] pub use async_sink::*; +pub use dedup_sink::*; pub use file_sink::*; #[cfg(any( all(target_os = "linux", feature = "native", feature = "libsystemd"), diff --git a/spdlog/src/test_utils/common.rs b/spdlog/src/test_utils/common.rs index 9c97cd55..d83feb67 100644 --- a/spdlog/src/test_utils/common.rs +++ b/spdlog/src/test_utils/common.rs @@ -18,7 +18,8 @@ use atomic::Atomic; use spdlog::{ formatter::{FmtExtraInfo, Formatter, Pattern, PatternFormatter}, sink::{Sink, WriteSink, WriteSinkBuilder}, - Error, ErrorHandler, LevelFilter, Logger, LoggerBuilder, Record, Result, StringBuf, + Error, ErrorHandler, LevelFilter, Logger, LoggerBuilder, Record, RecordOwned, Result, + StringBuf, }; ////////////////////////////////////////////////// @@ -27,7 +28,7 @@ pub struct CounterSink { level_filter: Atomic, log_counter: AtomicUsize, flush_counter: AtomicUsize, - payloads: Mutex>, + records: Mutex>, delay_duration: Option, } @@ -43,7 +44,7 @@ impl CounterSink { level_filter: Atomic::new(LevelFilter::All), log_counter: AtomicUsize::new(0), flush_counter: AtomicUsize::new(0), - payloads: Mutex::new(vec![]), + records: Mutex::new(vec![]), delay_duration: duration, } } @@ -58,15 +59,25 @@ impl CounterSink { self.flush_counter.load(Ordering::Relaxed) } + #[must_use] + pub fn records(&self) -> Vec { + self.records.lock().unwrap().clone() + } + #[must_use] pub fn payloads(&self) -> Vec { - self.payloads.lock().unwrap().clone() + self.records + .lock() + .unwrap() + .iter() + .map(|r| r.payload().to_string()) + .collect() } pub fn reset(&self) { self.log_counter.store(0, Ordering::Relaxed); self.flush_counter.store(0, Ordering::Relaxed); - self.payloads.lock().unwrap().clear(); + self.records.lock().unwrap().clear(); } } @@ -77,11 +88,7 @@ impl Sink for CounterSink { } self.log_counter.fetch_add(1, Ordering::Relaxed); - - self.payloads - .lock() - .unwrap() - .push(record.payload().to_string()); + self.records.lock().unwrap().push(record.to_owned()); Ok(()) }