Skip to content

Commit

Permalink
Otlp Retry Part2 - Introduce transmission handler (#5367)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishweshbankwar authored Feb 23, 2024
1 parent c87134d commit 7e0213d
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpC
/// <inheritdoc/>
public ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
{
// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
// 2. If the user configures timeout via the exporter options, then the timeout set for the `HttpClient` initialized by the exporter will be set to user provided value.
DateTime deadline = DateTime.UtcNow.AddMilliseconds(this.HttpClient.Timeout.TotalMilliseconds);
try
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NET6_0_OR_GREATER
using System.Diagnostics.CodeAnalysis;
#endif
using System.Diagnostics.Tracing;
using OpenTelemetry.Internal;

Expand Down Expand Up @@ -33,6 +30,15 @@ public void ExportMethodException(Exception ex, bool isRetry = false)
}
}

[NonEvent]
public void TrySubmitRequestException(Exception ex)
{
if (Log.IsEnabled(EventLevel.Error, EventKeywords.All))
{
this.TrySubmitRequestException(ex.ToInvariantString());
}
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand All @@ -45,9 +51,6 @@ public void CouldNotTranslateActivity(string className, string methodName)
this.WriteEvent(3, className, methodName);
}

#if NET6_0_OR_GREATER
[UnconditionalSuppressMessage("ReflectionAnalysis", "IL2026:RequiresUnreferencedCode", Justification = "Parameters to this method are primitive and are trimmer safe.")]
#endif
[Event(4, Message = "Unknown error in export method. Message: '{0}'. IsRetry: {1}", Level = EventLevel.Error)]
public void ExportMethodException(string ex, bool isRetry)
{
Expand Down Expand Up @@ -83,4 +86,10 @@ public void InvalidEnvironmentVariable(string key, string value)
{
this.WriteEvent(11, key, value);
}

[Event(12, Message = "Unknown error in TrySubmitRequest method. Message: '{0}'", Level = EventLevel.Error)]
public void TrySubmitRequestException(string ex)
{
this.WriteEvent(12, ex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal class OtlpExporterTransmissionHandler<TRequest>
{
public OtlpExporterTransmissionHandler(IExportClient<TRequest> exportClient)
{
Guard.ThrowIfNull(exportClient);

this.ExportClient = exportClient;
}

protected IExportClient<TRequest> ExportClient { get; }

/// <summary>
/// Attempts to send an export request to the server.
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <returns> <see langword="true" /> if the request is sent successfully; otherwise, <see
/// langword="false" />.
/// </returns>
public bool TrySubmitRequest(TRequest request)
{
try
{
var response = this.ExportClient.SendExportRequest(request);
if (response.Success)
{
return true;
}

return this.OnSubmitRequestFailure(request, response);
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.TrySubmitRequestException(ex);
return false;
}
}

/// <summary>
/// Attempts to shutdown the transmission handler, blocks the current thread
/// until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <see langword="true" /> if shutdown succeeded; otherwise, <see
/// langword="false" />.
/// </returns>
public bool Shutdown(int timeoutMilliseconds)
{
Guard.ThrowIfInvalidTimeout(timeoutMilliseconds);

var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew();

this.OnShutdown(timeoutMilliseconds);

if (sw != null)
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

return this.ExportClient.Shutdown((int)Math.Max(timeout, 0));
}

return this.ExportClient.Shutdown(timeoutMilliseconds);
}

/// <summary>
/// Fired when the transmission handler is shutdown.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
protected virtual void OnShutdown(int timeoutMilliseconds)
{
}

/// <summary>
/// Fired when a request could not be submitted.
/// </summary>
/// <param name="request">The request that was attempted to send to the server.</param>
/// <param name="response"><see cref="ExportClientResponse" />.</param>
/// <returns><see langword="true" /> If the request is resubmitted and succeeds; otherwise, <see
/// langword="false" />.</returns>
protected virtual bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
return false;
}

/// <summary>
/// Fired when resending a request to the server.
/// </summary>
/// <param name="request">The request to be resent to the server.</param>
/// <param name="response"><see cref="ExportClientResponse" />.</param>
/// <returns><see langword="true" /> If the retry succeeds; otherwise, <see
/// langword="false" />.</returns>
protected bool TryRetryRequest(TRequest request, out ExportClientResponse response)
{
response = this.ExportClient.SendExportRequest(request);
if (!response.Success)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception, isRetry: true);
return false;
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
using Grpc.Net.Client;
#endif
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
Expand Down Expand Up @@ -87,6 +88,15 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetTraceExportClient(options));

public static OtlpExporterTransmissionHandler<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetMetricsExportClient(options));

