Skip to content

Commit

Permalink
feat: added settings by request priority
Browse files Browse the repository at this point in the history
  • Loading branch information
kulbachnyi.v committed Dec 6, 2023
1 parent 5565035 commit 80b4c5a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Vostok.ClusterClient.Core.Tests/ClusterClient_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void Should_set_different_keys_to_adaptive_throttling_storage()
var secondThrottling = (AdaptiveThrottlingModule)secondConfiguration.Modules.First()
.Value.Before[0];

firstThrottling.Options.StorageKey.Should().NotBe(secondThrottling.Options.StorageKey);
firstThrottling.StorageKey.Should().NotBe(secondThrottling.StorageKey);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ public static void SetupAdaptiveThrottling(

configuration.AddRequestModule(new AdaptiveThrottlingModule(options), typeof(AbsoluteUrlSenderModule));
}

/// <summary>
/// Sets up an adaptive client throttling mechanism with given parameters.
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="storageKey">See <see cref="AdaptiveThrottlingOptions.StorageKey"/>.</param>
/// <param name="criticalOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="ordinaryOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="sheddableOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
string storageKey,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
{
var defaultOptions = new AdaptiveThrottlingOptions(storageKey);
criticalOptions ??= defaultOptions;
ordinaryOptions ??= defaultOptions;
sheddableOptions ??= defaultOptions;
configuration.AddRequestModule(new AdaptiveThrottlingModule(storageKey, criticalOptions, ordinaryOptions, sheddableOptions), typeof(AbsoluteUrlSenderModule));
}

/// <summary>
/// <para>Sets up an adaptive client throttling mechanism with given parameters using <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> as a storage key.</para>
Expand All @@ -51,5 +73,25 @@ public static void SetupAdaptiveThrottling(

SetupAdaptiveThrottling(configuration, storageKey, minutesToTrack, minimumRequests, criticalRatio, maximumRejectProbability);
}

/// <summary>
/// <para>Sets up an adaptive client throttling mechanism with given parameters using <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> as a storage key.</para>
/// <para> <b>N.B.</b> Ensure that <see cref="IClusterClientConfiguration.TargetServiceName"/> and <see cref="IClusterClientConfiguration.TargetEnvironment"/> is set before calling this method.</para>
/// </summary>
/// <param name="configuration">A configuration to be modified.</param>
/// <param name="storageKey">See <see cref="AdaptiveThrottlingOptions.StorageKey"/>.</param>

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (ubuntu-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (macos-12, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, cement)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name

Check warning on line 82 in Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs

View workflow job for this annotation

GitHub Actions / build / build (windows-latest, nuget)

XML comment has a param tag for 'storageKey', but there is no parameter by that name
/// <param name="criticalOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="ordinaryOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
/// <param name="sheddableOptions">See <see cref="AdaptiveThrottlingOptions"/>.</param>
public static void SetupAdaptiveThrottling(
this IClusterClientConfiguration configuration,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
{
var storageKey = GenerateStorageKey(configuration);

SetupAdaptiveThrottling(configuration, storageKey, criticalOptions, ordinaryOptions, sheddableOptions);
}
}
}
72 changes: 51 additions & 21 deletions Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
Expand All @@ -21,30 +22,61 @@ internal class AdaptiveThrottlingModule : IRequestModule
private readonly Func<string, CountersByPriority> counterFactory;

public AdaptiveThrottlingModule(AdaptiveThrottlingOptions options)
: this(options.StorageKey, options, options, options)
{
}

public AdaptiveThrottlingModule(
string storageKey,
AdaptiveThrottlingOptions criticalOptions,
AdaptiveThrottlingOptions ordinaryOptions,
AdaptiveThrottlingOptions sheddableOptions)
{
StorageKey = storageKey;
Options = new Dictionary<RequestPriority, AdaptiveThrottlingOptions>
{
{RequestPriority.Critical, criticalOptions},
{RequestPriority.Ordinary, ordinaryOptions},
{RequestPriority.Sheddable, sheddableOptions}
};
counterFactory = _ => new CountersByPriority(
criticalOptions.MinutesToTrack,
ordinaryOptions.MinutesToTrack,
sheddableOptions.MinutesToTrack
);
}

public AdaptiveThrottlingModule(Dictionary<RequestPriority, AdaptiveThrottlingOptions> options)
{
this.Options = options;
counterFactory = _ => new CountersByPriority(options.MinutesToTrack);
counterFactory = _ => new CountersByPriority(
options[RequestPriority.Critical].MinutesToTrack,
options[RequestPriority.Ordinary].MinutesToTrack,
options[RequestPriority.Sheddable].MinutesToTrack
);
}

public static void ClearCache()
{
Counters.Clear();
}

