From 80b4c5a51cf170d83d63844e22d2221e21fe3881 Mon Sep 17 00:00:00 2001 From: "kulbachnyi.v" Date: Wed, 6 Dec 2023 09:29:43 +0500 Subject: [PATCH] feat: added settings by request priority --- .../ClusterClient_Tests.cs | 2 +- ...lientConfigurationExtensions_Throttling.cs | 42 +++++++++++ .../Modules/AdaptiveThrottlingModule.cs | 72 +++++++++++++------ .../PublicAPI.Unshipped.txt | 2 + 4 files changed, 96 insertions(+), 22 deletions(-) diff --git a/Vostok.ClusterClient.Core.Tests/ClusterClient_Tests.cs b/Vostok.ClusterClient.Core.Tests/ClusterClient_Tests.cs index 1fb5aac2..cd4d5abb 100644 --- a/Vostok.ClusterClient.Core.Tests/ClusterClient_Tests.cs +++ b/Vostok.ClusterClient.Core.Tests/ClusterClient_Tests.cs @@ -162,7 +162,7 @@ public void Should_set_different_keys_to_adaptive_throttling_storage() var secondThrottling = (AdaptiveThrottlingModule)secondConfiguration.Modules.First() .Value.Before[0]; - firstThrottling.Options.StorageKey.Should().NotBe(secondThrottling.Options.StorageKey); + firstThrottling.StorageKey.Should().NotBe(secondThrottling.StorageKey); } [Test] diff --git a/Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs b/Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs index b92296c2..8fcd8291 100644 --- a/Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs +++ b/Vostok.ClusterClient.Core/IClusterClientConfigurationExtensions_Throttling.cs @@ -30,6 +30,28 @@ public static void SetupAdaptiveThrottling( configuration.AddRequestModule(new AdaptiveThrottlingModule(options), typeof(AbsoluteUrlSenderModule)); } + + /// + /// Sets up an adaptive client throttling mechanism with given parameters. + /// + /// A configuration to be modified. + /// See . + /// See . + /// See . + /// See . + public static void SetupAdaptiveThrottling( + this IClusterClientConfiguration configuration, + string storageKey, + AdaptiveThrottlingOptions criticalOptions, + AdaptiveThrottlingOptions ordinaryOptions, + AdaptiveThrottlingOptions sheddableOptions) + { + var defaultOptions = new AdaptiveThrottlingOptions(storageKey); + criticalOptions ??= defaultOptions; + ordinaryOptions ??= defaultOptions; + sheddableOptions ??= defaultOptions; + configuration.AddRequestModule(new AdaptiveThrottlingModule(storageKey, criticalOptions, ordinaryOptions, sheddableOptions), typeof(AbsoluteUrlSenderModule)); + } /// /// Sets up an adaptive client throttling mechanism with given parameters using and as a storage key. @@ -51,5 +73,25 @@ public static void SetupAdaptiveThrottling( SetupAdaptiveThrottling(configuration, storageKey, minutesToTrack, minimumRequests, criticalRatio, maximumRejectProbability); } + + /// + /// Sets up an adaptive client throttling mechanism with given parameters using and as a storage key. + /// N.B. Ensure that and is set before calling this method. + /// + /// A configuration to be modified. + /// See . + /// See . + /// See . + /// See . + public static void SetupAdaptiveThrottling( + this IClusterClientConfiguration configuration, + AdaptiveThrottlingOptions criticalOptions, + AdaptiveThrottlingOptions ordinaryOptions, + AdaptiveThrottlingOptions sheddableOptions) + { + var storageKey = GenerateStorageKey(configuration); + + SetupAdaptiveThrottling(configuration, storageKey, criticalOptions, ordinaryOptions, sheddableOptions); + } } } \ No newline at end of file diff --git a/Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs b/Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs index 97010ca1..3b7f8224 100644 --- a/Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs +++ b/Vostok.ClusterClient.Core/Modules/AdaptiveThrottlingModule.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; @@ -21,9 +22,38 @@ internal class AdaptiveThrottlingModule : IRequestModule private readonly Func counterFactory; public AdaptiveThrottlingModule(AdaptiveThrottlingOptions options) + : this(options.StorageKey, options, options, options) + { + } + + public AdaptiveThrottlingModule( + string storageKey, + AdaptiveThrottlingOptions criticalOptions, + AdaptiveThrottlingOptions ordinaryOptions, + AdaptiveThrottlingOptions sheddableOptions) + { + StorageKey = storageKey; + Options = new Dictionary + { + {RequestPriority.Critical, criticalOptions}, + {RequestPriority.Ordinary, ordinaryOptions}, + {RequestPriority.Sheddable, sheddableOptions} + }; + counterFactory = _ => new CountersByPriority( + criticalOptions.MinutesToTrack, + ordinaryOptions.MinutesToTrack, + sheddableOptions.MinutesToTrack + ); + } + + public AdaptiveThrottlingModule(Dictionary options) { this.Options = options; - counterFactory = _ => new CountersByPriority(options.MinutesToTrack); + counterFactory = _ => new CountersByPriority( + options[RequestPriority.Critical].MinutesToTrack, + options[RequestPriority.Ordinary].MinutesToTrack, + options[RequestPriority.Sheddable].MinutesToTrack + ); } public static void ClearCache() @@ -31,7 +61,7 @@ public static void ClearCache() Counters.Clear(); } - public AdaptiveThrottlingOptions Options { get; } + public Dictionary Options { get; } public int Requests(RequestPriority? priority) => GetCounter(priority).GetMetrics().Requests; @@ -39,12 +69,14 @@ public static void ClearCache() public double Ratio(RequestPriority? priority) => ComputeRatio(GetCounter(priority).GetMetrics()); - public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), Options); + public double RejectionProbability(RequestPriority? priority) => ComputeRejectionProbability(GetCounter(priority).GetMetrics(), GetOptions(priority)); + + public string StorageKey { get; } public async Task ExecuteAsync(IRequestContext context, Func> next) { var counter = GetCounter(context.Parameters.Priority); - + var options = GetOptions(context.Parameters.Priority); counter.BeginRequest(); ClusterResult result; @@ -57,9 +89,9 @@ public async Task ExecuteAsync(IRequestContext context, Func= Options.MinimumRequests && - (ratio = ComputeRatio(metrics)) >= Options.CriticalRatio && - (rejectionProbability = ComputeRejectionProbability(metrics, Options)) > ThreadSafeRandom.NextDouble()) + if (metrics.Requests >= options.MinimumRequests && + (ratio = ComputeRatio(metrics)) >= options.CriticalRatio && + (rejectionProbability = ComputeRejectionProbability(metrics, options)) > ThreadSafeRandom.NextDouble()) { LogThrottledRequest(context, ratio, rejectionProbability); @@ -106,7 +138,7 @@ private static void UpdateCounter(Counter counter, ClusterResult result) private Counter GetCounter(RequestPriority? priority) { priority ??= RequestPriority.Sheddable; - var counters = Counters.GetOrAdd(Options.StorageKey, counterFactory); + var counters = Counters.GetOrAdd(StorageKey, counterFactory); return priority switch { RequestPriority.Critical => counters.CriticalRequestCounter, @@ -116,28 +148,26 @@ private Counter GetCounter(RequestPriority? priority) }; } + private AdaptiveThrottlingOptions GetOptions(RequestPriority? priority) => Options[priority ?? RequestPriority.Sheddable]; + #region CountersByPriority private class CountersByPriority { - private readonly Counter criticalRequestCounter; - private readonly Counter ordinaryRequestCounter; - private readonly Counter sheddableRequestCounter; - - public CountersByPriority(int buckets) + public CountersByPriority(int criticalBuckets, int ordinaryBuckets, int sheddableBuckets) { - criticalRequestCounter = new Counter(buckets); - ordinaryRequestCounter = new Counter(buckets); - sheddableRequestCounter = new Counter(buckets); + CriticalRequestCounter = new Counter(criticalBuckets); + OrdinaryRequestCounter = new Counter(ordinaryBuckets); + SheddableRequestCounter = new Counter(sheddableBuckets); } - public Counter CriticalRequestCounter => criticalRequestCounter; - public Counter OrdinaryRequestCounter => ordinaryRequestCounter; - public Counter SheddableRequestCounter => sheddableRequestCounter; + public Counter CriticalRequestCounter { get; } + public Counter OrdinaryRequestCounter { get; } + public Counter SheddableRequestCounter { get; } } #endregion - + #region Logging private void LogThrottledRequest(IRequestContext context, double ratio, double rejectionProbability) => @@ -198,7 +228,7 @@ public CounterMetrics GetMetrics() public void AddAccept() => Interlocked.Increment(ref ObtainBucket().Accepts); - private static int GetCurrentMinute() => (int) Math.Floor(Watch.Elapsed.TotalMinutes); + private static int GetCurrentMinute() => (int)Math.Floor(Watch.Elapsed.TotalMinutes); private CounterBucket ObtainBucket() { diff --git a/Vostok.ClusterClient.Core/PublicAPI.Unshipped.txt b/Vostok.ClusterClient.Core/PublicAPI.Unshipped.txt index e69de29b..573ed589 100644 --- a/Vostok.ClusterClient.Core/PublicAPI.Unshipped.txt +++ b/Vostok.ClusterClient.Core/PublicAPI.Unshipped.txt @@ -0,0 +1,2 @@ +static Vostok.Clusterclient.Core.IClusterClientConfigurationExtensions.SetupAdaptiveThrottling(this Vostok.Clusterclient.Core.IClusterClientConfiguration configuration, string storageKey, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions criticalOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions ordinaryOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions sheddableOptions) -> void +static Vostok.Clusterclient.Core.IClusterClientConfigurationExtensions.SetupAdaptiveThrottling(this Vostok.Clusterclient.Core.IClusterClientConfiguration configuration, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions criticalOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions ordinaryOptions, Vostok.Clusterclient.Core.Modules.AdaptiveThrottlingOptions sheddableOptions) -> void \ No newline at end of file