Skip to content

Commit

Permalink
Merge pull request #8 from avito-tech/0.3.3
Browse files Browse the repository at this point in the history
0.3.3
  • Loading branch information
Albibek authored Jun 1, 2018
2 parents 2096b9d + 521fcad commit 7bbf88c
Show file tree
Hide file tree
Showing 14 changed files with 1,333 additions and 975 deletions.
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
[package]
name = "bioyino"
version = "0.3.2"
version = "0.3.3"
authors = ["Sergey Noskov aka Albibek <albibek@gmail.com>"]
description = "StatsD-compatible, high-performance, fault-tolerant metric aggregator"

[dependencies]
clap="^2.29"
clap="^2.31"
failure="^0.1"
failure_derive="^0.1"
libc="^0.2"
num_cpus="^1.4"
num_cpus="^1.8"
futures="^0.1"
tokio="^0.1"
tokio-core="^0.1"
tokio-io="^0.1"
bytes="^0.4"
resolve="^0.1"
net2="^0.2"
num="^0.1"
combine="^2.5"
hyper="^0.11"
serde="^1.0"
serde_derive="^1.0"
serde_json="^1.0"
bincode="^0.9"
slog="^2.1"
slog-term="^2.1"
slog-async="^2.1"
bincode="^1.0"
slog="^2.2"
slog-term="^2.4"
slog-async="^2.3"
slog-scope="^4.0"
toml="^0.4"
ftoa = "^0.1"
49 changes: 34 additions & 15 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@

verbosity = "warn"

# Number of async UDP network worker threads, use 0 to use all CPU core
n-threads = 2
# Number of network worker threads in any mode, use 0(not recommended) to use all CPU cores
n-threads = 4

# Prefix for sending own stats
stats-prefix = "resources.monitoring.bioyino"

# How often to gather own stats, in ms. Use 0 to disable (stats are still gathered, but not included in
# metric dump
stats-interval = 10000
# Number of aggregating and counting threads, use 0(not recommended) to use all CPU cores
w-threads = 4

# Queue size for single counting thread before packet is dropped
task-queue-size = 1024

# How often to gather own stats, in ms. Use 0 to disable (stats are still gathered and printed to log,
# but not included in metric dump
stats-interval = 10000

# Prefix for sending own stats
stats-prefix = "resources.monitoring.bioyino"

[metrics]
# Should we provide metrics with top update numbers
# Should we provide metrics that update more than update-counter-threshold times during the aggregation interval
count-updates = true

# Prefix for metric update statistics (no trailing dot!)
update-counter-prefix = "resources.monitoring.bioyino.updates"

Expand All @@ -27,6 +31,27 @@ update-counter-suffix = ""
# Minimal update counter to be reported
update-counter-threshold = 200

[carbon]

# IP and port of the carbon-protocol backend to send aggregated data to
address = "127.0.0.1:2003"

# How often to send metrics to carbon backend, ms
interval = 30000

# How long to sleep when the connection to the backend fails, ms
connect-delay = 250

# Multiply delay to this value for each consequent connection failure, float
connect-delay-multiplier = 2

# Maximum retry delay, ms
connect-delay-max = 10000

# How many times to retry sending data to the backend before giving up and dropping all metrics
# Note that 0 means a single try
send-retries = 30

# Network settings
[network]
# Address:port to listen for metrics at
Expand All @@ -35,12 +60,6 @@ listen = "127.0.0.1:8125"
# Address and port for replication/command server to listen on
peer-listen = "127.0.0.1:8136"

# IP and port of the carbon-protocol backend to send aggregated data to
backend = "127.0.0.1:2003"

# How often to send metrics to carbon backend, ms
backend-interval = 30000

# UDP buffer size for single packet. Needs to be around MTU. Packet's bytes after that value
# may be lost
bufsize = 1500
Expand Down
1 change: 0 additions & 1 deletion src/bigint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
});
*/


