Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding healthcheck command to docker #173

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public static void UsingAmazonSqs(this IServiceCollection services, Action<SqsTr
services.AddSingleton(provider => new AmazonSqsHelper(provider.GetRequiredService<IAmazonSQS>(), provider.GetRequiredService<SqsTransport>(), string.Empty));
services.AddSingleton<IQueueInformationProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IQueueLengthProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IHealthCheckerProvider>(provider => provider.GetRequiredService<AmazonSqsHelper>());
services.AddSingleton<IAmazonSQS, AmazonSQSClient>();
services.AddSingleton<IAmazonSimpleNotificationService, AmazonSimpleNotificationServiceClient>();
services.AddTransient<SqsTransport>(sp =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Amazon.SQS;
using Amazon.SQS.Model;

sealed class AmazonSqsHelper(IAmazonSQS client, SqsTransport transportDefinition, string? queueNamePrefix = null) : IQueueInformationProvider, IQueueLengthProvider
sealed class AmazonSqsHelper(IAmazonSQS client, SqsTransport transportDefinition, string? queueNamePrefix = null) : IQueueInformationProvider, IQueueLengthProvider, IHealthCheckerProvider
{
readonly ConcurrentDictionary<string, Task<string>> queueUrlCache = new();

Expand Down Expand Up @@ -62,4 +62,19 @@ public async Task<long> GetQueueLength(string name, CancellationToken cancellati

return value;
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
try
{
await GetQueues(cancellationToken).GetAsyncEnumerator(cancellationToken).MoveNextAsync();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (AmazonSQSException e)
{
return (false, e.Message);
}

return (true, string.Empty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ public static void UsingAzureServiceBus(this IServiceCollection services, IConfi
{
var receiveMode = useDeadLetterQueue ? SubQueue.DeadLetter : SubQueue.None;

services.AddSingleton<IQueueInformationProvider>(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton<IQueueLengthProvider>(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton(b => new AzureServiceBusHelper(b.GetRequiredService<ILogger<AzureServiceBusHelper>>(), connectionString));
services.AddSingleton<IQueueInformationProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddSingleton<IQueueLengthProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddSingleton<IHealthCheckerProvider>(b => b.GetRequiredService<AzureServiceBusHelper>());
services.AddTransient<TransportDefinition>(_ => new AzureServiceBusTransport(connectionString)
{
TransportTransactionMode = TransportTransactionMode.ReceiveOnly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;

class AzureServiceBusHelper(ILogger<AzureServiceBusHelper> logger, string connectionstring) : IQueueInformationProvider, IQueueLengthProvider
class AzureServiceBusHelper(ILogger<AzureServiceBusHelper> logger, string connectionstring) : IQueueInformationProvider, IQueueLengthProvider, IHealthCheckerProvider
{
readonly ServiceBusAdministrationClient client = new(connectionstring);

Expand Down Expand Up @@ -30,4 +30,29 @@ public async Task<long> GetQueueLength(string name, CancellationToken cancellati
var queuesRuntimeProperties = await client.GetQueueRuntimePropertiesAsync(name, cancellationToken);
return queuesRuntimeProperties.Value.ActiveMessageCount;
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
try
{
var clientNoRetries = new ServiceBusAdministrationClient(connectionstring, new ServiceBusAdministrationClientOptions { Retry = { MaxRetries = 0 } });
var result = clientNoRetries.GetQueuesAsync(cancellationToken);

await foreach (var queueProperties in result)
{
break;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (UnauthorizedAccessException)
{
return (false, "The token has an invalid signature.");
}
catch (Azure.RequestFailedException e)
{
return (false, e.Message);
}

return (true, string.Empty);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace ServiceControl.Connector.MassTransit.Host.Commands;

using System.CommandLine;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class HealthCheckCommand : Command
{
public HealthCheckCommand() : base("health-check", "Performs a validation that the connector is able to connect to the broker.")
{
this.SetHandler(async context =>
{
context.ExitCode = await InternalHandler(context.GetCancellationToken());
});
}

async Task<int> InternalHandler(CancellationToken cancellationToken)
{
var builder = Host.CreateEmptyApplicationBuilder(null);
builder.Configuration.AddEnvironmentVariables();
builder.UseMassTransitConnector(true);

var host = builder.Build();

var queueInformationProvider = host.Services.GetRequiredService<IHealthCheckerProvider>();
var (success, errorMessage) = await queueInformationProvider.TryCheck(cancellationToken);

if (!success)
{
Console.WriteLine(errorMessage);
return 1;
}

Console.WriteLine("Success");

return 0;
}
}
1 change: 1 addition & 0 deletions src/ServiceControl.Connector.MassTransit.Host/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ COPY --from=build ./build /app

ENV QUEUES_FILE="/app/queues.txt"
ENV DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=1
HEALTHCHECK --start-period=20s CMD ["/app/ServiceControl.Connector.MassTransit.Host", "health-check"]

USER $APP_UID
ENTRYPOINT ["/app/ServiceControl.Connector.MassTransit.Host"]
1 change: 1 addition & 0 deletions src/ServiceControl.Connector.MassTransit.Host/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
var startupCommand = new StartupCommand(args);

startupCommand.AddCommand(new QueuesCommand());
startupCommand.AddCommand(new HealthCheckCommand());

var commandLineBuilder = new CommandLineBuilder(startupCommand);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public static void UsingRabbitMQ(this IServiceCollection services, string connec
var defaultCredential = new NetworkCredential(username ?? connectionConfiguration.UserName, password ?? connectionConfiguration.Password);

var rabbitMqHelper = new RabbitMQHelper(connectionConfiguration.VirtualHost, managementApi, defaultCredential);
services.AddSingleton(rabbitMqHelper);
services.AddSingleton<IQueueLengthProvider>(rabbitMqHelper);
services.AddSingleton<IQueueInformationProvider>(rabbitMqHelper);
services.AddSingleton<IHealthCheckerProvider, RabbitMQHealthChecker>();
services.AddTransient<TransportDefinition>(_ => new RabbitMQTransport(
RoutingTopology.Conventional(QueueType.Quorum),
connectionString,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NServiceBus.Transport;
using RabbitMQ.Client.Exceptions;

class RabbitMQHealthChecker(RabbitMQHelper helper, Configuration configuration, TransportInfrastructureFactory transportInfrastructureFactory) : IHealthCheckerProvider
{
public async Task<(bool, string)> TryCheck(CancellationToken cancellationToken)
{
var result = await helper.TryCheck(cancellationToken);

if (!result.Success)
{
return (false, result.ErrorMessage);
}

var hostSettings = new HostSettings(
configuration.ReturnQueue,
$"Queue creator for {configuration.ReturnQueue}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without looking at the signature... should this be a different description?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this matters

new StartupDiagnosticEntries(),
(_, __, ___) =>
{
},
false);

var receiverSettings = new[]{
new ReceiveSettings(
id: "Return",
receiveAddress: new QueueAddress(configuration.ReturnQueue),
usePublishSubscribe: false,
purgeOnStartup: false,
errorQueue: configuration.PoisonQueue)};


Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

try
{
var infrastructure = await transportInfrastructureFactory.CreateTransportInfrastructure(hostSettings,
receiverSettings, [configuration.PoisonQueue, configuration.ServiceControlQueue], cancellationToken);
await infrastructure.Shutdown(cancellationToken);
}
catch (BrokerUnreachableException e)
{
return (false, e.Message);
}

return (true, string.Empty);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if it fails for any reason other than brokerunreachable it will still be marked as healthy? This is the description of the healthchecker... but seems to me like it could cause some confusion

Copy link
Member Author

@johnsimons johnsimons Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if it fails for any reason other than brokerunreachable it will still be marked as healthy?

no, it will be an unhandled exception and therefore it will be exit code 1

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,24 @@ public async Task<long> GetQueueLength(string queueName, CancellationToken cance
throw new Exception($"Failed to check the length of the queue {queueName} via URL {url}.", e);
}
}

public async Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken)
{
var url = $"/api/queues/{HttpUtility.UrlEncode(vhost)}";

try
{
using var response = await httpClient.GetAsync(url, cancellationToken);

if (response.IsSuccessStatusCode)
{
return (true, string.Empty);
}
return (false, response.ReasonPhrase ?? "Connection failed");
}
catch (HttpRequestException e)
{
return (false, e.Message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
public interface IHealthCheckerProvider
{
Task<(bool Success, string ErrorMessage)> TryCheck(CancellationToken cancellationToken);
}