-
Notifications
You must be signed in to change notification settings - Fork 305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add instructions to configure ConfluentKafka instrumentation #1935
Comments
Update:I just realized that the InstrumentedProducerBuilder derives from the Confluent ProducerBuilder and has all the methods I need. Please ignore my original comment. Original comment: (Obsolete)@g7ed6e using Confluent.Kafka;
public static class ProducerBuilderExtensions
{
public static ProducerBuilder<TKey, TValue> UseDefaultErrorHandler<TKey, TValue>(this ProducerBuilder<TKey, TValue> builder, ILogger logger)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(logger);
builder.SetErrorHandler((self, e) =>
{
// TODO: figure out how periodic Local_AllBrokersDown could be avoided via configuration
var logLevel = e.Code == ErrorCode.Local_AllBrokersDown ? LogLevel.Debug : LogLevel.Error;
logger.Log(logLevel, "Kafka producer error: ErrorCode:{ErrorCode}, Reason:{Reason}, Name:{Name}", e.Code, e.Reason, self.Name);
});
return builder;
}
public static ProducerBuilder<TKey, TValue> UseDefaultLogHandler<TKey, TValue>(this ProducerBuilder<TKey, TValue> builder, ILogger logger)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(logger);
builder.SetLogHandler((_, log) => log.LogWithLogger("Kafka producer", logger));
return builder;
}
} |
It seems I can only use Could we have another constructor with an additional Options parameter? Update 1:I hacked these constructors to the library and I was able to replace all of my ProducerBuilder and ConsumerBuilder lines. Please let us use this library without having all the Kafka consumers and producers needed to be migrated to DI. public InstrumentedConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config, bool enableMetrics, bool enabledTraces)
: base(config)
{
this.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>
{
Metrics = enableMetrics,
Traces = enabledTraces,
};
} public InstrumentedProducerBuilder(IEnumerable<KeyValuePair<string, string>> config, bool enableMetrics, bool enabledTraces)
: base(config)
{
this.Options = new ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>
{
Metrics = enableMetrics,
Traces = enabledTraces,
};
} builder.Services.AddOpenTelemetry()
.WithMetrics(c =>
{
c.AddMeter(ConfluentKafkaCommon.InstrumentationName);
c.AddOtlpExporter();
})
.WithTracing(c =>
{
c.AddSource(ConfluentKafkaCommon.InstrumentationName);
c.AddOtlpExporter();
}) |
I echo what @t03apt mentioned in having the ability to enable tracing/metrics without requiring DI (I am not sure exactly how the API surface should look) What I have been looking into is getting tracing working with Akka.Streams.Kafka https://github.com/mhbuck/KafkaTracingExamples/ Currently my Akka.Streams.Kafka example is not working (Consumer is not working but the producer is working) but I expect that is more due to how I am configuring Akka.Streams.Kafka. The DI approach makes a number of assumptions around how configuration of consumers/producers should be handled that does not fit some approaches. What is reasoning behind the "Options" approach of having an object that determines if metrics/traces are enabled as looking through other Instrumentation packages is seems unique. (Apologies if this is discussed somewhere I had a look through the original PR to see if there were any comments on it #1493) |
Originally posted by @vishweshbankwar in #1493 (comment)
The text was updated successfully, but these errors were encountered: