Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otlp Retry Part2 - Introduce transmission handler #5367

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal class OtlpExporterTransmissionHandler<TRequest>
{
public OtlpExporterTransmissionHandler(IExportClient<TRequest> exportClient)
{
Guard.ThrowIfNull(exportClient);

this.ExportClient = exportClient;
}

protected IExportClient<TRequest> ExportClient { get; }

/// <summary>
/// Sends export request to the server.
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <returns>True if the request is sent successfully or else false.</returns>
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
public bool SubmitRequest(TRequest request)
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
var response = this.ExportClient.SendExportRequest(request);
if (response.Success)
{
return true;
}

return this.OnSubmitRequestFailure(request, response);
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
this.OnRequestDropped(request);
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}

/// <summary>
/// Attempts to shutdown the transmission handler, blocks the current thread
/// until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <see langword="true" /> if shutdown succeeded; otherwise, <see
/// langword="false" />.
/// </returns>
public bool Shutdown(int timeoutMilliseconds)
{
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
this.OnShutdown();
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

return this.ExportClient.Shutdown(timeoutMilliseconds);
}

/// <summary>
/// Fired when the transmission handler is shutdown.
/// </summary>
protected virtual void OnShutdown()
{
}

/// <summary>
/// Fired when a request could not be submitted.
/// </summary>
/// <param name="request">The request that was attempted to send to the server.</param>
/// <param name="response"><see cref="ExportClientResponse" />.</param>
/// <returns><see langword="true" /> if the request will be resubmitted.</returns>
protected virtual bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
this.OnRequestDropped(request);
return false;
}

/// <summary>
/// Fired when a request could not be submitted.
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <param name="request">The request that was attempted to send to the server.</param>
/// <returns><see cref="ExportClientResponse"/>.</returns>
protected ExportClientResponse RetryRequest(TRequest request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be a method of its own? A transmission handler could simply override OnSubmitRequestFailure to implement this logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a common method that can be used by implementations. OnSubmitRequestFailure would call this method along with other logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this TryRetryRequest now that we have TrySubmitRequest? Both have similar mechanics this one just returns more details.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
var response = this.ExportClient.SendExportRequest(request);
if (!response.Success)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception, isRetry: true);
}

return response;
}

/// <summary>
/// Fired when a request is dropped.
/// </summary>
/// <param name="request">The request that was attempted to send to the server.</param>
protected virtual void OnRequestDropped(TRequest request)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
using Grpc.Net.Client;
#endif
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
Expand Down Expand Up @@ -87,6 +88,15 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetTraceExportClient(options));

public static OtlpExporterTransmissionHandler<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetMetricsExportClient(options));

public static OtlpExporterTransmissionHandler<LogOtlpCollector.ExportLogsServiceRequest> GetLogsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetLogExportClient(options));

public static IExportClient<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
using OtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
Expand All @@ -19,7 +19,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private readonly IExportClient<OtlpCollector.ExportLogsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest> transmissionHandler;
private readonly OtlpLogRecordTransformer otlpLogRecordTransformer;

private OtlpResource.Resource? processResource;
Expand All @@ -29,7 +29,7 @@ public sealed class OtlpLogExporter : BaseExporter<LogRecord>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpLogExporter(OtlpExporterOptions options)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), exportClient: null)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
{
}

Expand All @@ -39,12 +39,12 @@ public OtlpLogExporter(OtlpExporterOptions options)
/// <param name="exporterOptions">Configuration options for the exporter.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="exportClient">Client used for sending export request.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpLogExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
IExportClient<OtlpCollector.ExportLogsServiceRequest>? exportClient = null)
OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest>? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
Expand All @@ -62,14 +62,7 @@ internal OtlpLogExporter(
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = exporterOptions!.GetLogExportClient();
}
this.transmissionHandler = transmissionHandler ?? exporterOptions.GetLogsExportTransmissionHandler();

