Skip to content

Commit

Permalink
Refactor Diagnostics (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Apr 5, 2023
1 parent 1d86925 commit c073cb9
Showing 19 changed files with 555 additions and 219 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ jobs:
name: Test

build:
runs-on: ubuntu-18.04
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
2 changes: 1 addition & 1 deletion examples/Elastic.Ingest.Apm.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -47,7 +47,6 @@ private static int Main(string[] args)
WaitHandle = handle,
ExportMaxRetries = 3,
ExportBackoffPeriod = times => TimeSpan.FromMilliseconds(1),
ExportBufferCallback = () => Console.WriteLine("Flushed"),
};
var channelOptions = new ApmChannelOptions(transport)
{
@@ -59,6 +58,7 @@ private static int Main(string[] args)
Interlocked.Increment(ref _responses);
Console.WriteLine(r.ApiCallDetails.DebugInformation);
},
ExportBufferCallback = () => Console.WriteLine("Flushed"),
ExportMaxRetriesCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded),
ExportRetryCallback = (list) => Interlocked.Increment(ref _retries),
ExportExceptionCallback = (e) => _exception = e
7 changes: 0 additions & 7 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
@@ -53,13 +53,6 @@ public class BufferOptions
/// </summary>
public Func<int, TimeSpan> ExportBackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1));

/// <summary>
/// Called once after a buffer has been flushed, if the buffer is retried this callback is only called once
/// all retries have been exhausted. Its called regardless of whether the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>
/// succeeded.
/// </summary>
public Action? ExportBufferCallback { get; set; }

/// <summary>
/// Allows you to inject a <see cref="CountdownEvent"/> to wait for N number of buffers to flush.
/// </summary>
59 changes: 43 additions & 16 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;

namespace Elastic.Channels;

@@ -59,11 +60,31 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
private readonly SemaphoreSlim _throttleTasks;
private readonly CountdownEvent? _signal;

private readonly ChannelCallbackInvoker<TEvent, TResponse> _callbacks;
private readonly IChannelDiagnosticsListener? _diagnosticsListener;

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options)
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
{
TokenSource = new CancellationTokenSource();
Options = options;

var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
_diagnosticsListener = listeners
.Select(l => (l is IChannelDiagnosticsListener c) ? c : null)
.FirstOrDefault(e=> e != null);
if (_diagnosticsListener == null && !options.DisableDiagnostics)
{
// if no debug listener was already provided but was requested explicitly create one.
var l = new ChannelDiagnosticsListener<TEvent, TResponse>(GetType().Name);
_diagnosticsListener = l;
listeners = listeners.Concat(new[] { l }).ToArray();
}
_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency);
_throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers);
_signal = options.BufferOptions.WaitHandle;
@@ -133,10 +154,10 @@ public override bool TryWrite(TEvent item)
{
if (InChannel.Writer.TryWrite(item))
{
Options.PublishToInboundChannelCallback?.Invoke();
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}
Options.PublishToInboundChannelFailureCallback?.Invoke();
_callbacks.PublishToInboundChannelFailureCallback?.Invoke();
return false;
}

@@ -164,10 +185,10 @@ public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken
if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) &&
InChannel.Writer.TryWrite(item))
{
Options.PublishToInboundChannelCallback?.Invoke();
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}
Options.PublishToInboundChannelFailureCallback?.Invoke();
_callbacks.PublishToInboundChannelFailureCallback?.Invoke();
return false;
}

@@ -185,7 +206,7 @@ IWriteTrackingBuffer statistics

private async Task ConsumeOutboundEvents()
{
Options.OutboundChannelStartedCallback?.Invoke();
_callbacks.OutboundChannelStartedCallback?.Invoke();

var maxConsumers = Options.BufferOptions.ExportMaxConcurrency;
var taskList = new List<Task>(maxConsumers);
@@ -212,7 +233,7 @@ private async Task ConsumeOutboundEvents()
}
}
await Task.WhenAll(taskList).ConfigureAwait(false);
Options.OutboundChannelExitedCallback?.Invoke();
_callbacks.OutboundChannelExitedCallback?.Invoke();
}

private async Task ExportBuffer(IReadOnlyCollection<TEvent> items, IWriteTrackingBuffer buffer)
@@ -223,40 +244,42 @@ private async Task ExportBuffer(IReadOnlyCollection<TEvent> items, IWriteTrackin
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;

Options.ExportItemsAttemptCallback?.Invoke(i, items.Count);
_callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count);
TResponse? response;
try
{
response = await Export(items, TokenSource.Token).ConfigureAwait(false);
Options.ExportResponseCallback?.Invoke(response, buffer);
_callbacks.ExportResponseCallback?.Invoke(response, buffer);
}
catch (Exception e)
{
Options.ExportExceptionCallback?.Invoke(e);
_callbacks.ExportExceptionCallback?.Invoke(e);
break;
}

items = RetryBuffer(response, items, buffer);
if (items.Count > 0 && i == 0)
_callbacks.ExportRetryableCountCallback?.Invoke(items.Count);

