Skip to content

Commit

Permalink
Adding health checker command
Browse files Browse the repository at this point in the history
  • Loading branch information
johnsimons committed Feb 3, 2025
1 parent 42b8107 commit ea6cf0e
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public static void UsingAmazonSqs(this IServiceCollection services, Action<SqsTr
services.AddSingleton(provider => new AmazonSqsHelper(provider.GetRequiredService<IAmazonSQS>(), provider.GetRequiredService<SqsTransport>(), string.Empty));
services.AddSingleton<IQueueInformationProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IQueueLengthProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IHealthCheckerProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IAmazonSQS, AmazonSQSClient>();
services.AddSingleton<IAmazonSimpleNotificationService, AmazonSimpleNotificationServiceClient>();
services.AddTransient<SqsTransport>(sp =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Amazon.SQS;
using Amazon.SQS.Model;

sealed class AmazonSqsHelper(IAmazonSQS client, SqsTransport transportDefinition, string? queueNamePrefix = null) : IQueueInformationProvider, IQueueLengthProvider
sealed class AmazonSqsHelper(IAmazonSQS client, SqsTransport transportDefinition, string? queueNamePrefix = null) : IQueueInformationProvider, IQueueLengthProvider, IHealthCheckerProvider
{
readonly ConcurrentDictionary<string, Task<string>> queueUrlCache = new();

Expand Down Expand Up @@ -62,4 +62,19 @@ public async Task<long> GetQueueLength(string name, CancellationToken cancellati

return value;
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
try
{
await GetQueues(cancellationToken).GetAsyncEnumerator(cancellationToken).MoveNextAsync();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (AmazonSQSException e)
{
return (false, e.Message);
}

return (true, string.Empty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ public static void UsingAzureServiceBus(this IServiceCollection services, IConfi
{
var receiveMode = useDeadLetterQueue ? SubQueue.DeadLetter : SubQueue.None;

services.AddSingleton<IQueueInformationProvider>(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton<IQueueLengthProvider>(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton<IQueueInformationProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddSingleton<IQueueLengthProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddSingleton<IHealthCheckerProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddTransient<TransportDefinition>(_ => new AzureServiceBusTransport(connectionString)
{
TransportTransactionMode = TransportTransactionMode.ReceiveOnly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;

class AzureServiceBusHelper(ILogger<AzureServiceBusHelper> logger, string connectionstring) : IQueueInformationProvider, IQueueLengthProvider
class AzureServiceBusHelper(ILogger<AzureServiceBusHelper> logger, string connectionstring) : IQueueInformationProvider, IQueueLengthProvider, IHealthCheckerProvider
{
readonly ServiceBusAdministrationClient client = new(connectionstring);

Expand Down Expand Up @@ -30,4 +30,29 @@ public async Task<long> GetQueueLength(string name, CancellationToken cancellati
var queuesRuntimeProperties = await client.GetQueueRuntimePropertiesAsync(name, cancellationToken);
return queuesRuntimeProperties.Value.ActiveMessageCount;
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
try
{
var clientNoRetries = new ServiceBusAdministrationClient(connectionstring, new ServiceBusAdministrationClientOptions { Retry = { MaxRetries = 0 } });
var result = clientNoRetries.GetQueuesAsync(cancellationToken);

await foreach (var queueProperties in result)
{
break;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (UnauthorizedAccessException)
{
return (false, "The token has an invalid signature.");
}
catch (Azure.RequestFailedException e)
{
return (false, e.Message);
}

return (true, string.Empty);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace ServiceControl.Connector.MassTransit.Host.Commands;

using System.CommandLine;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class HealthCheckCommand : Command
{
public HealthCheckCommand() : base("health-check", "Performs a validation that the connector is able to connect to the broker.")
{
this.SetHandler(async context =>
{
context.ExitCode = await InternalHandler(context.GetCancellationToken());
});
}

async Task<int> InternalHandler(CancellationToken cancellationToken)
{
var builder = Host.CreateEmptyApplicationBuilder(null);
builder.Configuration.AddEnvironmentVariables();
builder.UseMassTransitConnector(true);

var host = builder.Build();

var queueInformationProvider = host.Services.GetRequiredService<IHealthCheckerProvider>();
var (success, errorMessage) = await queueInformationProvider.TryCheck(cancellationToken);

if (!success)
{
Console.WriteLine(errorMessage);
return 1;
}

Console.WriteLine("Success");

return 0;
}
}
1 change: 1 addition & 0 deletions src/ServiceControl.Connector.MassTransit.Host/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ COPY --from=build ./build /app

ENV QUEUES_FILE="/app/queues.txt"
ENV DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=1
HEALTHCHECK --start-period=20s CMD ["/app/ServiceControl.Connector.MassTransit.Host", "health-check"]

USER $APP_UID
ENTRYPOINT ["/app/ServiceControl.Connector.MassTransit.Host"]
1 change: 1 addition & 0 deletions src/ServiceControl.Connector.MassTransit.Host/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
var startupCommand = new StartupCommand(args);

startupCommand.AddCommand(new QueuesCommand());
startupCommand.AddCommand(new HealthCheckCommand());

var commandLineBuilder = new CommandLineBuilder(startupCommand);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public static void UsingRabbitMQ(this IServiceCollection services, string connec
var defaultCredential = new NetworkCredential(username ?? connectionConfiguration.UserName, password ?? connectionConfiguration.Password);

var rabbitMqHelper = new RabbitMQHelper(connectionConfiguration.VirtualHost, managementApi, defaultCredential);
services.AddSingleton(rabbitMqHelper);
services.AddSingleton<IQueueLengthProvider>(rabbitMqHelper);
services.AddSingleton<IQueueInformationProvider>(rabbitMqHelper);
services.AddSingleton<IHealthCheckerProvider, RabbitMQHealthChecker>();
services.AddTransient<TransportDefinition>(_ => new RabbitMQTransport(
RoutingTopology.Conventional(QueueType.Quorum),
connectionString,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NServiceBus.Transport;
using RabbitMQ.Client.Exceptions;

class RabbitMQHealthChecker(RabbitMQHelper helper, Configuration configuration, TransportInfrastructureFactory transportInfrastructureFactory) : IHealthCheckerProvider
{
public async Task<(bool, string)> TryCheck(CancellationToken cancellationToken)
{
var result = await helper.TryCheck(cancellationToken);

if (!result.Success)
{
return (false, result.ErrorMessage);
}

var hostSettings = new HostSettings(
configuration.ReturnQueue,
$"Queue creator for {configuration.ReturnQueue}",
new StartupDiagnosticEntries(),
(_, __, ___) =>
{
},
false);

var receiverSettings = new[]{
new ReceiveSettings(
id: "Return",
receiveAddress: new QueueAddress(configuration.ReturnQueue),
usePublishSubscribe: false,
purgeOnStartup: false,
errorQueue: configuration.PoisonQueue)};


try
{
var infrastructure = await transportInfrastructureFactory.CreateTransportInfrastructure(hostSettings,
receiverSettings, [configuration.PoisonQueue, configuration.ServiceControlQueue], cancellationToken);
await infrastructure.Shutdown(cancellationToken);
}
catch (BrokerUnreachableException e)
{
return (false, e.Message);
}

return (true, string.Empty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,24 @@ public async Task<long> GetQueueLength(string queueName, CancellationToken cance
throw new Exception($"Failed to check the length of the queue {queueName} via URL {url}.", e);
}
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
var url = $"/api/queues/{HttpUtility.UrlEncode(vhost)}";

try
{
using var response = await httpClient.GetAsync(url, cancellationToken);

if (response.IsSuccessStatusCode)
{
return (true, string.Empty);
}
return (false, response.ReasonPhrase ?? "Connection failed");
}
catch (HttpRequestException e)
{
return (false, e.Message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
public interface IHealthCheckerProvider
{
Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken);
}

0 comments on commit ea6cf0e

Please sign in to comment.