Skip to content

Commit

Permalink
Resolve graphite growth problem, introduce hopper 0.1.2 (#181)
Browse files Browse the repository at this point in the history
We noticed that there was something fishy going on with graphite.
In particular, the number of lines push to the programmable_filter
grew _way_ too quickly for the inputs. It turns out, that fancy
line buffer we pass in to avoid allocing small strings repeatedly
was not being cleared and so everything that came in would be
appended there.

This commit also includes the work done as a part of
postmates/hopper:#5 by updating to 0.1.2. All messages will be
delivered in order--as they should have been--and hopper is now
quite a bit faster to boot as it can keep cramming in low-memory
when there's space.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Jan 11, 2017
1 parent 03e3910 commit d748c9e
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 50 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ clap = "2.10.0"
fern = "0.3.5"
flate2 = "0.2"
glob = "0.2.11"
hopper = "0.1.0"
hopper = "0.1.2"
hyper = "0.9"
lazy_static = "0.2.1"
libc = "0.2"
Expand Down
6 changes: 3 additions & 3 deletions src/bin/cernan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ fn main() {
let (flt_send, flt_recv) = hopper::channel(&config.config_path, &args.data_directory)
.unwrap();
sends.insert(config.config_path.clone(), flt_send);
let mut upstream_sends = Vec::new();
populate_forwards(&mut upstream_sends,
let mut downstream_sends = Vec::new();
populate_forwards(&mut downstream_sends,
&config.forwards,
&config.config_path,
&sends);
joins.push(thread::spawn(move || {
cernan::filter::ProgrammableFilter::new(c).run(flt_recv, upstream_sends);
cernan::filter::ProgrammableFilter::new(c).run(flt_recv, downstream_sends);
}));
}

Expand Down
12 changes: 8 additions & 4 deletions src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,22 @@ fn event_in_fe(fe: FilterError) -> metric::Event {
}

pub trait Filter {
fn process(&mut self, event: metric::Event) -> Result<Vec<metric::Event>, FilterError>;
fn process(&mut self,
event: metric::Event,
res: &mut Vec<metric::Event>)
-> Result<(), FilterError>;
fn run(&mut self, mut recv: hopper::Receiver<metric::Event>, mut chans: util::Channel) {
let mut attempts = 0;
let mut events = Vec::with_capacity(64);
loop {
time::delay(attempts);
match recv.next() {
None => attempts += 1,
Some(event) => {
attempts = 0;
match self.process(event) {
Ok(events) => {
for ev in events {
match self.process(event, &mut events) {
Ok(()) => {
for ev in events.drain(..) {
util::send("filter", &mut chans, ev)
}
}
Expand Down
38 changes: 22 additions & 16 deletions src/filter/programmable_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl ProgrammableFilter {
}

impl filter::Filter for ProgrammableFilter {
fn process(&mut self, event: metric::Event) -> Result<Vec<metric::Event>, filter::FilterError> {
fn process(&mut self, event: metric::Event, res: &mut Vec<metric::Event>) -> Result<(), filter::FilterError> {
match event {
metric::Event::Telemetry(mut m) => {
self.state.get_global("process_metric");
Expand All @@ -396,11 +396,13 @@ impl filter::Filter for ProgrammableFilter {

self.state.call(1, 0);

Ok(pyld.logs
.into_iter()
.map(|m| metric::Event::new_log(*m))
.chain(pyld.metrics.into_iter().map(|m| metric::Event::new_telemetry(*m)))
.collect())
for lg in pyld.logs {
res.push(metric::Event::new_log(*lg));
}
for mt in pyld.metrics {
res.push(metric::Event::new_telemetry(*mt));
}
Ok(())
}
metric::Event::TimerFlush => {
self.state.get_global("tick");
Expand All @@ -423,11 +425,13 @@ impl filter::Filter for ProgrammableFilter {

self.state.call(1, 0);

Ok(pyld.logs
.into_iter()
.map(|m| metric::Event::new_log(*m))
.chain(pyld.metrics.into_iter().map(|m| metric::Event::new_telemetry(*m)))
.collect())
for lg in pyld.logs {
res.push(metric::Event::new_log(*lg));
}
for mt in pyld.metrics {
res.push(metric::Event::new_telemetry(*mt));
}
Ok(())
}
metric::Event::Log(mut l) => {
self.state.get_global("process_log");
Expand All @@ -453,11 +457,13 @@ impl filter::Filter for ProgrammableFilter {

self.state.call(1, 0);

Ok(pyld.logs
.into_iter()
.map(|m| metric::Event::new_log(*m))
.chain(pyld.metrics.into_iter().map(|m| metric::Event::new_telemetry(*m)))
.collect())
for lg in pyld.logs {
res.push(metric::Event::new_log(*lg));
}
for mt in pyld.metrics {
res.push(metric::Event::new_telemetry(*mt));
}
Ok(())
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/source/graphite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fn handle_stream(mut chans: util::Channel, tags: Arc<metric::TagMap>, stream: Tc
&mut chans,
metric::Event::Telemetry(Arc::new(Some(m))));
}
line.clear();
} else {
let metric = metric::Telemetry::new("cernan.graphite.bad_packet", 1.0)
.aggr_sum()
Expand All @@ -98,6 +99,7 @@ fn handle_stream(mut chans: util::Channel, tags: Arc<metric::TagMap>, stream: Tc
&mut chans,
metric::Event::Telemetry(Arc::new(Some(metric))));
error!("bad packet: {:?}", line);
line.clear();
}
} else {
break;
Expand Down
50 changes: 27 additions & 23 deletions tests/programmable_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ mod integration {
.overlay_tag("bizz", "bazz");
let event = metric::Event::new_telemetry(metric);

let res = cs.process(event.clone());
let mut events = Vec::new();
let res = cs.process(event.clone(), &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], event);
Expand Down Expand Up @@ -58,9 +58,9 @@ mod integration {
let orig_event = metric::Event::new_log(orig_log);
let expected_event = metric::Event::new_log(expected_log);

let res = cs.process(orig_event.clone());
let mut events = Vec::new();
let res = cs.process(orig_event.clone(), &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], expected_event);
Expand All @@ -87,9 +87,9 @@ mod integration {
let orig_event = metric::Event::new_telemetry(orig_metric);
let expected_event = metric::Event::new_telemetry(expected_metric);

let res = cs.process(orig_event.clone());
let mut events = Vec::new();
let res = cs.process(orig_event.clone(), &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], expected_event);
Expand All @@ -114,9 +114,9 @@ mod integration {

let orig_event = metric::Event::new_telemetry(orig_metric);

let res = cs.process(orig_event.clone());
let mut events = Vec::new();
let res = cs.process(orig_event.clone(), &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], orig_event);
Expand All @@ -141,8 +141,10 @@ mod integration {

let orig_event = metric::Event::new_telemetry(orig_metric);

let res = cs.process(orig_event);
let mut events = Vec::new();
let res = cs.process(orig_event, &mut events);
assert!(res.is_err());
assert!(events.is_empty());
}

#[test]
Expand All @@ -169,9 +171,9 @@ mod integration {
let orig_event = metric::Event::new_log(orig_log);
let expected_event = metric::Event::new_log(expected_log);

let res = cs.process(orig_event);
let mut events = Vec::new();
let res = cs.process(orig_event, &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], expected_event);
Expand All @@ -197,9 +199,9 @@ mod integration {
let orig_event = metric::Event::new_telemetry(orig_metric);
let expected_event = metric::Event::new_telemetry(expected_metric);

let res = cs.process(orig_event);
let mut events = Vec::new();
let res = cs.process(orig_event, &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);
assert_eq!(events[0], expected_event);
Expand Down Expand Up @@ -229,12 +231,13 @@ mod integration {

let flush = metric::Event::TimerFlush;

let mut events = Vec::new();
for ev in &[metric0, metric1, metric2, log0, log1] {
let _ = cs.process(ev.clone());
let _ = cs.process(ev.clone(), &mut events);
}
let res = cs.process(flush.clone());
events.clear();
let res = cs.process(flush.clone(), &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();

assert!(!events.is_empty());
assert_eq!(events.len(), 2);
Expand All @@ -245,12 +248,13 @@ mod integration {
metric::Event::new_log(metric::LogLine::new("filters.keep_count",
"count_per_tick: 5")));

events.clear();
for ev in &[log2, log3] {
let _ = cs.process(ev.clone());
let _ = cs.process(ev.clone(), &mut events);
}
let res = cs.process(flush);
events.clear();
let res = cs.process(flush, &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();

assert!(!events.is_empty());
assert_eq!(events.len(), 2);
Expand Down Expand Up @@ -282,9 +286,9 @@ mod integration {
let metric = metric::Telemetry::new(orig, 12.0);
let event = metric::Event::new_telemetry(metric);

let res = cs.process(event);
let mut events = Vec::new();
let res = cs.process(event, &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);

Expand Down Expand Up @@ -320,9 +324,9 @@ mod integration {
let metric = metric::Telemetry::new(orig, 12.0);
let event = metric::Event::new_telemetry(metric);

let res = cs.process(event);
let mut events = Vec::new();
let res = cs.process(event, &mut events);
assert!(res.is_ok());
let events = res.ok().unwrap();
assert!(!events.is_empty());
assert_eq!(events.len(), 1);

Expand Down

0 comments on commit d748c9e

Please sign in to comment.