Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sending custom headers to export endpoint #79

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/batch_exporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class BatchExporter {
int attrSize{0};
};

BatchExporter(StrView target, bool ssl, const std::string& trustedCert,
BatchExporter(const Target& target,
size_t batchSize, size_t batchCount,
const std::map<StrView, StrView>& resourceAttrs) :
batchSize(batchSize), client(std::string(target), ssl, trustedCert)
batchSize(batchSize), client(target)
{
free.reserve(batchCount);
while (batchCount-- > 0) {
Expand Down
60 changes: 50 additions & 10 deletions src/http_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct MainConf : MainConfBase {
std::map<StrView, StrView> resourceAttrs;
bool ssl;
std::string trustedCert;
Target::HeaderVec headers;
};

struct SpanAttr {
Expand All @@ -49,6 +50,7 @@ char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);
char* addResourceAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);
char* addSpanAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);
char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);
char* addExporterHeader(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);

namespace Propagation {

Expand Down Expand Up @@ -120,6 +122,10 @@ ngx_command_t gExporterCommands[] = {
NGX_CONF_TAKE1,
setTrustedCertificate },

{ ngx_string("header"),
NGX_CONF_TAKE2,
addExporterHeader },

{ ngx_string("interval"),
NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
Expand Down Expand Up @@ -576,10 +582,14 @@ ngx_int_t initWorkerProcess(ngx_cycle_t* cycle)
}

try {
Target target;
target.endpoint = std::string(toStrView(mcf->endpoint));
target.ssl = mcf->ssl;
target.trustedCert = mcf->trustedCert;
target.headers = mcf->headers;

gExporter.reset(new BatchExporter(
toStrView(mcf->endpoint),
mcf->ssl,
mcf->trustedCert,
target,
mcf->batchSize,
mcf->batchCount,
mcf->resourceAttrs));
Expand Down Expand Up @@ -648,7 +658,7 @@ char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
continue;
}

if (cf->args->nelts != 2) {
if (cf->args->nelts != static_cast<unsigned>(ffs(cmd->type))) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid number of arguments in \"%V\" "
"directive of \"otel_exporter\"", name);
Expand Down Expand Up @@ -711,7 +721,8 @@ char* addResourceAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
return NGX_CONF_OK;
}

char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf) {
char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
{
auto path = ((ngx_str_t*)cf->args->elts)[1];
auto mcf = getMainConf(cf);

Expand All @@ -727,11 +738,13 @@ char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf) {
return (char*)NGX_CONF_ERROR;
}
file.exceptions(std::ios::failbit | std::ios::badbit);
file.seekg(0, std::ios::end);
size_t size = file.tellg();
mcf->trustedCert.resize(size);
file.peek(); // trigger early error for dirs

size_t size = file.seekg(0, std::ios::end).tellg();
file.seekg(0);
file.read(&mcf->trustedCert[0], mcf->trustedCert.size());

mcf->trustedCert.resize(size);
file.read(&mcf->trustedCert[0], size);
} catch (const std::exception& e) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"failed to read \"%V\": %s", &path, e.what());
Expand All @@ -741,6 +754,33 @@ char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf) {
return NGX_CONF_OK;
}

char* addExporterHeader(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
{
auto args = (ngx_str_t*)cf->args->elts;

// don't force on users lower case name requirement of gRPC
ngx_strlow(args[1].data, args[1].data, args[1].len);

try {
// validate header here to avoid runtime assert failure in gRPC
auto name = toStrView(args[1]);
if (!Target::validateHeaderName(name)) {
return (char*)"has invalid header name";
}
auto value = toStrView(args[2]);
if (!Target::validateHeaderValue(value)) {
return (char*)"has invalid header value";
}

getMainConf(cf)->headers.emplace_back(name, value);
} catch (const std::exception& e) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "OTel: %s", e.what());
return (char*)NGX_CONF_ERROR;
}

return NGX_CONF_OK;
}