public static OtlpExporterTransmissionHandler<LogOtlpCollector.ExportLogsServiceRequest> GetLogsExportTransmissionHandler(this OtlpExporterOptions options)
=> new(GetLogExportClient(options));

public static IExportClient<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
using OtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
Expand All @@ -19,7 +19,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private readonly IExportClient<OtlpCollector.ExportLogsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest> transmissionHandler;
private readonly OtlpLogRecordTransformer otlpLogRecordTransformer;

private OtlpResource.Resource? processResource;
Expand All @@ -29,7 +29,7 @@ public sealed class OtlpLogExporter : BaseExporter<LogRecord>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpLogExporter(OtlpExporterOptions options)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), exportClient: null)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
{
}

Expand All @@ -39,12 +39,12 @@ public OtlpLogExporter(OtlpExporterOptions options)
/// <param name="exporterOptions">Configuration options for the exporter.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="exportClient">Client used for sending export request.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpLogExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
IExportClient<OtlpCollector.ExportLogsServiceRequest>? exportClient = null)
OtlpExporterTransmissionHandler<OtlpCollector.ExportLogsServiceRequest>? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
Expand All @@ -62,14 +62,7 @@ internal OtlpLogExporter(
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = exporterOptions!.GetLogExportClient();
}
this.transmissionHandler = transmissionHandler ?? exporterOptions.GetLogsExportTransmissionHandler();

this.otlpLogRecordTransformer = new OtlpLogRecordTransformer(sdkLimitOptions!, experimentalOptions!);
}
Expand All @@ -89,7 +82,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.TrySubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -113,6 +106,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Internal;
using OpenTelemetry.Metrics;
using OtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
Expand All @@ -16,7 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private readonly IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler;

private OtlpResource.Resource processResource;

Expand All @@ -25,16 +25,18 @@ public class OtlpMetricExporter : BaseExporter<Metric>
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public OtlpMetricExporter(OtlpExporterOptions options)
: this(options, null)
: this(options, transmissionHandler: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OtlpMetricExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the export.</param>
/// <param name="exportClient">Client used for sending export request.</param>
internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpCollector.ExportMetricsServiceRequest> exportClient = null)
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal OtlpMetricExporter(
OtlpExporterOptions options,
OtlpExporterTransmissionHandler<OtlpCollector.ExportMetricsServiceRequest> transmissionHandler = null)
{
// Each of the Otlp exporters: Traces, Metrics, and Logs set the same value for `OtlpKeyValueTransformer.LogUnsupportedAttributeType`
// and `ConfigurationExtensions.LogInvalidEnvironmentVariable` so it should be fine even if these exporters are used together.
Expand All @@ -48,14 +50,7 @@ internal OtlpMetricExporter(OtlpExporterOptions options, IExportClient<OtlpColle
OpenTelemetryProtocolExporterEventSource.Log.InvalidEnvironmentVariable(key, value);
};

if (exportClient != null)
{
this.exportClient = exportClient;
}
else
{
this.exportClient = options.GetMetricsExportClient();
}
this.transmissionHandler = transmissionHandler ?? options.GetMetricsExportTransmissionHandler();
}

internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
Expand All @@ -72,7 +67,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
request.AddMetrics(this.ProcessResource, metrics);

if (!this.exportClient.SendExportRequest(request).Success)
if (!this.transmissionHandler.TrySubmitRequest(request))
{
return ExportResult.Failure;
}
Expand All @@ -93,6 +88,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.exportClient?.Shutdown(timeoutMilliseconds) ?? true;
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
}
Loading

0 comments on commit 7e0213d

Please sign in to comment.