Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow CancellationToken more appropriately in lifecycle #9330

Merged
merged 1 commit into from
Feb 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,42 +99,41 @@ public async Task StartAsync(CancellationToken cancellationToken)

private async Task EstablishInitialConnection(CancellationToken cancellationToken)
{
var cancellationTask = cancellationToken.WhenCancelled();
var liveGateways = gatewayManager.GetLiveGateways();

if (liveGateways.Count == 0)
{
throw new ConnectionFailedException("There are no available gateways");
throw new ConnectionFailedException("There are no available gateways.");
}

var pendingTasks = new List<Task>(liveGateways.Count + 1);
pendingTasks.Add(cancellationTask);
var pendingTasks = new List<Task>(liveGateways.Count);
foreach (var gateway in liveGateways)
{
pendingTasks.Add(connectionManager.GetConnection(gateway).AsTask());
}

try
{
// There will always be one task to represent cancellation.
while (pendingTasks.Count > 1)
while (pendingTasks.Count > 0)
{
var completedTask = await Task.WhenAny(pendingTasks);
var completedTask = await Task.WhenAny(pendingTasks).WaitAsync(cancellationToken);
pendingTasks.Remove(completedTask);

cancellationToken.ThrowIfCancellationRequested();

// If at least one gateway connection has been established, break out of the loop and continue startup.
if (completedTask.IsCompletedSuccessfully)
{
break;
}

// If there are no more gateways, observe the most recent exception and bail out.
if (pendingTasks.Count == 1)
if (pendingTasks.Count == 0)
{
await completedTask;
}
else
{
completedTask.Ignore();
}
}
}
catch (Exception exception)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Networking/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public async Task Close(CancellationToken ct)

if (closeTasks.Count > 0)
{
await Task.WhenAny(Task.WhenAll(closeTasks), ct.WhenCancelled());
await Task.WhenAll(closeTasks).WaitAsync(ct).SuppressThrowing();
if (ct.IsCancellationRequested) break;
}
else if (!pendingConnections) break;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ private async ValueTask RemoveExpiredAsync(CancellationToken cancellationToken)
using var cancellation = new CancellationTokenSource(_messagingOptions.ResponseTimeout / 2);

// Wait for either the MoveNextAsync task to complete or the cancellation token to be cancelled.
var completedTask = await Task.WhenAny(moveNextTask, cancellation.Token.WhenCancelled());
if (completedTask == moveNextTask)
await moveNextTask.WaitAsync(cancellation.Token).SuppressThrowing();
if (moveNextTask is {IsCompletedSuccessfully: true })
{
OnMoveNext(requestId);
var hasValue = moveNextTask.GetAwaiter().GetResult();
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
_scanPeriodTimer.Dispose();
if (_runTask is Task task)
{
await Task.WhenAny(task, ct.WhenCancelled());
await task.WaitAsync(ct).SuppressThrowing();
}
});
}
Expand Down
5 changes: 4 additions & 1 deletion src/Orleans.Runtime/Catalog/IncomingRequestMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
async ct =>
{
_scanPeriodTimer.Dispose();
if (_runTask is Task task) await Task.WhenAny(task, ct.WhenCancelled());
if (_runTask is Task task)
{
await task.WaitAsync(ct).SuppressThrowing();
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Core/HostedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ async Task OnStop(CancellationToken cancellation)

if (this.messagePump != null)
{
await Task.WhenAny(cancellation.WhenCancelled(), this.messagePump);
await messagePump.WaitAsync(cancellation).SuppressThrowing();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)

if (_runTask is Task task)
{
await Task.WhenAny(ct.WhenCancelled(), task);
await task.WaitAsync(ct).SuppressThrowing();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private async Task<TResult> InvokeAsync<TState, TResult>(
{
// This likely indicates that the target silo has been declared dead.
++attempts;
await Task.Delay(delay);
await Task.Delay(delay, cancellationToken);
delay *= 1.5;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public async Task OnStop(CancellationToken ct)

if (_runTask is Task task)
{
await Task.WhenAny(task, ct.WhenCancelled()).ConfigureAwait(false);
await task.WaitAsync(ct).SuppressThrowing();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#nullable enable
using System;
using System.Collections.Generic;
using Orleans.Configuration;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using Orleans.Internal;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace Orleans.Runtime.MembershipService
{
Expand All @@ -14,103 +16,107 @@ namespace Orleans.Runtime.MembershipService
/// </summary>
internal class MembershipTableCleanupAgent : IHealthCheckParticipant, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly ClusterMembershipOptions clusterMembershipOptions;
private readonly IMembershipTable membershipTableProvider;
private readonly ILogger<MembershipTableCleanupAgent> log;
private readonly IAsyncTimer cleanupDefunctSilosTimer;
private readonly ClusterMembershipOptions _clusterMembershipOptions;
private readonly IMembershipTable _membershipTableProvider;
private readonly ILogger<MembershipTableCleanupAgent> _logger;
private readonly IAsyncTimer? _cleanupDefunctSilosTimer;

public MembershipTableCleanupAgent(
IOptions<ClusterMembershipOptions> clusterMembershipOptions,
IMembershipTable membershipTableProvider,
ILogger<MembershipTableCleanupAgent> log,
IAsyncTimerFactory timerFactory)
{
this.clusterMembershipOptions = clusterMembershipOptions.Value;
this.membershipTableProvider = membershipTableProvider;
this.log = log;
if (this.clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue)
_clusterMembershipOptions = clusterMembershipOptions.Value;
_membershipTableProvider = membershipTableProvider;
_logger = log;
if (_clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue)
{
this.cleanupDefunctSilosTimer = timerFactory.Create(
this.clusterMembershipOptions.DefunctSiloCleanupPeriod.Value,
_cleanupDefunctSilosTimer = timerFactory.Create(
_clusterMembershipOptions.DefunctSiloCleanupPeriod.Value,
nameof(CleanupDefunctSilos));
}
}

public void Dispose()
{
this.cleanupDefunctSilosTimer?.Dispose();
_cleanupDefunctSilosTimer?.Dispose();
}

private async Task CleanupDefunctSilos()
{
if (!this.clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue)
if (!_clusterMembershipOptions.DefunctSiloCleanupPeriod.HasValue)
{
if (this.log.IsEnabled(LogLevel.Debug))
if (_logger.IsEnabled(LogLevel.Debug))
{
this.log.LogDebug($"Membership table cleanup is disabled due to {nameof(ClusterMembershipOptions)}.{nameof(ClusterMembershipOptions.DefunctSiloCleanupPeriod)} not being specified");
_logger.LogDebug($"Membership table cleanup is disabled due to {nameof(ClusterMembershipOptions)}.{nameof(ClusterMembershipOptions.DefunctSiloCleanupPeriod)} not being specified");
}

return;
}

if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting membership table cleanup agent");
Debug.Assert(_cleanupDefunctSilosTimer is not null);
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Starting membership table cleanup agent");
try
{
var period = this.clusterMembershipOptions.DefunctSiloCleanupPeriod.Value;
var period = _clusterMembershipOptions.DefunctSiloCleanupPeriod.Value;

// The first cleanup should be scheduled for shortly after silo startup.
var delay = RandomTimeSpan.Next(TimeSpan.FromMinutes(2), TimeSpan.FromMinutes(10));
while (await this.cleanupDefunctSilosTimer.NextTick(delay))
while (await _cleanupDefunctSilosTimer.NextTick(delay))
{
// Select a random time within the next window.
// The purpose of this is to add jitter to a process which could be affected by contention with other silos.
delay = RandomTimeSpan.Next(period, period + TimeSpan.FromMinutes(5));
try
{
var dateLimit = DateTime.UtcNow - this.clusterMembershipOptions.DefunctSiloExpiration;
await this.membershipTableProvider.CleanupDefunctSiloEntries(dateLimit);
var dateLimit = DateTime.UtcNow - _clusterMembershipOptions.DefunctSiloExpiration;
await _membershipTableProvider.CleanupDefunctSiloEntries(dateLimit);
}
catch (Exception exception) when (exception is NotImplementedException or MissingMethodException)
{
this.cleanupDefunctSilosTimer.Dispose();
this.log.LogWarning(
_cleanupDefunctSilosTimer.Dispose();
_logger.LogWarning(
(int)ErrorCode.MembershipCleanDeadEntriesFailure,
$"{nameof(IMembershipTable.CleanupDefunctSiloEntries)} operation is not supported by the current implementation of {nameof(IMembershipTable)}. Disabling the timer now.");
return;
}
catch (Exception exception)
{
this.log.LogError((int)ErrorCode.MembershipCleanDeadEntriesFailure, exception, "Failed to clean up defunct membership table entries");
_logger.LogError((int)ErrorCode.MembershipCleanDeadEntriesFailure, exception, "Failed to clean up defunct membership table entries");
}
}
}
finally
{
if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Stopped membership table cleanup agent");
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Stopped membership table cleanup agent");
}
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
var tasks = new List<Task>();
Task? task = null;
lifecycle.Subscribe(nameof(MembershipTableCleanupAgent), ServiceLifecycleStage.Active, OnStart, OnStop);

Task OnStart(CancellationToken ct)
{
tasks.Add(Task.Run(() => this.CleanupDefunctSilos()));
task = Task.Run(CleanupDefunctSilos);
return Task.CompletedTask;
}

async Task OnStop(CancellationToken ct)
{
this.cleanupDefunctSilosTimer?.Dispose();
await Task.WhenAny(ct.WhenCancelled(), Task.WhenAll(tasks));
_cleanupDefunctSilosTimer?.Dispose();
if (task is { })
{
await task.WaitAsync(ct).SuppressThrowing();
}
}
}

bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason)
bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, [NotNullWhen(false)] out string? reason)
{
if (cleanupDefunctSilosTimer is IAsyncTimer timer)
if (_cleanupDefunctSilosTimer is IAsyncTimer timer)
{
return timer.CheckHealth(lastCheckTime, out reason);
}
Expand Down
Loading
Loading