void* createMainConf(ngx_conf_t* cf)
{
auto cln = ngx_pool_cleanup_add(cf->pool, sizeof(MainConf));
Expand Down Expand Up @@ -769,7 +809,7 @@ void* createMainConf(ngx_conf_t* cf)

char* initMainConf(ngx_conf_t* cf, void* conf)
{
auto mcf = (MainConf*)conf;
auto mcf = getMainConf(cf);

ngx_conf_init_msec_value(mcf->interval, 5000);
ngx_conf_init_size_value(mcf->batchSize, 512);
Expand Down
37 changes: 32 additions & 5 deletions src/trace_service_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,27 @@

namespace otel_proto_trace = opentelemetry::proto::collector::trace::v1;

struct Target {
typedef std::vector<std::pair<std::string, std::string>> HeaderVec;

std::string endpoint;
bool ssl;
std::string trustedCert;
HeaderVec headers;

static bool validateHeaderName(StrView name)
{
return grpc_header_key_is_legal(
grpc_slice_from_static_buffer(name.data(), name.size()));
}

static bool validateHeaderValue(StrView value)
{
return grpc_header_nonbin_value_is_legal(
grpc_slice_from_static_buffer(value.data(), value.size()));
}
};

class TraceServiceClient {
public:
typedef otel_proto_trace::ExportTraceServiceRequest Request;
Expand All @@ -17,18 +38,18 @@ class TraceServiceClient {
typedef std::function<void (Request, Response, grpc::Status)>
ResponseCb;

TraceServiceClient(const std::string& target, bool ssl,
const std::string& trustedCert)
TraceServiceClient(const Target& target) : headers(target.headers)
{
std::shared_ptr<grpc::ChannelCredentials> creds;
if (ssl) {
if (target.ssl) {
grpc::SslCredentialsOptions options;
options.pem_root_certs = trustedCert;
options.pem_root_certs = target.trustedCert;

creds = grpc::SslCredentials(options);
} else {
creds = grpc::InsecureChannelCredentials();
}
auto channel = grpc::CreateChannel(target, creds);
auto channel = grpc::CreateChannel(target.endpoint, creds);
channel->GetState(true); // trigger 'connecting' state

stub = TraceService::NewStub(channel);
Expand All @@ -38,6 +59,10 @@ class TraceServiceClient {
{
std::unique_ptr<ActiveCall> call{new ActiveCall{}};

for (auto& header : headers) {
call->context.AddMetadata(header.first, header.second);
}

call->request = std::move(req);
call->cb = std::move(cb);

Expand Down Expand Up @@ -107,6 +132,8 @@ class TraceServiceClient {
ResponseCb cb;
};

Target::HeaderVec headers;

std::unique_ptr<TraceService::Stub> stub;
grpc::CompletionQueue queue;

Expand Down
25 changes: 25 additions & 0 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
interval {{ interval or "1ms" }};
batch_size 3;
batch_count 3;

{{ exporter_opts }}
jimf5 marked this conversation as resolved.
Show resolved Hide resolved
}

otel_trace on;
Expand Down Expand Up @@ -288,3 +290,26 @@ def test_custom_resource_attributes(client, trace_service):
assert get_attr(batch.resource, "service.name") == "test_service"
assert get_attr(batch.resource, "my.name") == "my name"
assert get_attr(batch.resource, "my.service") == "my service"


@pytest.mark.parametrize(
"nginx_config",
[
{
"exporter_opts": """
header X-API-TOKEN api.value;
header Authorization "Basic value";
""",
}
],
indirect=True,
)
@pytest.mark.parametrize("trace_service", ["skip_otelcol"], indirect=True)
def test_exporter_headers(client, trace_service):
assert client.get("http://127.0.0.1:18080/ok").status_code == 200

assert trace_service.get_span().name == "/ok"

headers = dict(trace_service.last_metadata)
assert headers["x-api-token"] == "api.value"
assert headers["authorization"] == "Basic value"
11 changes: 8 additions & 3 deletions tests/trace_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TraceService(trace_service_pb2_grpc.TraceServiceServicer):

def Export(self, request, context):
self.batches.append(request.resource_spans)
self.last_metadata = context.invocation_metadata()
return trace_service_pb2.ExportTracePartialSuccess()

def get_batch(self):
Expand All @@ -31,13 +32,17 @@ def get_span(self):


@pytest.fixture(scope="module")
def trace_service(pytestconfig, logger):
def trace_service(request, pytestconfig, logger):
server = grpc.server(concurrent.futures.ThreadPoolExecutor())
trace_service = TraceService()
trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
trace_service, server
)
listen_addr = f"127.0.0.1:{24317 if pytestconfig.option.otelcol else 14317}"
trace_service.use_otelcol = (
pytestconfig.option.otelcol
and getattr(request, "param", "") != "skip_otelcol"
)
listen_addr = f"127.0.0.1:{24317 if trace_service.use_otelcol else 14317}"
server.add_insecure_port(listen_addr)
logger.info(f"Starting trace service at {listen_addr}...")
server.start()
Expand All @@ -48,7 +53,7 @@ def trace_service(pytestconfig, logger):

@pytest.fixture(scope="module")
def otelcol(pytestconfig, testdir, logger, trace_service):
if pytestconfig.option.otelcol is None:
if not trace_service.use_otelcol:
yield
return

Expand Down