Skip to content

Commit

Permalink
feat: refactoring adaptive throttling options
Browse files Browse the repository at this point in the history
  • Loading branch information
kulbachnyi.v committed Dec 18, 2023
1 parent 80b4c5a commit 0c69a04
Showing 4 changed files with 172 additions and 106 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Vostok.Clusterclient.Core.Modules;
using System.Collections.Generic;
using Vostok.Clusterclient.Core.Model;
using Vostok.Clusterclient.Core.Modules;

namespace Vostok.Clusterclient.Core
{
@@ -9,10 +11,10 @@ public static partial class IClusterClientConfigurationExtensions
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="storageKey">See <see cref="AdaptiveThrottlingOptions.StorageKey"/>.</param>
/// <param name="minutesToTrack">See <see cref="AdaptiveThrottlingOptions.MinutesToTrack"/>.</param>
/// <param name="minimumRequests">See <see cref="AdaptiveThrottlingOptions.MinimumRequests"/>.</param>
/// <param name="criticalRatio">See <see cref="AdaptiveThrottlingOptions.CriticalRatio"/>.</param>
/// <param name="maximumRejectProbability">See <see cref="AdaptiveThrottlingOptions.MaximumRejectProbability"/>.</param>
/// <param name="minutesToTrack">See <see cref="AdaptiveThrottlingParameters.MinutesToTrack"/>.</param>
/// <param name="minimumRequests">See <see cref="AdaptiveThrottlingParameters.MinimumRequests"/>.</param>
/// <param name="criticalRatio">See <see cref="AdaptiveThrottlingParameters.CriticalRatio"/>.</param>
/// <param name="maximumRejectProbability">See <see cref="AdaptiveThrottlingParameters.MaximumRejectProbability"/>.</param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
string storageKey,
@@ -30,38 +32,16 @@ public static void SetupAdaptiveThrottling(

configuration.AddRequestModule(new AdaptiveThrottlingModule(options), typeof(AbsoluteUrlSenderModule));
}

/// <summary>
/// Sets up an adaptive client throttling mechanism with given parameters.
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="storageKey">See <see cref="AdaptiveThrottlingOptions.StorageKey"/>.</param>
/// <param name="criticalOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="ordinaryOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="sheddableOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
string storageKey,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
{
var defaultOptions = new AdaptiveThrottlingOptions(storageKey);
criticalOptions ??= defaultOptions;
ordinaryOptions ??= defaultOptions;
sheddableOptions ??= defaultOptions;
configuration.AddRequestModule(new AdaptiveThrottlingModule(storageKey, criticalOptions, ordinaryOptions, sheddableOptions), typeof(AbsoluteUrlSenderModule));
}

/// <summary>
/// <para>Sets up an adaptive client throttling mechanism with given parameters using <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> as a storage key.</para>
/// <para> <b>N.B.</b> Ensure that <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> is set before calling this method.</para>
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="minutesToTrack">See <see cref="AdaptiveThrottlingOptions.MinutesToTrack"/>.</param>
/// <param name="minimumRequests">See <see cref="AdaptiveThrottlingOptions.MinimumRequests"/>.</param>
/// <param name="criticalRatio">See <see cref="AdaptiveThrottlingOptions.CriticalRatio"/>.</param>
/// <param name="maximumRejectProbability">See <see cref="AdaptiveThrottlingOptions.MaximumRejectProbability"/>.</param>
/// <param name="minutesToTrack">See <see cref="AdaptiveThrottlingParameters.MinutesToTrack"/>.</param>
/// <param name="minimumRequests">See <see cref="AdaptiveThrottlingParameters.MinimumRequests"/>.</param>
/// <param name="criticalRatio">See <see cref="AdaptiveThrottlingParameters.CriticalRatio"/>.</param>
/// <param name="maximumRejectProbability">See <see cref="AdaptiveThrottlingParameters.MaximumRejectProbability"/>.</param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
int minutesToTrack = ClusterClientDefaults.AdaptiveThrottlingMinutesToTrack,
@@ -73,25 +53,34 @@ public static void SetupAdaptiveThrottling(

SetupAdaptiveThrottling(configuration, storageKey, minutesToTrack, minimumRequests, criticalRatio, maximumRejectProbability);
}

/// <summary>
/// <para>Sets up an adaptive client throttling mechanism with given parameters using <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> as a storage key.</para>
/// <para> <b>N.B.</b> Ensure that <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> is set before calling this method.</para>
/// Configures default settings by request priority for adaptive client throttling mechanism.
/// </summary>
public static AdaptiveThrottlingOptions ConfigureAdaptiveThrottlingOptions(string storageKey, AdaptiveThrottlingParameters defaultParameters)
{
defaultParameters ??= new AdaptiveThrottlingParameters();

var parameters = new Dictionary<RequestPriority, AdaptiveThrottlingParameters>
{
[RequestPriority.Critical] = defaultParameters,
[RequestPriority.Ordinary] = defaultParameters,
[RequestPriority.Sheddable] = defaultParameters
};

return new AdaptiveThrottlingOptions(storageKey, parameters);
}

/// <summary>
/// Sets up an adaptive client throttling mechanism with given options.
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="storageKey">See <see cref="AdaptiveThrottlingOptions.StorageKey"/>.</param>
/// <param name="criticalOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="ordinaryOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="sheddableOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="options">See <see cref="AdaptiveThrottlingOptions"/> </param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
AdaptiveThrottlingOptions options)
{
var storageKey = GenerateStorageKey(configuration);

SetupAdaptiveThrottling(configuration, storageKey, criticalOptions, ordinaryOptions, sheddableOptions);
configuration.AddRequestModule(new AdaptiveThrottlingModule(options), typeof(AbsoluteUrlSenderModule));
}
}
}
78 changes: 28 additions & 50 deletions Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Vostok.Clusterclient.Core.Misc;
using Vostok.Clusterclient.Core.Model;
using Vostok.Commons.Threading;
using Vostok.Logging.Abstractions;
@@ -16,67 +17,37 @@ namespace Vostok.Clusterclient.Core.Modules
/// </summary>
internal class AdaptiveThrottlingModule : IRequestModule
{
private static readonly RequestPriority DefaultPriority = RequestPriority.Sheddable;
private static readonly ConcurrentDictionary<string, CountersByPriority> Counters = new();
private static readonly Stopwatch Watch = Stopwatch.StartNew();

private readonly Func<string, CountersByPriority> counterFactory;

public AdaptiveThrottlingModule(AdaptiveThrottlingOptions options)
: this(options.StorageKey, options, options, options)
{
}

public AdaptiveThrottlingModule(
string storageKey,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
{
StorageKey = storageKey;
Options = new Dictionary<RequestPriority, AdaptiveThrottlingOptions>
{
{RequestPriority.Critical, criticalOptions},
{RequestPriority.Ordinary, ordinaryOptions},
{RequestPriority.Sheddable, sheddableOptions}
};
counterFactory = _ => new CountersByPriority(
criticalOptions.MinutesToTrack,
ordinaryOptions.MinutesToTrack,
sheddableOptions.MinutesToTrack
);
}

public AdaptiveThrottlingModule(Dictionary<RequestPriority, AdaptiveThrottlingOptions> options)
{
this.Options = options;
counterFactory = _ => new CountersByPriority(
options[RequestPriority.Critical].MinutesToTrack,
options[RequestPriority.Ordinary].MinutesToTrack,
options[RequestPriority.Sheddable].MinutesToTrack
);
StorageKey = options.StorageKey;
counterFactory = _ => new CountersByPriority(options.Parameters);
}

public static void ClearCache()
{
Counters.Clear();
}

public Dictionary<RequestPriority, AdaptiveThrottlingOptions> Options { get; }

public int Requests(RequestPriority? priority) => GetCounter(priority).GetMetrics().Requests;

public int Accepts(RequestPriority? priority) => GetCounter(priority).GetMetrics().Accepts;

public double Ratio(RequestPriority? priority) => ComputeRatio(GetCounter(priority).GetMetrics());

public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), GetOptions(priority));
public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), GetCounter(priority).Parameters);