/*
ToString:
// let formatted = match (value.numer().to_f64(), value.denom().to_f64()) {
Expand Down
132 changes: 132 additions & 0 deletions src/carbon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use failure::Error;
use ftoa;
use futures::stream;
use futures::{Future, IntoFuture, Sink, Stream};
use tokio::net::TcpStream;
use tokio_io::AsyncRead;
use tokio_io::codec::{Decoder, Encoder};

use errors::GeneralError;

use {Float, AGG_ERRORS};

/// One-shot backend for the carbon (Graphite plaintext) protocol: connects
/// to `addr` and sends a pre-rendered batch of metrics, then completes.
#[derive(Clone)]
pub struct CarbonBackend {
    // Address of the carbon-compatible receiver (host:port).
    addr: SocketAddr,

    // Pre-rendered (name, value, timestamp) triples; shared via Arc so that
    // cloning the backend does not copy the whole batch.
    metrics: Arc<Vec<(Bytes, Bytes, Bytes)>>,
}

impl CarbonBackend {
    /// Build a backend for `addr`, pre-rendering every metric value with
    /// `ftoa` and stamping each entry with `ts` (seconds since epoch).
    ///
    /// Metrics whose float value fails to render are dropped from the batch
    /// and counted in the global `AGG_ERRORS` statistic.
    pub(crate) fn new(addr: SocketAddr, ts: Duration, metrics: Arc<Vec<(Bytes, Float)>>) -> Self {
        let ts: Bytes = ts.as_secs().to_string().into();

        // One reusable buffer for all renders; 200 bytes is a rough upper
        // bound for a full metric name plus its rendered value.
        let mut buf = BytesMut::with_capacity(metrics.len() * 200);
        let mut rendered = Vec::with_capacity(metrics.len());
        for (name, value) in metrics.iter() {
            let mut wr = buf.writer();
            match ftoa::write(&mut wr, *value) {
                Ok(()) => {
                    buf = wr.into_inner();
                    // take() detaches the rendered bytes while leaving the
                    // remaining capacity in `buf` for the next metric.
                    let value_bytes = buf.take().freeze();
                    rendered.push((name.clone(), value_bytes, ts.clone()));
                }
                Err(_) => {
                    AGG_ERRORS.fetch_add(1, Ordering::Relaxed);
                    buf = wr.into_inner();
                }
            }
        }

        Self {
            addr,
            metrics: Arc::new(rendered),
        }
    }
}

impl IntoFuture for CarbonBackend {
    type Item = ();
    type Error = GeneralError;
    type Future = Box<Future<Item = Self::Item, Error = Self::Error>>;

    /// Connect to the carbon backend and stream the whole pre-rendered
    /// batch through `CarbonCodec`; the future resolves when the batch
    /// has been forwarded (or fails on connect/send errors).
    fn into_future(self) -> Self::Future {
        let Self { addr, metrics } = self;

        // Wrap connection errors into the crate-wide error type.
        let conn = TcpStream::connect(&addr).map_err(|e| GeneralError::Io(e));
        let future = conn.and_then(move |conn| {
            // Write-only framed transport: only the Sink half is used below.
            let writer = conn.framed(CarbonCodec::new());
            //let metric_stream = stream::iter_ok::<_, ()>(metrics.clone());
            // SharedIter iterates the Arc'd batch without copying the Vec.
            let metric_stream = stream::iter_ok::<_, ()>(SharedIter::new(metrics));
            metric_stream
                .map_err(|_| GeneralError::CarbonBackend)
                .forward(writer.sink_map_err(|_| GeneralError::CarbonBackend))
                .map(|_| ())
        });

        Box::new(future)
    }
}

/// Iterator over a shared, immutable vector that clones each element.
///
/// Lets several owners iterate the same `Arc<Vec<T>>` (e.g. a metric batch)
/// without copying the underlying storage.
pub struct SharedIter<T> {
    inner: Arc<Vec<T>>,
    // Index of the NEXT element to yield.
    current: usize,
}

impl<T> SharedIter<T> {
    /// Create an iterator positioned at the first element of `inner`.
    pub fn new(inner: Arc<Vec<T>>) -> Self {
        Self { inner, current: 0 }
    }
}

impl<T: Clone> Iterator for SharedIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        // BUG FIX: the index was previously advanced *before* the lookup,
        // which silently skipped element 0 — the first metric of every
        // batch was never sent. Fetch the current element, then advance.
        let item = self.inner.get(self.current).cloned();
        self.current += 1;
        item
    }
}

/// Stateless, zero-sized codec that renders metrics into the carbon
/// plaintext wire format.
pub struct CarbonCodec;

impl CarbonCodec {
    /// Construct the codec; it carries no state.
    pub fn new() -> Self {
        CarbonCodec
    }
}

impl Decoder for CarbonCodec {
    type Item = ();
    // It could be a separate error here, but it's useless, since there are
    // no errors in the process of encoding.
    type Error = Error;

    /// This codec is only ever used for writing: the framed read half is
    /// never polled by the backend future. The previous implementation
    /// panicked via `unreachable!()`; returning `Ok(None)` ("no complete
    /// frame") is the safe no-op should the read half ever be polled.
    fn decode(&mut self, _buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
        Ok(None)
    }
}

impl Encoder for CarbonCodec {
type Item = (Bytes, Bytes, Bytes); // Metric name, suffix value and timestamp
type Error = Error;

fn encode(&mut self, m: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
let len = m.0.len() + 1 + m.1.len() + 1 + m.2.len() + 1;
buf.reserve(len);
buf.put(m.0);
buf.put(" ");
buf.put(m.1);
buf.put(" ");
buf.put(m.2);
buf.put("\n");
Ok(())
}
}
Loading

0 comments on commit 7bbf88c

Please sign in to comment.