diff --git a/src/Orleans.Core/Messaging/ClientMessageCenter.cs b/src/Orleans.Core/Messaging/ClientMessageCenter.cs index 62ee871b2e..1bcba14ccc 100644 --- a/src/Orleans.Core/Messaging/ClientMessageCenter.cs +++ b/src/Orleans.Core/Messaging/ClientMessageCenter.cs @@ -99,16 +99,14 @@ public async Task StartAsync(CancellationToken cancellationToken) private async Task EstablishInitialConnection(CancellationToken cancellationToken) { - var cancellationTask = cancellationToken.WhenCancelled(); var liveGateways = gatewayManager.GetLiveGateways(); if (liveGateways.Count == 0) { - throw new ConnectionFailedException("There are no available gateways"); + throw new ConnectionFailedException("There are no available gateways."); } - var pendingTasks = new List(liveGateways.Count + 1); - pendingTasks.Add(cancellationTask); + var pendingTasks = new List(liveGateways.Count); foreach (var gateway in liveGateways) { pendingTasks.Add(connectionManager.GetConnection(gateway).AsTask()); @@ -116,14 +114,11 @@ private async Task EstablishInitialConnection(CancellationToken cancellationToke try { - // There will always be one task to represent cancellation. - while (pendingTasks.Count > 1) + while (pendingTasks.Count > 0) { - var completedTask = await Task.WhenAny(pendingTasks); + var completedTask = await Task.WhenAny(pendingTasks).WaitAsync(cancellationToken); pendingTasks.Remove(completedTask); - cancellationToken.ThrowIfCancellationRequested(); - // If at least one gateway connection has been established, break out of the loop and continue startup. if (completedTask.IsCompletedSuccessfully) { @@ -131,10 +126,14 @@ private async Task EstablishInitialConnection(CancellationToken cancellationToke } // If there are no more gateways, observe the most recent exception and bail out. - if (pendingTasks.Count == 1) + if (pendingTasks.Count == 0) { await completedTask; } + else + { + completedTask.Ignore(); + } } } catch (Exception exception) diff --git a/src/Orleans.Core/Networking/ConnectionManager.cs b/src/Orleans.Core/Networking/ConnectionManager.cs index 891c2d5e72..81b728ab74 100644 --- a/src/Orleans.Core/Networking/ConnectionManager.cs +++ b/src/Orleans.Core/Networking/ConnectionManager.cs @@ -298,7 +298,7 @@ public async Task Close(CancellationToken ct) if (closeTasks.Count > 0) { - await Task.WhenAny(Task.WhenAll(closeTasks), ct.WhenCancelled()); + await Task.WhenAll(closeTasks).WaitAsync(ct).SuppressThrowing(); if (ct.IsCancellationRequested) break; } else if (!pendingConnections) break; diff --git a/src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs b/src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs index 45569d03cb..71217375c6 100644 --- a/src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs +++ b/src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs @@ -215,8 +215,8 @@ private async ValueTask RemoveExpiredAsync(CancellationToken cancellationToken) using var cancellation = new CancellationTokenSource(_messagingOptions.ResponseTimeout / 2); // Wait for either the MoveNextAsync task to complete or the cancellation token to be cancelled. - var completedTask = await Task.WhenAny(moveNextTask, cancellation.Token.WhenCancelled()); - if (completedTask == moveNextTask) + await moveNextTask.WaitAsync(cancellation.Token).SuppressThrowing(); + if (moveNextTask is {IsCompletedSuccessfully: true }) { OnMoveNext(requestId); var hasValue = moveNextTask.GetAwaiter().GetResult(); diff --git a/src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs b/src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs index b232c58bb0..d6e274bf63 100644 --- a/src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs +++ b/src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs @@ -168,7 +168,7 @@ void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) _scanPeriodTimer.Dispose(); if (_runTask is Task task) { - await Task.WhenAny(task, ct.WhenCancelled()); + await task.WaitAsync(ct).SuppressThrowing(); } }); } diff --git a/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs b/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs index b82ba25440..eca6ee18c4 100644 --- a/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs +++ b/src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs @@ -79,7 +79,10 @@ void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) async ct => { _scanPeriodTimer.Dispose(); - if (_runTask is Task task) await Task.WhenAny(task, ct.WhenCancelled()); + if (_runTask is Task task) + { + await task.WaitAsync(ct).SuppressThrowing(); + } }); } diff --git a/src/Orleans.Runtime/Core/HostedClient.cs b/src/Orleans.Runtime/Core/HostedClient.cs index 04faef384c..273610ecce 100644 --- a/src/Orleans.Runtime/Core/HostedClient.cs +++ b/src/Orleans.Runtime/Core/HostedClient.cs @@ -291,7 +291,7 @@ async Task OnStop(CancellationToken cancellation) if (this.messagePump != null) { - await Task.WhenAny(cancellation.WhenCancelled(), this.messagePump); + await messagePump.WaitAsync(cancellation).SuppressThrowing(); } } } diff --git a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs index c2978198b7..70e6744625 100644 --- a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs @@ -542,7 +542,7 @@ void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) if (_runTask is Task task) { - await Task.WhenAny(ct.WhenCancelled(), task); + await task.WaitAsync(ct).SuppressThrowing(); } }); } diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index e57c64622d..a78ca9bdf8 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -169,7 +169,7 @@ private async Task InvokeAsync( { // This likely indicates that the target silo has been declared dead. ++attempts; - await Task.Delay(delay); + await Task.Delay(delay, cancellationToken); delay *= 1.5; continue; } diff --git a/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs index 1f6a639796..2f46bb623b 100644 --- a/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs +++ b/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs @@ -339,7 +339,7 @@ public async Task OnStop(CancellationToken ct) if (_runTask is Task task) { - await Task.WhenAny(task, ct.WhenCancelled()).ConfigureAwait(false); + await task.WaitAsync(ct).SuppressThrowing(); } } diff --git a/src/Orleans.Runtime/MembershipService/MembershipTableCleanupAgent.cs b/src/Orleans.Runtime/MembershipService/MembershipTableCleanupAgent.cs index 263bbf4452..476bcf91a2 100644 --- a/src/Orleans.Runtime/MembershipService/MembershipTableCleanupAgent.cs +++ b/src/Orleans.Runtime/MembershipService/MembershipTableCleanupAgent.cs @@ -1,11 +1,13 @@ +#nullable enable using System; -using System.Collections.Generic; using Orleans.Configuration; using System.Threading.Tasks; using System.Threading; using Microsoft.Extensions.Options; using Microsoft.Extensions.Logging; using Orleans.Internal; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; namespace Orleans.Runtime.MembershipService { @@ -14,10 +16,10 @@ namespace Orleans.Runtime.MembershipService /// internal class MembershipTableCleanupAgent : IHealthCheckParticipant, ILifecycleParticipant, IDisposable { - private readonly ClusterMembershipOptions clusterMembershipOptions; - private readonly IMembershipTable membershipTableProvider; - private readonly ILogger log; - private readonly IAsyncTimer cleanupDefunctSilosTimer; + private readonly ClusterMembershipOptions _clusterMembershipOptions; + private readonly IMembershipTable _membershipTableProvider; + private readonly ILogger _logger; + private readonly IAsyncTimer? _cleanupDefunctSilosTimer; public MembershipTableCleanupAgent( IOptions clusterMembershipOptions, @@ -25,92 +27,96 @@ public MembershipTableCleanupAgent( ILogger log, IAsyncTimerFactory timerFactory) { - this.clusterMembershipOptions = clusterMembershipOptions.Value; - this.membershipTableProvider = membershipTableProvider; - this.log = log; - if (this.clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue) + _clusterMembershipOptions = clusterMembershipOptions.Value; + _membershipTableProvider = membershipTableProvider; + _logger = log; + if (_clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue) { - this.cleanupDefunctSilosTimer = timerFactory.Create( - this.clusterMembershipOptions.DefunctSiloCleanupPeriod.Value, + _cleanupDefunctSilosTimer = timerFactory.Create( + _clusterMembershipOptions.DefunctSiloCleanupPeriod.Value, nameof(CleanupDefunctSilos)); } } public void Dispose() { - this.cleanupDefunctSilosTimer?.Dispose(); + _cleanupDefunctSilosTimer?.Dispose(); } private async Task CleanupDefunctSilos() { - if (!this.clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue) + if (!_clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue) { - if (this.log.IsEnabled(LogLevel.Debug)) + if (_logger.IsEnabled(LogLevel.Debug)) { - this.log.LogDebug($"Membership table cleanup is disabled due to {nameof(ClusterMembershipOptions)}.{nameof(ClusterMembershipOptions.DefunctSiloCleanupPeriod)} not being specified"); + _logger.LogDebug($"Membership table cleanup is disabled due to {nameof(ClusterMembershipOptions)}.{nameof(ClusterMembershipOptions.DefunctSiloCleanupPeriod)} not being specified"); } return; } - if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting membership table cleanup agent"); + Debug.Assert(_cleanupDefunctSilosTimer is not null); + if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Starting membership table cleanup agent"); try { - var period = this.clusterMembershipOptions.DefunctSiloCleanupPeriod.Value; + var period = _clusterMembershipOptions.DefunctSiloCleanupPeriod.Value; // The first cleanup should be scheduled for shortly after silo startup. var delay = RandomTimeSpan.Next(TimeSpan.FromMinutes(2), TimeSpan.FromMinutes(10)); - while (await this.cleanupDefunctSilosTimer.NextTick(delay)) + while (await _cleanupDefunctSilosTimer.NextTick(delay)) { // Select a random time within the next window. // The purpose of this is to add jitter to a process which could be affected by contention with other silos. delay = RandomTimeSpan.Next(period, period + TimeSpan.FromMinutes(5)); try { - var dateLimit = DateTime.UtcNow - this.clusterMembershipOptions.DefunctSiloExpiration; - await this.membershipTableProvider.CleanupDefunctSiloEntries(dateLimit); + var dateLimit = DateTime.UtcNow - _clusterMembershipOptions.DefunctSiloExpiration; + await _membershipTableProvider.CleanupDefunctSiloEntries(dateLimit); } catch (Exception exception) when (exception is NotImplementedException or MissingMethodException) { - this.cleanupDefunctSilosTimer.Dispose(); - this.log.LogWarning( + _cleanupDefunctSilosTimer.Dispose(); + _logger.LogWarning( (int)ErrorCode.MembershipCleanDeadEntriesFailure, $"{nameof(IMembershipTable.CleanupDefunctSiloEntries)} operation is not supported by the current implementation of {nameof(IMembershipTable)}. Disabling the timer now."); return; } catch (Exception exception) { - this.log.LogError((int)ErrorCode.MembershipCleanDeadEntriesFailure, exception, "Failed to clean up defunct membership table entries"); + _logger.LogError((int)ErrorCode.MembershipCleanDeadEntriesFailure, exception, "Failed to clean up defunct membership table entries"); } } } finally { - if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Stopped membership table cleanup agent"); + if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Stopped membership table cleanup agent"); } } void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) { - var tasks = new List(); + Task? task = null; lifecycle.Subscribe(nameof(MembershipTableCleanupAgent), ServiceLifecycleStage.Active, OnStart, OnStop); Task OnStart(CancellationToken ct) { - tasks.Add(Task.Run(() => this.CleanupDefunctSilos())); + task = Task.Run(CleanupDefunctSilos); return Task.CompletedTask; } async Task OnStop(CancellationToken ct) { - this.cleanupDefunctSilosTimer?.Dispose(); - await Task.WhenAny(ct.WhenCancelled(), Task.WhenAll(tasks)); + _cleanupDefunctSilosTimer?.Dispose(); + if (task is { }) + { + await task.WaitAsync(ct).SuppressThrowing(); + } } } - bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason) + bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, [NotNullWhen(false)] out string? reason) { - if (cleanupDefunctSilosTimer is IAsyncTimer timer) + if (_cleanupDefunctSilosTimer is IAsyncTimer timer) { return timer.CheckHealth(lastCheckTime, out reason); } diff --git a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs index 7641f2154b..a10efa49b8 100644 --- a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs +++ b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs @@ -1,3 +1,4 @@ +#nullable enable using System; using System.Collections.Generic; using Microsoft.Extensions.Logging; @@ -6,156 +7,158 @@ using System.Collections.Immutable; using Orleans.Internal; -namespace Orleans.Runtime.MembershipService +namespace Orleans.Runtime.MembershipService; + +/// +/// Manages instances. +/// +internal class SiloStatusListenerManager : ILifecycleParticipant { - /// - /// Manages instances. - /// - internal class SiloStatusListenerManager : ILifecycleParticipant + private readonly object _listenersLock = new(); + private readonly CancellationTokenSource _cancellation = new(); + private readonly MembershipTableManager _membershipTableManager; + private readonly ILogger _logger; + private readonly IFatalErrorHandler _fatalErrorHandler; + private ImmutableList> _listeners = []; + + public SiloStatusListenerManager( + MembershipTableManager membershipTableManager, + ILogger log, + IFatalErrorHandler fatalErrorHandler) { - private readonly object listenersLock = new object(); - private readonly CancellationTokenSource cancellation = new CancellationTokenSource(); - private readonly MembershipTableManager membershipTableManager; - private readonly ILogger log; - private readonly IFatalErrorHandler fatalErrorHandler; - private ImmutableList> listeners = ImmutableList>.Empty; - - public SiloStatusListenerManager( - MembershipTableManager membershipTableManager, - ILogger log, - IFatalErrorHandler fatalErrorHandler) - { - this.membershipTableManager = membershipTableManager; - this.log = log; - this.fatalErrorHandler = fatalErrorHandler; - } + _membershipTableManager = membershipTableManager; + _logger = log; + _fatalErrorHandler = fatalErrorHandler; + } - public bool Subscribe(ISiloStatusListener listener) + public bool Subscribe(ISiloStatusListener listener) + { + lock (_listenersLock) { - lock (this.listenersLock) + foreach (var reference in _listeners) { - foreach (var reference in this.listeners) + if (!reference.TryGetTarget(out var existing)) { - if (!reference.TryGetTarget(out var existing)) - { - continue; - } - - if (ReferenceEquals(existing, listener)) return false; + continue; } - this.listeners = this.listeners.Add(new WeakReference(listener)); - return true; + if (ReferenceEquals(existing, listener)) return false; } + + _listeners = _listeners.Add(new WeakReference(listener)); + return true; } + } - public bool Unsubscribe(ISiloStatusListener listener) + public bool Unsubscribe(ISiloStatusListener listener) + { + lock (_listenersLock) { - lock (this.listenersLock) + for (var i = 0; i < _listeners.Count; i++) { - for (var i = 0; i < this.listeners.Count; i++) + if (!_listeners[i].TryGetTarget(out var existing)) { - if (!this.listeners[i].TryGetTarget(out var existing)) - { - continue; - } - - if (ReferenceEquals(existing, listener)) - { - this.listeners = this.listeners.RemoveAt(i); - return true; - } + continue; } - return false; + if (ReferenceEquals(existing, listener)) + { + _listeners = _listeners.RemoveAt(i); + return true; + } } + + return false; } + } - private async Task ProcessMembershipUpdates() + private async Task ProcessMembershipUpdates() + { + ClusterMembershipSnapshot? previous = default; + try { - ClusterMembershipSnapshot previous = default; - try + if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Starting to process membership updates."); + await foreach (var tableSnapshot in _membershipTableManager.MembershipTableUpdates.WithCancellation(_cancellation.Token)) { - if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting to process membership updates"); - await foreach (var tableSnapshot in this.membershipTableManager.MembershipTableUpdates.WithCancellation(this.cancellation.Token)) - { - var snapshot = tableSnapshot.CreateClusterMembershipSnapshot(); + var snapshot = tableSnapshot.CreateClusterMembershipSnapshot(); - var update = (previous is null || snapshot.Version == MembershipVersion.MinValue) ? snapshot.AsUpdate() : snapshot.CreateUpdate(previous); - this.NotifyObservers(update); - previous = snapshot; - } - } - catch (Exception exception) when (this.fatalErrorHandler.IsUnexpected(exception)) - { - this.log.LogError(exception, "Error processing membership updates"); - this.fatalErrorHandler.OnFatalException(this, nameof(ProcessMembershipUpdates), exception); - } - finally - { - if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Stopping membership update processor"); + var update = (previous is null || snapshot.Version == MembershipVersion.MinValue) ? snapshot.AsUpdate() : snapshot.CreateUpdate(previous); + NotifyObservers(update); + previous = snapshot; } } - - private void NotifyObservers(ClusterMembershipUpdate update) + catch (Exception exception) when (_fatalErrorHandler.IsUnexpected(exception)) { - if (!update.HasChanges) return; + _logger.LogError(exception, "Error processing membership updates."); + _fatalErrorHandler.OnFatalException(this, nameof(ProcessMembershipUpdates), exception); + } + finally + { + if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Stopping membership update processor."); + } + } - List> toRemove = null; - var subscribers = this.listeners; - foreach (var change in update.Changes) + private void NotifyObservers(ClusterMembershipUpdate update) + { + if (!update.HasChanges) return; + + List>? toRemove = null; + var subscribers = _listeners; + foreach (var change in update.Changes) + { + for (var i = 0; i < subscribers.Count; ++i) { - for (var i = 0; i < subscribers.Count; ++i) + if (!subscribers[i].TryGetTarget(out var listener)) { - if (!subscribers[i].TryGetTarget(out var listener)) - { - if (toRemove is null) toRemove = new List>(); - toRemove.Add(subscribers[i]); - continue; - } - - try - { - listener.SiloStatusChangeNotification(change.SiloAddress, change.Status); - } - catch (Exception exception) - { - this.log.LogError( - exception, - "Exception while calling " + nameof(ISiloStatusListener.SiloStatusChangeNotification) + " on listener {Listener}", - listener); - } + if (toRemove is null) toRemove = new List>(); + toRemove.Add(subscribers[i]); + continue; } - } - if (toRemove != null) - { - lock (this.listenersLock) + try { - var builder = this.listeners.ToBuilder(); - foreach (var entry in toRemove) builder.Remove(entry); - this.listeners = builder.ToImmutable(); + listener.SiloStatusChangeNotification(change.SiloAddress, change.Status); + } + catch (Exception exception) + { + _logger.LogError( + exception, + "Exception while calling " + nameof(ISiloStatusListener.SiloStatusChangeNotification) + " on listener '{Listener}'.", + listener); } } } - void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + if (toRemove != null) { - var tasks = new List(); - - lifecycle.Subscribe(nameof(SiloStatusListenerManager), ServiceLifecycleStage.AfterRuntimeGrainServices, OnStart, _ => Task.CompletedTask); - lifecycle.Subscribe(nameof(SiloStatusListenerManager), ServiceLifecycleStage.RuntimeInitialize, _ => Task.CompletedTask, OnStop); - - Task OnStart(CancellationToken ct) + lock (_listenersLock) { - tasks.Add(Task.Run(() => this.ProcessMembershipUpdates())); - return Task.CompletedTask; + var builder = _listeners.ToBuilder(); + foreach (var entry in toRemove) builder.Remove(entry); + _listeners = builder.ToImmutable(); } + } + } + + void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + { + Task? task = null; - Task OnStop(CancellationToken ct) + lifecycle.Subscribe(nameof(SiloStatusListenerManager), ServiceLifecycleStage.AfterRuntimeGrainServices, OnStart, _ => Task.CompletedTask); + lifecycle.Subscribe(nameof(SiloStatusListenerManager), ServiceLifecycleStage.RuntimeInitialize, _ => Task.CompletedTask, OnStop); + + Task OnStart(CancellationToken ct) + { + task = Task.Run(ProcessMembershipUpdates); + return Task.CompletedTask; + } + + async Task OnStop(CancellationToken ct) + { + _cancellation.Cancel(throwOnFirstException: false); + if (task is not null) { - this.cancellation.Cancel(throwOnFirstException: false); - return Task.WhenAny(ct.WhenCancelled(), Task.WhenAll(tasks)); + await task.WaitAsync(ct).SuppressThrowing(); } } } diff --git a/src/Orleans.Runtime/Networking/ConnectionListener.cs b/src/Orleans.Runtime/Networking/ConnectionListener.cs index 3205e810f1..a90e16b897 100644 --- a/src/Orleans.Runtime/Networking/ConnectionListener.cs +++ b/src/Orleans.Runtime/Networking/ConnectionListener.cs @@ -123,7 +123,7 @@ protected async Task StopAsync(CancellationToken cancellationToken) if (closeTasks.Count > 0) { - await Task.WhenAny(Task.WhenAll(closeTasks), cancellationToken.WhenCancelled()); + await Task.WhenAll(closeTasks).WaitAsync(cancellationToken).SuppressThrowing(); } await this.connectionManager.Closed; diff --git a/src/Orleans.Runtime/Silo/Silo.cs b/src/Orleans.Runtime/Silo/Silo.cs index 2f63fd6371..a1ae80fb25 100644 --- a/src/Orleans.Runtime/Silo/Silo.cs +++ b/src/Orleans.Runtime/Silo/Silo.cs @@ -490,7 +490,7 @@ private async Task OnBecomeActiveStop(CancellationToken ct) } // Wait for all queued message sent to OutboundMessageQueue before MessageCenter stop and OutboundMessageQueue stop. - await Task.WhenAny(Task.Delay(waitForMessageToBeQueuedForOutbound), ct.WhenCancelled()); + await Task.Delay(waitForMessageToBeQueuedForOutbound, ct).SuppressThrowing(); } catch (Exception exc) {