public string StorageKey { get; }

public async Task<ClusterResult> ExecuteAsync(IRequestContext context, Func<IRequestContext, Task<ClusterResult>> next)
{
var counter = GetCounter(context.Parameters.Priority);
var options = GetOptions(context.Parameters.Priority);
var options = counter.Parameters;
counter.BeginRequest();

ClusterResult result;
@@ -119,7 +90,7 @@ public async Task<ClusterResult> ExecuteAsync(IRequestContext context, Func<IReq
private static double ComputeRatio(CounterMetrics metrics) =>
1.0 * metrics.Requests / Math.Max(1.0, metrics.Accepts);

private static double ComputeRejectionProbability(CounterMetrics metrics, AdaptiveThrottlingOptions options)
private static double ComputeRejectionProbability(CounterMetrics metrics, AdaptiveThrottlingParameters options)
{
var probability = 1.0 * (metrics.Requests - options.CriticalRatio * metrics.Accepts) / (metrics.Requests + 1);

@@ -137,7 +108,7 @@ private static void UpdateCounter(Counter counter, ClusterResult result)

private Counter GetCounter(RequestPriority? priority)
{
priority ??= RequestPriority.Sheddable;
priority ??= DefaultPriority;
var counters = Counters.GetOrAdd(StorageKey, counterFactory);
return priority switch
{
@@ -148,22 +119,24 @@ private Counter GetCounter(RequestPriority? priority)
};
}

private AdaptiveThrottlingOptions GetOptions(RequestPriority? priority) => Options[priority ?? RequestPriority.Sheddable];

#region CountersByPriority

private class CountersByPriority
{
public CountersByPriority(int criticalBuckets, int ordinaryBuckets, int sheddableBuckets)
private readonly Dictionary<RequestPriority, Counter> requestCounters;

public CountersByPriority(Dictionary<RequestPriority, AdaptiveThrottlingParameters> options)
{
CriticalRequestCounter = new Counter(criticalBuckets);
OrdinaryRequestCounter = new Counter(ordinaryBuckets);
SheddableRequestCounter = new Counter(sheddableBuckets);
requestCounters = new Dictionary<RequestPriority, Counter>();
foreach (var (priority, parameters) in options)
{
requestCounters[priority] = new Counter(parameters);
}
}

public Counter CriticalRequestCounter { get; }
public Counter OrdinaryRequestCounter { get; }
public Counter SheddableRequestCounter { get; }
public Counter CriticalRequestCounter => requestCounters[RequestPriority.Critical];
public Counter OrdinaryRequestCounter => requestCounters[RequestPriority.Ordinary];
public Counter SheddableRequestCounter => requestCounters[RequestPriority.Sheddable];
}

#endregion
@@ -193,14 +166,19 @@ private class Counter
private readonly CounterBucket[] buckets;
private int pendingRequests;

public Counter(int buckets)
public Counter(AdaptiveThrottlingParameters parameters)
{
this.buckets = new CounterBucket[buckets];
Parameters = parameters;

var bucketsNumber = parameters.MinutesToTrack;
buckets = new CounterBucket[bucketsNumber];

for (var i = 0; i < buckets; i++)
this.buckets[i] = new CounterBucket();
for (var i = 0; i < bucketsNumber; i++)
buckets[i] = new CounterBucket();
}

public AdaptiveThrottlingParameters Parameters { get; }

public CounterMetrics GetMetrics()
{
var metrics = new CounterMetrics();
Loading

0 comments on commit 0c69a04

Please sign in to comment.