Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement RotatingScope with thread local storage #5573

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions source/extensions/filters/http/istio_stats/istio_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "envoy/registry/registry.h"
#include "envoy/server/factory_context.h"
#include "envoy/singleton/manager.h"
#include "envoy/thread_local/thread_local.h"
#include "extensions/common/metadata_object.h"
#include "parser/parser.h"
#include "source/common/grpc/common.h"
Expand Down Expand Up @@ -313,7 +314,7 @@ struct Context : public Singleton::Instance {

using ContextSharedPtr = std::shared_ptr<Context>;

SINGLETON_MANAGER_REGISTRATION(Context)
SINGLETON_MANAGER_REGISTRATION(istio_stats_filter_context)

using google::api::expr::runtime::CelValue;

Expand Down Expand Up @@ -418,21 +419,20 @@ struct MetricOverrides : public Logger::Loggable<Logger::Id::filter> {
// periodically to replace the current scope.
//
// The replaced stats scope is deleted gracefully after a minimum of 1s delay
// for two reasons:
//
// 1. Stats flushing is asynchronous and the data may be lost if not flushed
// before the deletion (see stats_flush_interval).
//
// 2. The implementation avoids locking by releasing a raw pointer to workers.
// When the rotation happens on the main, the raw pointer may still be in-use
// by workers for a short duration.
// because stats flushing is asynchronous and data may be lost if it is not
// flushed before the deletion (see stats_flush_interval).
class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
public:
RotatingScope(Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms,
uint64_t delete_interval_ms)
: parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope("")),
raw_scope_(active_scope_.get()), rotate_interval_ms_(rotate_interval_ms),
delete_interval_ms_(delete_interval_ms) {
tls_scope_(factory_context.serverFactoryContext().threadLocal()),
rotate_interval_ms_(rotate_interval_ms), delete_interval_ms_(delete_interval_ms) {

tls_scope_.set([&scope = *active_scope_](Event::Dispatcher&) {
return std::make_shared<TlsCachedScope>(scope);
});

if (rotate_interval_ms_ > 0) {
ASSERT(delete_interval_ms_ < rotate_interval_ms_);
ASSERT(delete_interval_ms_ >= 1000);
Expand All @@ -452,36 +452,57 @@ class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
delete_timer_.reset();
}
}
Stats::Scope* scope() { return raw_scope_.load(); }
Stats::Scope& scope() { return tls_scope_->_scope; }

private:
struct TlsCachedScope : ThreadLocal::ThreadLocalObject {
TlsCachedScope(Stats::Scope& scope) : _scope(scope){};
std::reference_wrapper<Stats::Scope> _scope;
};

void onRotate() {
ENVOY_LOG(info, "Rotating active Istio stats scope after {}ms.", rotate_interval_ms_);
draining_scope_ = active_scope_;
delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_));
active_scope_ = parent_scope_.createScope("");
raw_scope_.store(active_scope_.get());
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
tls_scope_.runOnAllThreads(
[&scope = *active_scope_](OptRef<TlsCachedScope> tls_cache) { tls_cache->_scope = scope; },
// Start the delete and rotate timers only after the new scope has been propagated to all
// worker threads. The RotatingScope instance can go away before the dispatcher has a chance
// to execute this callback; in that case the still_alive_ shared_ptr is deallocated together
// with the instance. We therefore hold only a weak_ptr to the still_alive_ flag and use it
// to determine whether the instance is still valid.
[this, maybe_still_alive = std::weak_ptr<bool>(still_alive_)]() -> void {
if (!maybe_still_alive.expired()) {
delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_));
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
}
});
}
// Delete-timer callback: logs the deletion and drops this object's reference
// to the scope that was parked in draining_scope_ by the previous rotation.
void onDelete() {
  ENVOY_LOG(info, "Deleting draining Istio stats scope after {}ms.", delete_interval_ms_);
  // Assigning nullptr releases the held reference, same as reset().
  draining_scope_ = nullptr;
}

