Skip to content

Commit

Permalink
Performance overhaul, use pal_statistics ordering to avoid map lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Lopez committed Jul 8, 2020
1 parent b711405 commit 54715a4
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 278 deletions.
13 changes: 7 additions & 6 deletions include/statsdcc/backend_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,29 @@ class BackendContainer {
* @param ledger to be processed and flushed
* @param fflusher_id id of the consumer that initiated the process and flush
*/
inline void processAndFlush(Ledger ledger, const int flusher_id) {

inline void processAndFlush(std::unique_ptr<Ledger> &&ledger_ptr, const int flusher_id) {
std::lock_guard<std::mutex> lock(this->lmut);
ledger.process();
ledger_ptr->process();
for (auto backend_itr = this->backends.cbegin();
backend_itr != this->backends.cend();
++backend_itr) {
(*backend_itr)->flush_stats(ledger, flusher_id);
(*backend_itr)->flush_stats(*ledger_ptr, flusher_id);
}
}
}

/**
* flushes the given ledger to the backends
*
* @param ledger to be flushed
* @param fflusher_id id of the consumer that initiated the flush
*/
inline void flush(const Ledger ledger, const int flusher_id) {
inline void flush(std::unique_ptr<Ledger> &&ledger_ptr, const int flusher_id) {
std::lock_guard<std::mutex> lock(this->lmut);
for (auto backend_itr = this->backends.cbegin();
backend_itr != this->backends.cend();
++backend_itr) {
(*backend_itr)->flush_stats(ledger, flusher_id);
(*backend_itr)->flush_stats(*ledger_ptr, flusher_id);
}
}

Expand Down
126 changes: 71 additions & 55 deletions include/statsdcc/ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,68 @@ class Stdout;
class Repeater;
}

class Metric
{
public:
virtual void update(double value, double sample_rate) = 0;
};

class Counter : public Metric
{
public:
Counter() : counter_(0.0), counter_rate_(0.0)
{
}
void update(double metric_value, double sample_rate) override
{
counter_ += metric_value * (1 / sample_rate);
}
double counter_;
double counter_rate_; // was unused
};

class Set : public Metric
{
public:
void update(double metric_value, double sample_rate) override
{
}
};

class Gauge : public Metric
{
public:
void update(double metric_value, double /*sample_rate*/) override
{
gauge_ = metric_value;
}
double gauge_;
};

class IncrementalGauge : public Metric
{
public:
void update(double metric_value, double /*sample_rate*/) override
{
gauge_ += metric_value;
}
double gauge_;
};

class Timer : public Metric
{
public:
void update(double metric_value, double sample_rate) override
{
counter_ += metric_value * (1 / sample_rate);
timers_.push_back(metric_value);
}

double counter_;
std::vector<double> timers_;
std::unordered_map<std::string, double> timer_data_;
};

/**
* Parses the metric submitted in one of the following format,
* into std collections for easy processing
Expand All @@ -49,49 +111,8 @@ class Ledger {

public:
inline Ledger() {}

inline Ledger(const Ledger& ledger) :
counters(ledger.counters),
timers(ledger.timers),
gauges(ledger.gauges),
sets(ledger.sets),
counter_rates(ledger.counter_rates),
timer_data(ledger.timer_data),
statsd_metrics(ledger.statsd_metrics) {
}

inline Ledger(const Ledger&& ledger) :
counters(std::move(ledger.counters)),
timers(std::move(ledger.timers)),
gauges(std::move(ledger.gauges)),
sets(std::move(ledger.sets)),
counter_rates(std::move(ledger.counter_rates)),
timer_data(std::move(ledger.timer_data)),
statsd_metrics(std::move(ledger.statsd_metrics)) {
}

inline Ledger& operator=(const Ledger& ledger) {
this->counters = ledger.counters;
this->timers = ledger.timers;
this->gauges = ledger.gauges;
this->sets = ledger.sets;
this->counter_rates = ledger.counter_rates;
this->timer_data = ledger.timer_data;
this->statsd_metrics = ledger.statsd_metrics;
return *this;
}

inline Ledger& operator=(const Ledger&& ledger) {
this->counters = std::move(ledger.counters);
this->timers = std::move(ledger.timers);
this->gauges = std::move(ledger.gauges);
this->sets = std::move(ledger.sets);
this->counter_rates = std::move(ledger.counter_rates);
this->timer_data = std::move(ledger.timer_data);
this->statsd_metrics = std::move(ledger.statsd_metrics);
return *this;
}

inline Ledger(const Ledger& ledger) = delete;
inline Ledger& operator=(const Ledger& ledger) = delete;
~Ledger() = default;

/**
Expand All @@ -100,7 +121,8 @@ class Ledger {
* @param metric The metric value to be buffered for processing later
*/
void buffer(const std::string& metric);
void buffer(const std::string& metric_name, double metric_value, const std::string& metric_type);
std::shared_ptr<Metric> buffer(const std::string& metric_name, double metric_value, const std::string& metric_type);
void buffer(const std::shared_ptr<Metric> &metric, double metric_value);

/**
* Aggregates the metric values buffered by buffer method.
Expand All @@ -109,8 +131,8 @@ class Ledger {

inline int bad_lines_seen() {
auto bad_lines_key = ::config->name + ".bad_lines_seen";
return (this->counters.find(bad_lines_key) != this->counters.end())
? static_cast<int>(this->counters[bad_lines_key])
return (this->metrics.find(bad_lines_key) != this->metrics.end())
? static_cast<int>(dynamic_cast<Counter*>(this->metrics[bad_lines_key].get())->counter_)
: 0;
}

Expand All @@ -124,17 +146,11 @@ class Ledger {
std::vector<double> timers;
double timer_counter;
};
std::unordered_map<std::string, double> counters;
std::unordered_map<std::string, TimerData> timers;
std::unordered_map<std::string, double> gauges;
std::unordered_map<std::string, std::unordered_set<std::string> > sets;
void addBadLine(const std::string &name);

std::unordered_map<std::string, double> counter_rates;

std::unordered_map<
std::string, std::unordered_map<std::string, double>
> timer_data;

std::unordered_map<std::string, std::shared_ptr<Metric>> metrics;
std::unordered_map<std::string, std::unordered_set<std::string> > sets;
std::unordered_map<std::string, std::int64_t> statsd_metrics;
};

Expand Down
8 changes: 5 additions & 3 deletions include/statsdcc/net/servers/socket/ros_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace statsdcc
// forward declarations
class BackendContainer;
class Ledger;

class Metric;
namespace net
{
namespace servers
Expand All @@ -37,14 +37,16 @@ class ROSServer : public Server
{
public:
typedef std::vector<std::string> MetricTypes;
typedef std::vector<std::shared_ptr<statsdcc::Metric>> Metrics;
typedef std::pair<std::string, MetricTypes> Rule;
typedef std::vector<Rule> Rules;

typedef std::unordered_map<std::string, MetricTypes> StatMap;
typedef std::unordered_map<std::string, Metrics> StatMap;

typedef std::vector<std::string> StatsNames;
typedef std::pair<StatsNames, uint32_t> StatsNamesVersion;
typedef std::unordered_map<std::string, StatsNamesVersion> TopicsStatsNames;
typedef std::unordered_map<std::string, std::vector<Metrics>> TopicMetrics;

public:
/**
Expand Down Expand Up @@ -86,7 +88,7 @@ class ROSServer : public Server
std::vector<ros::Subscriber> subs_;
std::vector<Rules> topics_rules_;
TopicsStatsNames topics_stats_names_;
StatMap stat_map_;
TopicMetrics topic_metrics_;

std::unique_ptr<Ledger> ledger_;
bool flush_ledger_;
Expand Down
140 changes: 68 additions & 72 deletions lib/backends/carbon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,90 +72,86 @@ void Carbon::flush_stats(const Ledger& ledger, int flusher_id) {
fqdn_prefix = fqdn;
}

// counters
for (auto counter_itr = ledger.counters.cbegin();
counter_itr != ledger.counters.cend();
++counter_itr) {
std::string key = counter_itr->first;
std::string value =
std::to_string(static_cast<long double>(counter_itr->second));

std::string value_per_second =
std::to_string(static_cast<long double>(ledger.counter_rates.at(key)));
for (const auto &m : ledger.metrics) {
std::string key = m.first;
Metric* metric = m.second.get();
Counter* counter = dynamic_cast<Counter*>(metric);
Timer* timer = dynamic_cast<Timer*>(metric);
Gauge* gauge = dynamic_cast<Gauge*>(metric);

if (counter)
{
std::string value =
std::to_string(static_cast<long double>(counter->counter_));

std::string value_per_second =
std::to_string(static_cast<long double>(counter->counter_rate_));
// get the destination carbon hostport
Hostport n = this->hashring->get(key);

std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "counters.";
}
metric_name = metric_name + this->process_name(key);

stat_strings[n] +=
metric_name + ".rate"
+ " "
+ value_per_second
+ ts_suffix;

// get the destination carbon hostport
Hostport n = this->hashring->get(key);
stat_strings[n] +=
metric_name + ".count"
+ " "
+ value
+ ts_suffix;

std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "counters.";
++num_stats;
}
metric_name = metric_name + this->process_name(key);

stat_strings[n] +=
metric_name + ".rate"
+ " "
+ value_per_second
+ ts_suffix;

stat_strings[n] +=
metric_name + ".count"
+ " "
+ value
+ ts_suffix;

++num_stats;
}

// timers
for (auto timer_itr = ledger.timer_data.cbegin();
timer_itr != ledger.timer_data.cend();
++timer_itr) {
std::string key = timer_itr->first;
std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "timers.";
else if (timer)
{
std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "timers.";
}
metric_name = metric_name + this->process_name(key);

for (auto timer_data_itr = timer->timer_data_.cbegin();
timer_data_itr != timer->timer_data_.cend();
++timer_data_itr) {
std::string timer_data_key = timer_data_itr->first;

std::string value = std::to_string(
static_cast<long double>(timer_data_itr->second));

stat_strings[this->hashring->get(key)] +=
metric_name + '.'
+ timer_data_key
+ " "
+ value
+ ts_suffix;
}
++num_stats;
}
metric_name = metric_name + this->process_name(key);

for (auto timer_data_itr = timer_itr->second.cbegin();
timer_data_itr != timer_itr->second.cend();
++timer_data_itr) {
std::string timer_data_key = timer_data_itr->first;
else if (gauge)
{
std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "gauges.";
}
metric_name = metric_name + this->process_name(key);

std::string value = std::to_string(
static_cast<long double>(timer_data_itr->second));
static_cast<long double>(gauge->gauge_));

stat_strings[this->hashring->get(key)] +=
metric_name + '.'
+ timer_data_key
+ " "
metric_name + " "
+ value
+ ts_suffix;
}
++num_stats;
}

// gauges
for (auto gauge_itr = ledger.gauges.cbegin();
gauge_itr != ledger.gauges.cend();
++gauge_itr) {
std::string key = gauge_itr->first;
std::string metric_name = this->prefix + fqdn_prefix;
if (this->use_metric_type_prefix) {
metric_name = metric_name + "gauges.";
++num_stats;
}
metric_name = metric_name + this->process_name(key);

std::string value = std::to_string(
static_cast<long double>(gauge_itr->second));

stat_strings[this->hashring->get(key)] +=
metric_name + " "
+ value
+ ts_suffix;

++num_stats;
}

// sets
Expand Down
Loading

0 comments on commit 54715a4

Please sign in to comment.