Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otlp Retry Part1 - Refactor ExportClients #5335

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <typeparam name="TRequest">Type of export request.</typeparam>
internal abstract class BaseOtlpGrpcExportClient<TRequest> : IExportClient<TRequest>
{
protected static readonly ExportClientGrpcResponse SuccessExportResponse = new ExportClientGrpcResponse(success: true, deadlineUtc: null, exception: null);

protected BaseOtlpGrpcExportClient(OtlpExporterOptions options)
{
Guard.ThrowIfNull(options);
Expand All @@ -38,7 +40,7 @@ protected BaseOtlpGrpcExportClient(OtlpExporterOptions options)
internal int TimeoutMilliseconds { get; }

/// <inheritdoc/>
public abstract bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default);
public abstract ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public virtual bool Shutdown(int timeoutMilliseconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <typeparam name="TRequest">Type of export request.</typeparam>
internal abstract class BaseOtlpHttpExportClient<TRequest> : IExportClient<TRequest>
{
private static readonly ExportClientHttpResponse SuccessExportResponse = new ExportClientHttpResponse(success: true, deadlineUtc: null, response: null, exception: null);

protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Expand All @@ -34,24 +36,33 @@ protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpC
internal IReadOnlyDictionary<string, string> Headers { get; }

/// <inheritdoc/>
public bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
public ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
{
DateTime deadline = DateTime.UtcNow.AddMilliseconds(this.HttpClient.Timeout.TotalMilliseconds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this not use the timeout value from the OtlpExporterOptions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point - It is because user can override it with their own HttpClient instance via HttpClientFactory. So using the timeout value from the client itself and not options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a way to detect if the user actually configured timeout through OtlpExporter options?

Copy link
Member Author

@vishweshbankwar vishweshbankwar Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - even if they are not using custom client. We initialize default client using timeout value set via options.

this.HttpClientFactory = this.DefaultHttpClientFactory = () =>
{
return new HttpClient
{
Timeout = TimeSpan.FromMilliseconds(this.TimeoutMilliseconds),
};
};

So in both the cases we will have the correct deadline here.

Edit: In case of custom client, the timeout set within the client will take precedence over timeout set in options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add this as a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can add it in my next PR if thats ok.

try
{
using var httpRequest = this.CreateHttpRequest(request);

using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

httpResponse?.EnsureSuccessStatusCode();
try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException ex)
{
return new ExportClientHttpResponse(success: false, deadlineUtc: deadline, response: httpResponse, ex);
}

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (HttpRequestException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientHttpResponse(success: false, deadlineUtc: deadline, response: null, exception: ex);
}

return true;
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal sealed class ExportClientGrpcResponse : ExportClientResponse
{
public ExportClientGrpcResponse(
bool success,
DateTime? deadlineUtc,
Exception? exception)
: base(success, deadlineUtc, exception)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using System.Net;
#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal sealed class ExportClientHttpResponse : ExportClientResponse
{
public ExportClientHttpResponse(
bool success,
DateTime? deadlineUtc,
HttpResponseMessage? response,
Exception? exception)
: base(success, deadlineUtc, exception)
{
this.Headers = response?.Headers;
this.StatusCode = response?.StatusCode;
}

public HttpResponseHeaders? Headers { get; }

public HttpStatusCode? StatusCode { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ExportClientResponse
{
protected ExportClientResponse(bool success, DateTime? deadlineUtc, Exception? exception)
{
this.Success = success;
this.Exception = exception;
this.DeadlineUtc = deadlineUtc;
}

public bool Success { get; }

public Exception? Exception { get; }

public DateTime? DeadlineUtc { get; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Export client interface.</summary>
Expand All @@ -12,8 +14,8 @@ internal interface IExportClient<in TRequest>
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
/// <returns>True if the request has been sent successfully, otherwise false.</returns>
bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default);
/// <returns><see cref="ExportClientResponse"/>.</returns>
ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Method for shutting down the export client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcLogExportClient(OtlpExporterOptions options, OtlpCollector.LogsSe
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportLogsServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportLogsServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.logsClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcMetricsExportClient(OtlpExporterOptions options, OtlpCollector.Me
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportMetricsServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportMetricsServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.metricsClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcTraceExportClient(OtlpExporterOptions options, OtlpCollector.Trac
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.traceClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
request.AddMetrics(this.ProcessResource, metrics);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void RunTest(Batch<Activity> batch)
var httpRequest = testHttpHandler.HttpRequestMessage;

// Assert
Assert.True(result);
Assert.True(result.Success);
Assert.NotNull(httpRequest);
Assert.Equal(HttpMethod.Post, httpRequest.Method);
Assert.Equal("http://localhost:4317/", httpRequest.RequestUri.AbsoluteUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@ internal class TestExportClient<T>(bool throwException = false) : IExportClient<

public bool ThrowException { get; set; } = throwException;

public bool SendExportRequest(T request, CancellationToken cancellationToken = default)
public ExportClientResponse SendExportRequest(T request, CancellationToken cancellationToken = default)
{
if (this.ThrowException)
{
throw new Exception("Exception thrown from SendExportRequest");
}

this.SendExportRequestCalled = true;
return true;
return new TestExportClientResponse(true, null, null);
}

public bool Shutdown(int timeoutMilliseconds)
{
this.ShutdownCalled = true;
return true;
}

private class TestExportClientResponse : ExportClientResponse
{
public TestExportClientResponse(bool success, DateTime? deadline, Exception exception)
: base(success, deadline, exception)
{
}
}
}