this.otlpLogRecordTransformer = new OtlpLogRecordTransformer(sdkLimitOptions!, experimentalOptions!);
}
Expand All @@ -89,7 +82,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.SubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -113,6 +106,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Metrics;
using OtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
Expand All @@ -16,7 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private readonly IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler;

private OtlpResource.Resource processResource;

Expand All @@ -25,16 +25,18 @@ public class OtlpMetricExporter : BaseExporter<Metric>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpMetricExporter(OtlpExporterOptions options)
: this(options, null)
: this(options, transmissionHandler: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OtlpMetricExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the export.</param>
/// <param name="exportClient">Client used for sending export request.</param>
internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient = null)
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpMetricExporter(
OtlpExporterOptions options,
OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler = null)
{
// Each of the Otlp exporters: Traces, Metrics, and Logs set the same value for `OtlpKeyValueTransformer.LogUnsupportedAttributeType`
// and `ConfigurationExtensions.LogInvalidEnvironmentVariable` so it should be fine even if these exporters are used together.
Expand All @@ -48,14 +50,7 @@ internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpColle
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = options.GetMetricsExportClient();
}
this.transmissionHandler = transmissionHandler ?? options.GetMetricsExportTransmissionHandler();
}

internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
Expand All @@ -72,7 +67,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
request.AddMetrics(this.ProcessResource, metrics);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.SubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -93,6 +88,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
using OtlpResource = OpenTelemetry.Proto.Resource.V1;
Expand All @@ -17,7 +17,7 @@ namespace OpenTelemetry.Exporter;
public class OtlpTraceExporter : BaseExporter<Activity>
{
private readonly SdkLimitOptions sdkLimitOptions;
private readonly IExportClient<OtlpCollector.ExportTraceServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest> transmissionHandler;
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

private OtlpResource.Resource processResource;

Expand All @@ -26,7 +26,7 @@ public class OtlpTraceExporter : BaseExporter<Activity>
/// </summary>
/// <param name="options">Configuration options for the export.</param>
public OtlpTraceExporter(OtlpExporterOptions options)
: this(options, new(), null)
: this(options, sdkLimitOptions: new())
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
{
}

Expand All @@ -35,35 +35,22 @@ public OtlpTraceExporter(OtlpExporterOptions options)
/// </summary>
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="exportClient">Client used for sending export request.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpTraceExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
IExportClient<OtlpCollector.ExportTraceServiceRequest> exportClient = null)
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest> transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");

this.sdkLimitOptions = sdkLimitOptions;

OtlpKeyValueTransformer.LogUnsupportedAttributeType = (string tagValueType, string tagKey) =>
{
OpenTelemetryProtocolExporterEventSource.Log.UnsupportedAttributeType(tagValueType, tagKey);
};
OtlpKeyValueTransformer.LogUnsupportedAttributeType = OpenTelemetryProtocolExporterEventSource.Log.UnsupportedAttributeType;

ConfigurationExtensions.LogInvalidEnvironmentVariable = (string key, string value) =>
{
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};
ConfigurationExtensions.LogInvalidEnvironmentVariable = OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable;

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = exporterOptions.GetTraceExportClient();
}
this.transmissionHandler = transmissionHandler ?? exporterOptions.GetTraceExportTransmissionHandler();
}

internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
Expand All @@ -80,7 +67,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.SubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -101,6 +88,6 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
}
4 changes: 3 additions & 1 deletion test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using OpenTelemetryProtocol::OpenTelemetry.Exporter;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetryProtocol::OpenTelemetry.Proto.Collector.Trace.V1;

namespace Benchmarks.Exporter;

Expand All @@ -33,7 +35,7 @@ public void GlobalSetup()
this.exporter = new OtlpTraceExporter(
options,
new SdkLimitOptions(),
new OtlpGrpcTraceExportClient(options, new TestTraceServiceClient()));
new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(new OtlpGrpcTraceExportClient(options, new TestTraceServiceClient())));
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

this.activity = ActivityHelper.CreateTestActivity();
this.activityBatch = new CircularBuffer<Activity>(this.NumberOfSpans);
Expand Down
Loading