public AdaptiveThrottlingOptions Options { get; }
public Dictionary<RequestPriority, AdaptiveThrottlingOptions> Options { get; }

public int Requests(RequestPriority? priority) => GetCounter(priority).GetMetrics().Requests;

public int Accepts(RequestPriority? priority) => GetCounter(priority).GetMetrics().Accepts;

public double Ratio(RequestPriority? priority) => ComputeRatio(GetCounter(priority).GetMetrics());

public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), Options);
public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), GetOptions(priority));

public string StorageKey { get; }

public async Task<ClusterResult> ExecuteAsync(IRequestContext context, Func<IRequestContext, Task<ClusterResult>> next)
{
var counter = GetCounter(context.Parameters.Priority);

var options = GetOptions(context.Parameters.Priority);
counter.BeginRequest();

ClusterResult result;
Expand All @@ -57,9 +89,9 @@ public async Task<ClusterResult> ExecuteAsync(IRequestContext context, Func<IReq
double rejectionProbability;

var metrics = counter.GetMetrics();
if (metrics.Requests >= Options.MinimumRequests &&
(ratio = ComputeRatio(metrics)) >= Options.CriticalRatio &&
(rejectionProbability = ComputeRejectionProbability(metrics, Options)) > ThreadSafeRandom.NextDouble())
if (metrics.Requests >= options.MinimumRequests &&
(ratio = ComputeRatio(metrics)) >= options.CriticalRatio &&
(rejectionProbability = ComputeRejectionProbability(metrics, options)) > ThreadSafeRandom.NextDouble())
{
LogThrottledRequest(context, ratio, rejectionProbability);

Expand Down Expand Up @@ -106,7 +138,7 @@ private static void UpdateCounter(Counter counter, ClusterResult result)
private Counter GetCounter(RequestPriority? priority)
{
priority ??= RequestPriority.Sheddable;
var counters = Counters.GetOrAdd(Options.StorageKey, counterFactory);
var counters = Counters.GetOrAdd(StorageKey, counterFactory);
return priority switch
{
RequestPriority.Critical => counters.CriticalRequestCounter,
Expand All @@ -116,28 +148,26 @@ private Counter GetCounter(RequestPriority? priority)
};
}

private AdaptiveThrottlingOptions GetOptions(RequestPriority? priority) => Options[priority ?? RequestPriority.Sheddable];

#region CountersByPriority

private class CountersByPriority
{
private readonly Counter criticalRequestCounter;
private readonly Counter ordinaryRequestCounter;
private readonly Counter sheddableRequestCounter;

public CountersByPriority(int buckets)
public CountersByPriority(int criticalBuckets, int ordinaryBuckets, int sheddableBuckets)
{
criticalRequestCounter = new Counter(buckets);
ordinaryRequestCounter = new Counter(buckets);
sheddableRequestCounter = new Counter(buckets);
CriticalRequestCounter = new Counter(criticalBuckets);
OrdinaryRequestCounter = new Counter(ordinaryBuckets);
SheddableRequestCounter = new Counter(sheddableBuckets);
}

public Counter CriticalRequestCounter => criticalRequestCounter;
public Counter OrdinaryRequestCounter => ordinaryRequestCounter;
public Counter SheddableRequestCounter => sheddableRequestCounter;
public Counter CriticalRequestCounter { get; }
public Counter OrdinaryRequestCounter { get; }
public Counter SheddableRequestCounter { get; }
}

#endregion

#region Logging

private void LogThrottledRequest(IRequestContext context, double ratio, double rejectionProbability) =>
Expand Down Expand Up @@ -198,7 +228,7 @@ public CounterMetrics GetMetrics()

public void AddAccept() => Interlocked.Increment(ref ObtainBucket().Accepts);

private static int GetCurrentMinute() => (int) Math.Floor(Watch.Elapsed.TotalMinutes);
private static int GetCurrentMinute() => (int)Math.Floor(Watch.Elapsed.TotalMinutes);

private CounterBucket ObtainBucket()
{
Expand Down
2 changes: 2 additions & 0 deletions Vostok.ClusterClient.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
static Vostok.Clusterclient.Core.IClusterClientConfigurationExtensions.SetupAdaptiveThrottling(this Vostok.Clusterclient.Core.IClusterClientConfiguration configuration, string storageKey, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions criticalOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions ordinaryOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions sheddableOptions) -> void
static Vostok.Clusterclient.Core.IClusterClientConfigurationExtensions.SetupAdaptiveThrottling(this Vostok.Clusterclient.Core.IClusterClientConfiguration configuration, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions criticalOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions ordinaryOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions sheddableOptions) -> void

0 comments on commit 80b4c5a

Please sign in to comment.