Stats::Scope& parent_scope_;
Stats::ScopeSharedPtr active_scope_;
std::atomic<Stats::Scope*> raw_scope_;
Stats::ScopeSharedPtr draining_scope_{nullptr};
ThreadLocal::TypedSlot<TlsCachedScope> tls_scope_;
const uint64_t rotate_interval_ms_;
const uint64_t delete_interval_ms_;
Event::TimerPtr rotate_timer_{nullptr};
Event::TimerPtr delete_timer_{nullptr};

// A sentinel shared_ptr used for keeping track of whether the RotatingScope is still alive.
// It is only held by a weak reference in the callback that will be invoked after the new active
// scope has been propagated to all worker threads.
std::shared_ptr<bool> still_alive_{std::make_shared<bool>(true)};
};

struct Config : public Logger::Loggable<Logger::Id::filter> {
Config(const stats::PluginConfig& proto_config,
Server::Configuration::FactoryContext& factory_context)
: context_(factory_context.serverFactoryContext().singletonManager().getTyped<Context>(
SINGLETON_MANAGER_REGISTERED_NAME(Context),
SINGLETON_MANAGER_REGISTERED_NAME(istio_stats_filter_context),
[&factory_context] {
return std::make_shared<Context>(
factory_context.serverFactoryContext().scope().symbolTable(),
Expand Down Expand Up @@ -514,7 +535,7 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
break;
}
if (proto_config.metrics_size() > 0 || proto_config.definitions_size() > 0) {
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope()->symbolTable());
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope().symbolTable());
for (const auto& definition : proto_config.definitions()) {
const auto& it = context_->all_metrics_.find(definition.name());
if (it != context_->all_metrics_.end()) {
Expand Down Expand Up @@ -698,12 +719,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
return;
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::counterFromStatNames(*parent_.scope(),
Stats::Utility::counterFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric}, new_tags)
.add(amount);
return;
}
Stats::Utility::counterFromStatNames(*parent_.scope(),
Stats::Utility::counterFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric}, tags)
.add(amount);
}
Expand All @@ -717,12 +738,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags)
.recordValue(value);
return;
}
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags)
.recordValue(value);
}

Expand All @@ -735,17 +756,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
switch (metric.type_) {
case MetricOverrides::MetricType::Counter:
Stats::Utility::counterFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags)
.add(amount);
break;
case MetricOverrides::MetricType::Histogram:
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_},
parent_.scope(), {parent_.context_->stat_namespace_, metric.name_},
Stats::Histogram::Unit::Bytes, tags)
.recordValue(amount);
break;
case MetricOverrides::MetricType::Gauge:
Stats::Utility::gaugeFromStatNames(*parent_.scope(),
Stats::Utility::gaugeFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric.name_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(amount);
Expand All @@ -769,14 +790,13 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
tags.push_back({context_->tag_, context_->istio_version_.empty() ? context_->unknown_
: context_->istio_version_});

Stats::Utility::gaugeFromStatNames(*scope(),
{context_->stat_namespace_, context_->istio_build_},
Stats::Utility::gaugeFromStatNames(scope(), {context_->stat_namespace_, context_->istio_build_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(1);
}

Reporter reporter() const { return reporter_; }
Stats::Scope* scope() { return scope_.scope(); }
Stats::Scope& scope() { return scope_.scope(); }

ContextSharedPtr context_;
RotatingScope scope_;
Expand All @@ -795,7 +815,7 @@ class IstioStatsFilter : public Http::PassThroughFilter,
public Network::ConnectionCallbacks {
public:
IstioStatsFilter(ConfigSharedPtr config)
: config_(config), context_(*config->context_), pool_(config->scope()->symbolTable()),
: config_(config), context_(*config->context_), pool_(config->scope().symbolTable()),
stream_(*config_, pool_) {
tags_.reserve(25);
switch (config_->reporter()) {
Expand Down