From a963db3e965d02d95007b85014d8b8ebc679df85 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 31 Mar 2017 07:25:13 -0700 Subject: [PATCH] Increase existing telemetry, adjust file delays (#237) This commit contains a smattering of adjustments. The main show is the inclusion of Internal setup code in bin/cernan -- oops, it was only present on my disk -- and fiddling with the way backoffs occur in file.rs. We reduce the CPU load of each FileServer thread by only gradually scaling down to faster polls. This means a group of slow files will be polled more slowly that a group with one fast file in it. Signed-off-by: Brian L. Troutwine --- examples/configs/basic.toml | 3 +++ src/bin/cernan.rs | 12 ++++++++++++ src/sink/wavefront.rs | 3 +++ src/source/file.rs | 4 ++-- src/source/internal.rs | 4 ++-- src/source/mod.rs | 2 +- src/time.rs | 1 - 7 files changed, 23 insertions(+), 6 deletions(-) diff --git a/examples/configs/basic.toml b/examples/configs/basic.toml index 056391fa..6a7f8044 100644 --- a/examples/configs/basic.toml +++ b/examples/configs/basic.toml @@ -7,6 +7,9 @@ flush-interval = 5 source = "cernan" [sources] + [sources.internal] + forwards = ["sinks.console", "sinks.null"] + [sources.statsd.primary] enabled = true port = 8125 diff --git a/src/bin/cernan.rs b/src/bin/cernan.rs index 85a63230..f36227e4 100644 --- a/src/bin/cernan.rs +++ b/src/bin/cernan.rs @@ -161,6 +161,18 @@ fn main() { cernan::source::NativeServer::new(native_server_send, config).run(); })) } + + let internal_config = args.internal; + let mut internal_send = Vec::new(); + populate_forwards(&mut internal_send, + Some(&mut flush_sends), + &internal_config.forwards, + &internal_config.config_path, + &sends); + joins.push(thread::spawn(move || { + cernan::source::Internal::new(internal_send, internal_config).run(); + })); + for config in args.statsds.values() { let c = (*config).clone(); let mut statsd_sends = Vec::new(); diff --git a/src/sink/wavefront.rs b/src/sink/wavefront.rs index a8b6a15d..29284441 100644 --- a/src/sink/wavefront.rs +++ b/src/sink/wavefront.rs @@ -1,6 +1,7 @@ use buckets::Buckets; use metric::{AggregationMethod, LogLine, TagMap, Telemetry}; use sink::{Sink, Valve}; +use source::report_telemetry; use std::cmp; use std::io::Write as IoWrite; use std::net::TcpStream; @@ -143,6 +144,8 @@ impl Sink for Wavefront { fn flush(&mut self) { loop { + report_telemetry("cernan.sinks.wavefront.delivery_attempts", + self.delivery_attempts as f64); if self.delivery_attempts > 0 { debug!("delivery attempts: {}", self.delivery_attempts); } diff --git a/src/source/file.rs b/src/source/file.rs index ca385fb4..487b7c5d 100644 --- a/src/source/file.rs +++ b/src/source/file.rs @@ -206,7 +206,7 @@ impl Source for FileServer { } } let start = Instant::now(); - let mut attempts = 0; + let mut attempts: u32 = 0; loop { // file poll if fp_map.is_empty() { @@ -220,7 +220,7 @@ impl Source for FileServer { let mut lines_read = 0; match file.read_line(&mut buffer) { Ok(sz) => { - attempts = 0; + attempts = attempts.saturating_sub(1); if sz > 0 { lines_read += 1; buffer.pop(); diff --git a/src/source/internal.rs b/src/source/internal.rs index a60456f7..370170c3 100644 --- a/src/source/internal.rs +++ b/src/source/internal.rs @@ -68,10 +68,10 @@ pub fn report_telemetry(name: S, value: f64) -> () /// floor. impl Source for Internal { fn run(&mut self) { - let mut attempts = 0; + let mut attempts: u32 = 0; loop { if let Some(mut telem) = Q.lock().unwrap().pop_front() { - attempts -= 1; + attempts = attempts.saturating_sub(1); if !self.chans.is_empty() { telem = telem.overlay_tags_from_map(&self.tags); util::send("internal", diff --git a/src/source/mod.rs b/src/source/mod.rs index 17dfacec..09178bd9 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -8,7 +8,7 @@ mod statsd; pub use self::file::{FileServer, FileServerConfig}; pub use self::flush::FlushTimer; pub use self::graphite::{Graphite, GraphiteConfig}; -pub use self::internal::{Internal, InternalConfig}; +pub use self::internal::{Internal, InternalConfig, report_telemetry}; pub use self::native::{NativeServer, NativeServerConfig}; pub use self::statsd::{Statsd, StatsdConfig}; diff --git a/src/time.rs b/src/time.rs index 78025190..ba4fdb3f 100644 --- a/src/time.rs +++ b/src/time.rs @@ -23,7 +23,6 @@ pub fn update_time() { thread::sleep(dur); let now = UTC::now().timestamp() as usize; let order = Ordering::Relaxed; - trace!("updated cernan {:?} now, is: {}", order, now); NOW.store(now, order); } }