From c073cb9ac7e41a8d4d70784645ced48551c89384 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 5 Apr 2023 12:27:29 +0200 Subject: [PATCH] Refactor Diagnostics (#26) --- .github/workflows/ci.yml | 2 +- .../Elastic.Ingest.Apm.Example/Program.cs | 2 +- src/Elastic.Channels/BufferOptions.cs | 7 - src/Elastic.Channels/BufferedChannelBase.cs | 59 ++++-- src/Elastic.Channels/ChannelOptionsBase.cs | 45 +++-- .../Diagnostics/ChannelDiagnosticsListener.cs | 136 +++++++++++++ .../Diagnostics/ChannelListener.cs | 94 --------- .../Diagnostics/DiagnosticsBufferedChannel.cs | 23 +-- .../Diagnostics/IChannelCallbacks.cs | 187 ++++++++++++++++++ .../Diagnostics/NoopBufferedChannel.cs | 35 +++- .../ResponseItemsBufferedChannelBase.cs | 8 +- .../Diagnostics/ChannelListener.cs | 53 ----- .../ElasticsearchChannelBase.cs | 8 +- .../CustomOtlpTraceExporter.cs | 7 +- .../TransportChannelBase.cs | 8 +- .../TroubleshootTests.cs | 93 +++++++++ .../DataStreamIngestionTests.cs | 2 +- .../IndexIngestionTests.cs | 2 +- .../IngestionCluster.cs | 3 +- 19 files changed, 555 insertions(+), 219 deletions(-) create mode 100644 src/Elastic.Channels/Diagnostics/ChannelDiagnosticsListener.cs delete mode 100644 src/Elastic.Channels/Diagnostics/ChannelListener.cs create mode 100644 src/Elastic.Channels/Diagnostics/IChannelCallbacks.cs delete mode 100644 src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs create mode 100644 tests/Elastic.Channels.Tests/TroubleshootTests.cs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0168fe6..a42ca80 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,7 +41,7 @@ jobs: name: Test build: - runs-on: ubuntu-18.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 with: diff --git a/examples/Elastic.Ingest.Apm.Example/Program.cs b/examples/Elastic.Ingest.Apm.Example/Program.cs index d174d09..f2a0873 100644 --- a/examples/Elastic.Ingest.Apm.Example/Program.cs +++ b/examples/Elastic.Ingest.Apm.Example/Program.cs @@ -47,7 +47,6 @@ private static int Main(string[] args) WaitHandle = handle, ExportMaxRetries = 3, ExportBackoffPeriod = times => TimeSpan.FromMilliseconds(1), - ExportBufferCallback = () => Console.WriteLine("Flushed"), }; var channelOptions = new ApmChannelOptions(transport) { @@ -59,6 +58,7 @@ private static int Main(string[] args) Interlocked.Increment(ref _responses); Console.WriteLine(r.ApiCallDetails.DebugInformation); }, + ExportBufferCallback = () => Console.WriteLine("Flushed"), ExportMaxRetriesCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded), ExportRetryCallback = (list) => Interlocked.Increment(ref _retries), ExportExceptionCallback = (e) => _exception = e diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index f4cb11d..6288828 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -53,13 +53,6 @@ public class BufferOptions /// public Func ExportBackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1)); - /// - /// Called once after a buffer has been flushed, if the buffer is retried this callback is only called once - /// all retries have been exhausted. Its called regardless of whether the call to - /// succeeded. - /// - public Action? ExportBufferCallback { get; set; } - /// /// Allows you to inject a to wait for N number of buffers to flush. /// diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 35840c9..8d112cb 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -9,6 +9,7 @@ using System.Threading.Channels; using System.Threading.Tasks; using Elastic.Channels.Buffers; +using Elastic.Channels.Diagnostics; namespace Elastic.Channels; @@ -59,11 +60,31 @@ public abstract class BufferedChannelBase private readonly SemaphoreSlim _throttleTasks; private readonly CountdownEvent? _signal; + private readonly ChannelCallbackInvoker _callbacks; + private readonly IChannelDiagnosticsListener? _diagnosticsListener; + + /// + protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } + /// - protected BufferedChannelBase(TChannelOptions options) + protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) { TokenSource = new CancellationTokenSource(); Options = options; + + var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray(); + _diagnosticsListener = listeners + .Select(l => (l is IChannelDiagnosticsListener c) ? c : null) + .FirstOrDefault(e=> e != null); + if (_diagnosticsListener == null && !options.DisableDiagnostics) + { + // if no debug listener was already provided but was requested explicitly create one. + var l = new ChannelDiagnosticsListener(GetType().Name); + _diagnosticsListener = l; + listeners = listeners.Concat(new[] { l }).ToArray(); + } + _callbacks = new ChannelCallbackInvoker(listeners); + var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency); _throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers); _signal = options.BufferOptions.WaitHandle; @@ -133,10 +154,10 @@ public override bool TryWrite(TEvent item) { if (InChannel.Writer.TryWrite(item)) { - Options.PublishToInboundChannelCallback?.Invoke(); + _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } - Options.PublishToInboundChannelFailureCallback?.Invoke(); + _callbacks.PublishToInboundChannelFailureCallback?.Invoke(); return false; } @@ -164,10 +185,10 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item)) { - Options.PublishToInboundChannelCallback?.Invoke(); + _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } - Options.PublishToInboundChannelFailureCallback?.Invoke(); + _callbacks.PublishToInboundChannelFailureCallback?.Invoke(); return false; } @@ -185,7 +206,7 @@ IWriteTrackingBuffer statistics private async Task ConsumeOutboundEvents() { - Options.OutboundChannelStartedCallback?.Invoke(); + _callbacks.OutboundChannelStartedCallback?.Invoke(); var maxConsumers = Options.BufferOptions.ExportMaxConcurrency; var taskList = new List(maxConsumers); @@ -212,7 +233,7 @@ private async Task ConsumeOutboundEvents() } } await Task.WhenAll(taskList).ConfigureAwait(false); - Options.OutboundChannelExitedCallback?.Invoke(); + _callbacks.OutboundChannelExitedCallback?.Invoke(); } private async Task ExportBuffer(IReadOnlyCollection items, IWriteTrackingBuffer buffer) @@ -223,40 +244,42 @@ private async Task ExportBuffer(IReadOnlyCollection items, IWriteTrackin if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; - Options.ExportItemsAttemptCallback?.Invoke(i, items.Count); + _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count); TResponse? response; try { response = await Export(items, TokenSource.Token).ConfigureAwait(false); - Options.ExportResponseCallback?.Invoke(response, buffer); + _callbacks.ExportResponseCallback?.Invoke(response, buffer); } catch (Exception e) { - Options.ExportExceptionCallback?.Invoke(e); + _callbacks.ExportExceptionCallback?.Invoke(e); break; } items = RetryBuffer(response, items, buffer); + if (items.Count > 0 && i == 0) + _callbacks.ExportRetryableCountCallback?.Invoke(items.Count); // delay if we still have items and we are not at the end of the max retry cycle var atEndOfRetries = i == maxRetries; if (items.Count > 0 && !atEndOfRetries) { await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); - Options.ExportRetryCallback?.Invoke(items); + _callbacks.ExportRetryCallback?.Invoke(items); } // otherwise if retryable items still exist and the user wants to be notified notify the user else if (items.Count > 0 && atEndOfRetries) - Options.ExportMaxRetriesCallback?.Invoke(items); + _callbacks.ExportMaxRetriesCallback?.Invoke(items); } - Options.BufferOptions.ExportBufferCallback?.Invoke(); + _callbacks.ExportBufferCallback?.Invoke(); if (_signal is { IsSet: false }) _signal.Signal(); } private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInterval) { - Options.InboundChannelStartedCallback?.Invoke(); + _callbacks.InboundChannelStartedCallback?.Invoke(); while (await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false)) { if (TokenSource.Token.IsCancellationRequested) break; @@ -276,9 +299,9 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter InboundBuffer.Reset(); if (await PublishAsync(outboundBuffer).ConfigureAwait(false)) - Options.PublishToOutboundChannelCallback?.Invoke(); + _callbacks.PublishToOutboundChannelCallback?.Invoke(); else - Options.PublishToOutboundChannelFailureCallback?.Invoke(); + _callbacks.PublishToOutboundChannelFailureCallback?.Invoke(); } } @@ -301,6 +324,10 @@ async Task AsyncSlowPath(IOutboundBuffer b) : new ValueTask(AsyncSlowPath(buffer)); } + /// > + public override string ToString() => + _diagnosticsListener != null ? _diagnosticsListener.ToString() : base.ToString(); + /// public virtual void Dispose() { diff --git a/src/Elastic.Channels/ChannelOptionsBase.cs b/src/Elastic.Channels/ChannelOptionsBase.cs index bbde5fc..a476d7f 100644 --- a/src/Elastic.Channels/ChannelOptionsBase.cs +++ b/src/Elastic.Channels/ChannelOptionsBase.cs @@ -1,12 +1,14 @@ // Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information + using System; using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Tasks; using Elastic.Channels.Buffers; +using Elastic.Channels.Diagnostics; namespace Elastic.Channels { @@ -15,50 +17,63 @@ namespace Elastic.Channels /// /// /// - public abstract class ChannelOptionsBase + public abstract class ChannelOptionsBase : IChannelCallbacks { /// - public BufferOptions BufferOptions { get; set; } = new (); + public BufferOptions BufferOptions { get; set; } = new(); + + + /// + /// Ensures a gets registered so this + /// implementation returns diagnostics in its implementation + /// + public bool DisableDiagnostics { get; set; } /// /// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config /// public Func? WriteEvent { get; set; } = null; - /// Called if the call to throws. + /// public Action? ExportExceptionCallback { get; set; } - /// Called with (number of retries) (number of items to be exported) + /// public Action? ExportItemsAttemptCallback { get; set; } - /// Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of + /// public Action>? ExportMaxRetriesCallback { get; set; } - /// Subscribe to be notified of events that are retryable but did not store correctly within the number of configured + /// public Action>? ExportRetryCallback { get; set; } - /// A generic hook to be notified of any bulk request being initiated by + /// public Action? ExportResponseCallback { get; set; } - /// Called everytime an event is written to the inbound channel + /// + public Action? ExportBufferCallback { get; set; } + + /// + public Action? ExportRetryableCountCallback { get; set; } + + /// public Action? PublishToInboundChannelCallback { get; set; } - /// Called everytime an event is not written to the inbound channel + /// public Action? PublishToInboundChannelFailureCallback { get; set; } - /// Called everytime the inbound channel publishes to the outbound channel. + /// public Action? PublishToOutboundChannelCallback { get; set; } - /// Called when the thread to read the outbound channel is started + /// public Action? OutboundChannelStartedCallback { get; set; } - /// Called when the thread to read the outbound channel has exited + + /// public Action? OutboundChannelExitedCallback { get; set; } - /// Called when the thread to read the inbound channel has started + /// public Action? InboundChannelStartedCallback { get; set; } - /// Called everytime the inbound channel fails to publish to the outbound channel. + /// public Action? PublishToOutboundChannelFailureCallback { get; set; } } - } diff --git a/src/Elastic.Channels/Diagnostics/ChannelDiagnosticsListener.cs b/src/Elastic.Channels/Diagnostics/ChannelDiagnosticsListener.cs new file mode 100644 index 0000000..8033b6c --- /dev/null +++ b/src/Elastic.Channels/Diagnostics/ChannelDiagnosticsListener.cs @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Threading; +using Elastic.Channels.Buffers; + +namespace Elastic.Channels.Diagnostics; + +/// +/// Marker interface used by to improve +/// its to string if a gets injected. +/// +internal interface IChannelDiagnosticsListener {} + +/// +/// A very rudimentary diagnostics object tracking various important metrics to provide insights into the +/// machinery of . +/// For now this implementation only aids in better ToString() for +/// +public class ChannelDiagnosticsListener : IChannelCallbacks, IChannelDiagnosticsListener +{ + private readonly string? _name; + private int _exportedBuffers; + + private long _responses; + private long _retries; + private long _items; + private long _maxRetriesExceeded; + private long _outboundPublishes; + private long _outboundPublishFailures; + private long _inboundPublishes; + private long _inboundPublishFailures; + private bool _outboundChannelStarted; + private bool _inboundChannelStarted; + private bool _outboundChannelExited; + private bool _returnedRetryableObjects; + + /// + public ChannelDiagnosticsListener(string? name = null) + { + _name = name; + ExportBufferCallback = () => Interlocked.Increment(ref _exportedBuffers); + ExportItemsAttemptCallback = (retries, count) => + { + if (retries == 0) Interlocked.Add(ref _items, count); + }; + ExportRetryCallback = _ => Interlocked.Increment(ref _retries); + ExportResponseCallback = (_, _) => Interlocked.Increment(ref _responses); + ExportMaxRetriesCallback = _ => Interlocked.Increment(ref _maxRetriesExceeded); + PublishToInboundChannelCallback = () => Interlocked.Increment(ref _inboundPublishes); + PublishToInboundChannelFailureCallback = () => Interlocked.Increment(ref _inboundPublishFailures); + PublishToOutboundChannelCallback = () => Interlocked.Increment(ref _outboundPublishes); + PublishToOutboundChannelFailureCallback = () => Interlocked.Increment(ref _outboundPublishFailures); + InboundChannelStartedCallback = () => _inboundChannelStarted = true; + OutboundChannelStartedCallback = () => _outboundChannelStarted = true; + OutboundChannelExitedCallback = () => _outboundChannelExited = true; + + ExportExceptionCallback = e => ObservedException ??= e; + ExportRetryableCountCallback = i => _returnedRetryableObjects = true; + } + + /// + /// Keeps track of the first observed exception to calls to + /// + public Exception? ObservedException { get; private set; } + + /// Indicates if the overall publishing was successful + public bool PublishSuccess => !_returnedRetryableObjects && ObservedException == null && _exportedBuffers > 0 && _maxRetriesExceeded == 0 && _items > 0; + + /// + public Action? ExportExceptionCallback { get; } + + /// + public Action? ExportItemsAttemptCallback { get; } + + /// + public Action>? ExportMaxRetriesCallback { get; } + + /// + public Action>? ExportRetryCallback { get; } + + /// + public Action? ExportResponseCallback { get; } + + /// + public Action? ExportBufferCallback { get; } + + /// + public Action? ExportRetryableCountCallback { get; set; } + + /// + public Action? PublishToInboundChannelCallback { get; } + + /// + public Action? PublishToInboundChannelFailureCallback { get; } + + /// + public Action? PublishToOutboundChannelCallback { get; } + + /// + public Action? OutboundChannelStartedCallback { get; } + + /// + public Action? OutboundChannelExitedCallback { get; } + + /// + public Action? InboundChannelStartedCallback { get; } + + /// + public Action? PublishToOutboundChannelFailureCallback { get; } + + /// + /// Provides a debug message to give insights to the machinery of + /// + public override string ToString() => + $@"{(!PublishSuccess ? "Failed" : "Successful")} publish over channel: {_name ?? "NAME NOT PROVIDED"}. +Exported Buffers: {_exportedBuffers:N0} +Exported Items: {_items:N0} +Export Responses: {_responses:N0} +Export Retries: {_retries:N0} +Export Exhausts: {_maxRetriesExceeded:N0} +Export Returned Items to retry: {_returnedRetryableObjects} +Inbound Buffer Read Loop Started: {_inboundChannelStarted} +Inbound Buffer Publishes: {_inboundPublishes:N0} +Inbound Buffer Publish Failures: {_inboundPublishFailures:N0} +Outbound Buffer Read Loop Started: {_outboundChannelStarted} +Outbound Buffer Read Loop Exited: {_outboundChannelExited} +Outbound Buffer Publishes: {_outboundPublishes:N0} +Outbound Buffer Publish Failures: {_outboundPublishFailures:N0} +Exception: {(ObservedException != null ? ObservedException.ToString() : "None")} +"; + +} diff --git a/src/Elastic.Channels/Diagnostics/ChannelListener.cs b/src/Elastic.Channels/Diagnostics/ChannelListener.cs deleted file mode 100644 index 8865e93..0000000 --- a/src/Elastic.Channels/Diagnostics/ChannelListener.cs +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Threading; - -namespace Elastic.Channels.Diagnostics; - -/// -/// A very rudimentary diagnostics object tracking various important metrics to provide insights into the -/// machinery of . -/// This will be soon be replaced by actual metrics -/// -public class ChannelListener -{ - private readonly string? _name; - private int _exportedBuffers; - - /// - /// Keeps track of the first observed exception to calls to - /// - public Exception? ObservedException { get; private set; } - - /// Indicates if the overall publishing was successful - public virtual bool PublishSuccess => ObservedException == null && _exportedBuffers > 0 && _maxRetriesExceeded == 0 && _items > 0; - - /// - public ChannelListener(string? name = null) => _name = name; - - private long _responses; - private long _retries; - private long _items; - private long _maxRetriesExceeded; - private long _outboundPublishes; - private long _outboundPublishFailures; - private long _inboundPublishes; - private long _inboundPublishFailures; - private bool _outboundChannelStarted; - private bool _inboundChannelStarted; - private bool _outboundChannelExited; - - // ReSharper disable once MemberCanBeProtected.Global - /// - /// Registers callbacks on to keep track metrics. - /// - public ChannelListener Register(ChannelOptionsBase options) - { - options.BufferOptions.ExportBufferCallback = () => Interlocked.Increment(ref _exportedBuffers); - options.ExportItemsAttemptCallback = (retries, count) => - { - if (retries == 0) Interlocked.Add(ref _items, count); - }; - options.ExportRetryCallback = _ => Interlocked.Increment(ref _retries); - options.ExportResponseCallback = (_, _) => Interlocked.Increment(ref _responses); - options.ExportMaxRetriesCallback = _ => Interlocked.Increment(ref _maxRetriesExceeded); - options.PublishToInboundChannelCallback = () => Interlocked.Increment(ref _inboundPublishes); - options.PublishToInboundChannelFailureCallback = () => Interlocked.Increment(ref _inboundPublishFailures); - options.PublishToOutboundChannelCallback = () => Interlocked.Increment(ref _outboundPublishes); - options.PublishToOutboundChannelFailureCallback = () => Interlocked.Increment(ref _outboundPublishFailures); - options.InboundChannelStartedCallback = () => _inboundChannelStarted = true; - options.OutboundChannelStartedCallback = () => _outboundChannelStarted = true; - options.OutboundChannelExitedCallback = () => _outboundChannelExited = true; - - if (options.ExportExceptionCallback == null) options.ExportExceptionCallback = e => ObservedException ??= e; - else options.ExportExceptionCallback += e => ObservedException ??= e; - return this; - } - - /// - /// Allows subclasses to include more data in the implementation before the exception is printed - /// - protected virtual string AdditionalData => string.Empty; - - /// - /// Provides a debug message to give insights to the machinery of - /// - public override string ToString() => $@"{(!PublishSuccess ? "Failed" : "Successful")} publish over channel: {_name ?? nameof(ChannelListener)}. -Exported Buffers: {_exportedBuffers:N0} -Exported Items: {_items:N0} -Export Responses: {_responses:N0} -Export Retries: {_retries:N0} -Export Exhausts: {_maxRetriesExceeded:N0} -Inbound Buffer Read Loop Started: {_inboundChannelStarted} -Inbound Buffer Publishes: {_inboundPublishes:N0} -Inbound Buffer Publish Failures: {_inboundPublishFailures:N0} -Outbound Buffer Read Loop Started: {_outboundChannelStarted} -Outbound Buffer Read Loop Exited: {_outboundChannelExited} -Outbound Buffer Publishes: {_outboundPublishes:N0} -Outbound Buffer Publish Failures: {_outboundPublishFailures:N0} -{AdditionalData} -Exception: {ObservedException} -"; -} diff --git a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs index 6546cc1..bf72c69 100644 --- a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs @@ -11,31 +11,10 @@ namespace Elastic.Channels.Diagnostics; /// public class DiagnosticsBufferedChannel : NoopBufferedChannel { - private readonly string? _name; - /// public DiagnosticsBufferedChannel(BufferOptions options, bool observeConcurrency = false, string? name = null) - : base(options, observeConcurrency) + : base(options, new [] { new ChannelDiagnosticsListener(name ?? nameof(DiagnosticsBufferedChannel)) }, observeConcurrency) { - _name = name; - Listener = new ChannelListener(_name).Register(Options); } - /// - // ReSharper disable once MemberCanBePrivate.Global - public ChannelListener Listener { get; } - - /// - /// Provides a debug message to give insights to the machinery of - /// - public override string ToString() => $@"------------------------------------------ -{Listener} - -InboundBuffer Count: {InboundBuffer.Count:N0} -InboundBuffer Duration Since First Wait: {InboundBuffer.DurationSinceFirstWaitToRead} -InboundBuffer Duration Since First Write: {InboundBuffer.DurationSinceFirstWrite} -InboundBuffer No Thresholds hit: {InboundBuffer.NoThresholdsHit} -Exported Buffers: {ExportedBuffers:N0} -Observed Concurrency: {ObservedConcurrency:N0} -------------------------------------------"; } diff --git a/src/Elastic.Channels/Diagnostics/IChannelCallbacks.cs b/src/Elastic.Channels/Diagnostics/IChannelCallbacks.cs new file mode 100644 index 0000000..f52eed7 --- /dev/null +++ b/src/Elastic.Channels/Diagnostics/IChannelCallbacks.cs @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using Elastic.Channels.Buffers; + +namespace Elastic.Channels.Diagnostics; + +/// +/// A set of callbacks that implementers can inject into the channel without fear of overwriting the callbacks +/// defined in +/// +public interface IChannelCallbacks +{ + /// Called if the call to throws. + Action? ExportExceptionCallback { get; } + + /// Called with (number of retries) (number of items to be exported) + Action? ExportItemsAttemptCallback { get; } + + /// Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of + Action>? ExportMaxRetriesCallback { get; } + + /// Subscribe to be notified of events that are retryable but did not store correctly within the number of configured + Action>? ExportRetryCallback { get; } + + /// A generic hook to be notified of any bulk request being initiated by + Action? ExportResponseCallback { get; } + + /// Called everytime an event is written to the inbound channel + Action? PublishToInboundChannelCallback { get; } + + /// Called everytime an event is not written to the inbound channel + Action? PublishToInboundChannelFailureCallback { get; } + + /// Called everytime the inbound channel publishes to the outbound channel. + Action? PublishToOutboundChannelCallback { get; } + + /// Called when the thread to read the outbound channel is started + Action? OutboundChannelStartedCallback { get; } + + /// Called when the thread to read the outbound channel has exited + Action? OutboundChannelExitedCallback { get; } + + /// Called when the thread to read the inbound channel has started + Action? InboundChannelStartedCallback { get; } + + /// Called everytime the inbound channel fails to publish to the outbound channel. + Action? PublishToOutboundChannelFailureCallback { get; } + + /// + /// Called once after a buffer has been flushed, if the buffer is retried this callback is only called once + /// all retries have been exhausted. Its called regardless of whether the call to + /// succeeded. + /// + Action? ExportBufferCallback { get; } + + /// + /// Called once if an export to external system returned items that needed retries + /// Unlike this is called even if no retries will be attempted + ///due to being 0. + /// + /// + public Action? ExportRetryableCountCallback { get; } +} + +internal class ChannelCallbackInvoker : IChannelCallbacks +{ + public ChannelCallbackInvoker(ICollection> channelCallbacks) + { + ExportExceptionCallback = channelCallbacks + .Select(e => e.ExportExceptionCallback) + .Where(e => e != null) + .Aggregate(ExportExceptionCallback, (s, f) => s + f); + + ExportItemsAttemptCallback = channelCallbacks + .Select(e => e.ExportItemsAttemptCallback) + .Where(e => e != null) + .Aggregate(ExportItemsAttemptCallback, (s, f) => s + f); + + ExportMaxRetriesCallback = channelCallbacks + .Select(e => e.ExportMaxRetriesCallback) + .Where(e => e != null) + .Aggregate(ExportMaxRetriesCallback, (s, f) => s + f); + + ExportRetryCallback = channelCallbacks + .Select(e => e.ExportRetryCallback) + .Where(e => e != null) + .Aggregate(ExportRetryCallback, (s, f) => s + f); + + ExportResponseCallback = channelCallbacks + .Select(e => e.ExportResponseCallback) + .Where(e => e != null) + .Aggregate(ExportResponseCallback, (s, f) => s + f); + + ExportBufferCallback = channelCallbacks + .Select(e => e.ExportBufferCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + ExportRetryableCountCallback = channelCallbacks + .Select(e => e.ExportRetryableCountCallback) + .Where(e => e != null) + .Aggregate(ExportRetryableCountCallback, (s, f) => s + f); + + + PublishToInboundChannelCallback = channelCallbacks + .Select(e => e.PublishToInboundChannelCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + PublishToInboundChannelFailureCallback = channelCallbacks + .Select(e => e.PublishToInboundChannelFailureCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + PublishToOutboundChannelCallback = channelCallbacks + .Select(e => e.PublishToOutboundChannelCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + OutboundChannelStartedCallback = channelCallbacks + .Select(e => e.OutboundChannelStartedCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + OutboundChannelExitedCallback = channelCallbacks + .Select(e => e.OutboundChannelExitedCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + InboundChannelStartedCallback = channelCallbacks + .Select(e => e.InboundChannelStartedCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + + PublishToOutboundChannelFailureCallback = channelCallbacks + .Select(e => e.PublishToOutboundChannelFailureCallback) + .Where(e => e != null) + .Aggregate((Action?)null, (s, f) => s + f); + } + + /// + public Action? ExportExceptionCallback { get; set; } + + /// + public Action? ExportItemsAttemptCallback { get; set; } + + /// + public Action>? ExportMaxRetriesCallback { get; set; } + + /// + public Action>? ExportRetryCallback { get; set; } + + /// + public Action? ExportResponseCallback { get; set; } + + /// + public Action? ExportBufferCallback { get; set; } + + /// + public Action? PublishToInboundChannelCallback { get; set; } + + /// + public Action? PublishToInboundChannelFailureCallback { get; set; } + + /// + public Action? PublishToOutboundChannelCallback { get; set; } + + /// + public Action? OutboundChannelStartedCallback { get; set; } + + /// + public Action? OutboundChannelExitedCallback { get; set; } + + /// + public Action? InboundChannelStartedCallback { get; set; } + + /// + public Action? PublishToOutboundChannelFailureCallback { get; set; } + + /// + public Action? ExportRetryableCountCallback { get; set; } +} diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index 7d26638..ad3d792 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -31,10 +31,26 @@ public class NoopChannelOptions : ChannelOptionsBase } /// - public NoopBufferedChannel(BufferOptions options, bool observeConcurrency = false) : base(new NoopChannelOptions + public NoopBufferedChannel(NoopChannelOptions options) : base(options) { } + + /// + public NoopBufferedChannel( + BufferOptions options, + bool observeConcurrency = false + ) : this(options, null, observeConcurrency) { - BufferOptions = options, TrackConcurrency = observeConcurrency - }) { } + + } + + /// + public NoopBufferedChannel( + BufferOptions options, + ICollection>? channelListeners, + bool observeConcurrency = false + ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners) + { + + } /// Returns the number of times was called public long ExportedBuffers => _exportedBuffers; @@ -58,4 +74,17 @@ protected override async Task Export(IReadOnlyCollection ObservedConcurrency) ObservedConcurrency = max; return new NoopResponse(); } + + /// + /// Provides a debug message to give insights to the machinery of + /// + public override string ToString() => $@"------------------------------------------ +{base.ToString()} + +InboundBuffer Count: {InboundBuffer.Count:N0} +InboundBuffer Duration Since First Wait: {InboundBuffer.DurationSinceFirstWaitToRead} +InboundBuffer Duration Since First Write: {InboundBuffer.DurationSinceFirstWrite} +InboundBuffer No Thresholds hit: {InboundBuffer.NoThresholdsHit} +Observed Concurrency: {ObservedConcurrency:N0} +------------------------------------------"; } diff --git a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs index fe9da6d..d778b70 100644 --- a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs +++ b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Linq; using Elastic.Channels.Buffers; +using Elastic.Channels.Diagnostics; namespace Elastic.Channels; @@ -31,7 +32,12 @@ public abstract class ResponseItemsBufferedChannelBase - protected ResponseItemsBufferedChannelBase(TChannelOptions options) : base(options) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) + : base(options, callbackListeners) { } + + /// + protected ResponseItemsBufferedChannelBase(TChannelOptions options) + : base(options) { } /// Based on should return a bool indicating if retry is needed protected abstract bool Retry(TResponse response); diff --git a/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs b/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs deleted file mode 100644 index 74539e6..0000000 --- a/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System.Linq; -using System.Threading; -using Elastic.Channels; -using Elastic.Channels.Diagnostics; -using Elastic.Ingest.Elasticsearch.Serialization; - -namespace Elastic.Ingest.Elasticsearch.Diagnostics; - -/// -/// A very rudimentary diagnostics object tracking various important metrics to provide insights into the -/// machinery of . -/// This will be soon be replaced by actual metrics -/// -// ReSharper disable once UnusedType.Global -public class ElasticsearchChannelListener : ChannelListener -{ - private int _rejectedItems; - private string? _firstItemError; - private int _serverRejections; - - /// - public override bool PublishSuccess => base.PublishSuccess && string.IsNullOrEmpty(_firstItemError); - - // ReSharper disable once UnusedMember.Global - /// - public ElasticsearchChannelListener Register(ResponseItemsChannelOptionsBase options) - { - base.Register(options); - - options.ServerRejectionCallback = r => - { - Interlocked.Add(ref _rejectedItems, r.Count); - if (r.Count > 0) - { - var error = r.Select(e => e.Item2).FirstOrDefault(i=>i.Error != null); - if (error != null) - _firstItemError ??= error.Error?.ToString(); - } - Interlocked.Increment(ref _serverRejections); - }; - return this; - } - - /// - protected override string AdditionalData => $@"Server Rejected Calls: {_serverRejections:N0} -Server Rejected Items: {_rejectedItems:N0} -First Error: {_firstItemError} -"; -} diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index be042c7..73187fb 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; @@ -27,7 +28,12 @@ public abstract partial class ElasticsearchChannelBase where TChannelOptions : TransportChannelOptionsBase { /// - protected ElasticsearchChannelBase(TChannelOptions options) : base(options) { } + protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners) + : base(options, callbackListeners) { } + + /// + protected ElasticsearchChannelBase(TChannelOptions options) + : base(options) { } /// protected override bool Retry(BulkResponse response) diff --git a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs index 6551947..64081a4 100644 --- a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs +++ b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs @@ -12,6 +12,7 @@ using OpenTelemetry.Exporter; using OpenTelemetry.Resources; using Elastic.Channels; +using Elastic.Channels.Diagnostics; namespace Elastic.Ingest.OpenTelemetry { @@ -59,7 +60,11 @@ public class TraceExportResult public class TraceChannel : BufferedChannelBase { /// - public TraceChannel(TraceChannelOptions options) : base(options) { + public TraceChannel(TraceChannelOptions options) : this(options, null) { } + + /// + public TraceChannel(TraceChannelOptions options, ICollection>? callbackListeners) + : base(options, callbackListeners) { var o = new OtlpExporterOptions(); o.Endpoint = options.Endpoint; o.Headers = $"Authorization=Bearer {options.SecretToken}"; diff --git a/src/Elastic.Ingest.Transport/TransportChannelBase.cs b/src/Elastic.Ingest.Transport/TransportChannelBase.cs index 970f23e..29538c7 100644 --- a/src/Elastic.Ingest.Transport/TransportChannelBase.cs +++ b/src/Elastic.Ingest.Transport/TransportChannelBase.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Transport; namespace Elastic.Ingest.Transport @@ -22,7 +23,12 @@ public abstract class TransportChannelBase - protected TransportChannelBase(TChannelOptions options) : base(options) { } + protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners) + : base(options, callbackListeners) { } + + /// + protected TransportChannelBase(TChannelOptions options) + : base(options) { } /// Implement sending the current of the buffer to the output. /// diff --git a/tests/Elastic.Channels.Tests/TroubleshootTests.cs b/tests/Elastic.Channels.Tests/TroubleshootTests.cs new file mode 100644 index 0000000..cf9d6f2 --- /dev/null +++ b/tests/Elastic.Channels.Tests/TroubleshootTests.cs @@ -0,0 +1,93 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Channels.Tests +{ + public class TroubleshootTests : IDisposable + { + public TroubleshootTests(ITestOutputHelper testOutput) => XunitContext.Register(testOutput); + void IDisposable.Dispose() => XunitContext.Flush(); + + [Fact] public async Task CanDisableDiagnostics() + { + var (totalEvents, expectedSentBuffers, bufferOptions) = Setup(); + var channel = new NoopBufferedChannel(new NoopBufferedChannel.NoopChannelOptions() + { + DisableDiagnostics = true, + BufferOptions = bufferOptions + }); + + await WriteExpectedEvents(totalEvents, channel, bufferOptions, expectedSentBuffers); + + channel.ToString().Should().Contain("Diagnostics.NoopBufferedChannel"); + channel.ToString().Should().NotContain("Successful publish over channel: NoopBufferedChannel."); + channel.ToString().Should().NotContain($"Exported Buffers: {expectedSentBuffers:N0}"); + } + + [Fact] public async Task DefaultIncludesDiagnostics() + { + var (totalEvents, expectedSentBuffers, bufferOptions) = Setup(); + var channel = new NoopBufferedChannel(new NoopBufferedChannel.NoopChannelOptions() + { + BufferOptions = bufferOptions + }); + + await WriteExpectedEvents(totalEvents, channel, bufferOptions, expectedSentBuffers); + + channel.ToString().Should().NotContain("Diagnostics.NoopBufferedChannel"); + channel.ToString().Should().Contain("Successful publish over channel: NoopBufferedChannel."); + channel.ToString().Should().Contain($"Exported Buffers:"); + } + + [Fact] public async Task DiagnosticsChannelAlwaysIncludesDiagnosticsInToString() + { + var (totalEvents, expectedSentBuffers, bufferOptions) = Setup(); + var channel = new DiagnosticsBufferedChannel(bufferOptions); + + await WriteExpectedEvents(totalEvents, channel, bufferOptions, expectedSentBuffers); + + channel.ToString().Should().NotContain("Diagnostics.DiagnosticsBufferedChannel"); + channel.ToString().Should().Contain("Successful publish over channel: DiagnosticsBufferedChannel."); + channel.ToString().Should().Contain($"Exported Buffers: {expectedSentBuffers:N0}"); + } + + private static async Task WriteExpectedEvents(int totalEvents, NoopBufferedChannel channel, BufferOptions bufferOptions, int expectedSentBuffers) + { + var written = 0; + for (var i = 0; i < totalEvents; i++) + { + var e = new NoopBufferedChannel.NoopEvent(); + if (await channel.WaitToWriteAsync(e)) + written++; + } + var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("The channel was not drained in the expected time"); + written.Should().Be(totalEvents); + channel.ExportedBuffers.Should().Be(expectedSentBuffers); + } + + private static (int totalEvents, int expectedSentBuffers, BufferOptions bufferOptions) Setup() + { + int totalEvents = 5000, maxInFlight = totalEvents / 5, bufferSize = maxInFlight / 10; + var expectedSentBuffers = totalEvents / bufferSize; + var bufferOptions = new BufferOptions + { + WaitHandle = new CountdownEvent(expectedSentBuffers), + InboundBufferMaxSize = maxInFlight, + OutboundBufferMaxSize = bufferSize, + }; + return (totalEvents, expectedSentBuffers, bufferOptions); + } + + } +} diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs index 3bd1abc..5cde24e 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs @@ -42,7 +42,7 @@ public async Task EnsureDocumentsEndUpInDataStream() channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception("document was not persisted within 10 seconds"); + throw new Exception($"document was not persisted within 10 seconds: {channel}"); var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs index 1aad114..859f88c 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs @@ -47,7 +47,7 @@ public async Task EnsureDocumentsEndUpInIndex() channel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception("ecs document was not persisted within 10 seconds"); + throw new Exception($"ecs document was not persisted within 10 seconds: {channel}"); var refreshResult = await Client.Indices.RefreshAsync(indexName); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs index b75ffa5..89de9b7 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs @@ -16,7 +16,7 @@ namespace Elastic.Ingest.Elasticsearch.IntegrationTests /// Declare our cluster that we want to inject into our test classes public class IngestionCluster : XunitClusterBase { - public IngestionCluster() : base(new XunitClusterConfiguration("8.3.1") { StartingPortNumber = 9202 }) { } + public IngestionCluster() : base(new XunitClusterConfiguration("8.7.0") { StartingPortNumber = 9202 }) { } public ElasticsearchClient CreateClient(ITestOutputHelper output) => this.GetOrAddClient(_ => @@ -28,6 +28,7 @@ public ElasticsearchClient CreateClient(ITestOutputHelper output) => var connectionPool = new StaticNodePool(nodes); var settings = new ElasticsearchClientSettings(connectionPool) .Proxy(new Uri("http://ipv4.fiddler:8080"), null!, null!) + .RequestTimeout(TimeSpan.FromSeconds(5)) .OnRequestCompleted(d => { try { output.WriteLine(d.DebugInformation);}