// delay if we still have items and we are not at the end of the max retry cycle
var atEndOfRetries = i == maxRetries;
if (items.Count > 0 && !atEndOfRetries)
{
await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
Options.ExportRetryCallback?.Invoke(items);
_callbacks.ExportRetryCallback?.Invoke(items);
}
// otherwise if retryable items still exist and the user wants to be notified notify the user
else if (items.Count > 0 && atEndOfRetries)
Options.ExportMaxRetriesCallback?.Invoke(items);
_callbacks.ExportMaxRetriesCallback?.Invoke(items);
}
Options.BufferOptions.ExportBufferCallback?.Invoke();
_callbacks.ExportBufferCallback?.Invoke();
if (_signal is { IsSet: false })
_signal.Signal();
}

private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInterval)
{
Options.InboundChannelStartedCallback?.Invoke();
_callbacks.InboundChannelStartedCallback?.Invoke();
while (await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false))
{
if (TokenSource.Token.IsCancellationRequested) break;
@@ -276,9 +299,9 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter
InboundBuffer.Reset();

if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
Options.PublishToOutboundChannelCallback?.Invoke();
_callbacks.PublishToOutboundChannelCallback?.Invoke();
else
Options.PublishToOutboundChannelFailureCallback?.Invoke();
_callbacks.PublishToOutboundChannelFailureCallback?.Invoke();
}
}

@@ -301,6 +324,10 @@ async Task<bool> AsyncSlowPath(IOutboundBuffer<TEvent> b)
: new ValueTask<bool>(AsyncSlowPath(buffer));
}

/// <inheritdoc cref="object.ToString"/>>
public override string ToString() =>
_diagnosticsListener != null ? _diagnosticsListener.ToString() : base.ToString();

/// <inheritdoc cref="IDisposable.Dispose"/>
public virtual void Dispose()
{
45 changes: 30 additions & 15 deletions src/Elastic.Channels/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;

namespace Elastic.Channels
{
@@ -15,50 +17,63 @@ namespace Elastic.Channels
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TResponse"></typeparam>
public abstract class ChannelOptionsBase<TEvent, TResponse>
public abstract class ChannelOptionsBase<TEvent, TResponse> : IChannelCallbacks<TEvent, TResponse>
{
/// <inheritdoc cref="BufferOptions"/>
public BufferOptions BufferOptions { get; set; } = new ();
public BufferOptions BufferOptions { get; set; } = new();


/// <summary>
/// Ensures a <see cref="ChannelDiagnosticsListener{TEvent,TResponse}"/> gets registered so this <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
/// implementation returns diagnostics in its <see cref="object.ToString"/> implementation
/// </summary>
public bool DisableDiagnostics { get; set; }

/// <summary>
/// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config
/// </summary>
public Func<Stream, CancellationToken, TEvent, Task>? WriteEvent { get; set; } = null;

/// <summary> Called if the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> throws. </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportExceptionCallback"/>
public Action<Exception>? ExportExceptionCallback { get; set; }

/// <summary> Called with (number of retries) (number of items to be exported) </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportItemsAttemptCallback"/>
public Action<int, int>? ExportItemsAttemptCallback { get; set; }

/// <summary> Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of <see cref="Channels.BufferOptions.ExportMaxRetries"/></summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportMaxRetriesCallback"/>
public Action<IReadOnlyCollection<TEvent>>? ExportMaxRetriesCallback { get; set; }

/// <summary> Subscribe to be notified of events that are retryable but did not store correctly within the number of configured <see cref="Channels.BufferOptions.ExportMaxRetries"/></summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportRetryCallback"/>
public Action<IReadOnlyCollection<TEvent>>? ExportRetryCallback { get; set; }

/// <summary> A generic hook to be notified of any bulk request being initiated by <see cref="InboundBuffer{TEvent}"/> </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportResponseCallback"/>
public Action<TResponse, IWriteTrackingBuffer>? ExportResponseCallback { get; set; }

/// <summary>Called everytime an event is written to the inbound channel </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportBufferCallback"/>
public Action? ExportBufferCallback { get; set; }

/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.ExportRetryableCountCallback"/>
public Action<int>? ExportRetryableCountCallback { get; set; }

/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.PublishToInboundChannelCallback"/>
public Action? PublishToInboundChannelCallback { get; set; }

/// <summary>Called everytime an event is not written to the inbound channel </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.PublishToInboundChannelFailureCallback"/>
public Action? PublishToInboundChannelFailureCallback { get; set; }

/// <summary>Called everytime the inbound channel publishes to the outbound channel. </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.PublishToOutboundChannelCallback"/>
public Action? PublishToOutboundChannelCallback { get; set; }

/// <summary> Called when the thread to read the outbound channel is started </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.OutboundChannelStartedCallback"/>
public Action? OutboundChannelStartedCallback { get; set; }
/// <summary> Called when the thread to read the outbound channel has exited</summary>

/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.OutboundChannelExitedCallback"/>
public Action? OutboundChannelExitedCallback { get; set; }

/// <summary> Called when the thread to read the inbound channel has started</summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.InboundChannelStartedCallback"/>
public Action? InboundChannelStartedCallback { get; set; }

/// <summary>Called everytime the inbound channel fails to publish to the outbound channel. </summary>
/// <inheritdoc cref="IChannelCallbacks{TEvent,TResponse}.PublishToOutboundChannelFailureCallback"/>
public Action? PublishToOutboundChannelFailureCallback { get; set; }
}

}
Loading

0 comments on commit c073cb9

Please sign in to comment.