Skip to content

Commit

Permalink
Fixed MassTransit#4317 - Topic Cache in Amazon SQS was caching endpoi…
Browse files Browse the repository at this point in the history
…nt faulted when loading existing topics, causing eternal startup failure
  • Loading branch information
phatboyg committed Apr 21, 2023
1 parent 5eb85c2 commit 863d71d
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class TopicCache :
readonly CancellationToken _cancellationToken;
readonly IAmazonSimpleNotificationService _client;
readonly IDictionary<string, TopicInfo> _durableTopics;
readonly Lazy<Task> _loadExistingTopics;
Lazy<Task> _loadExistingTopics;
bool _topicsLoaded;

public TopicCache(IAmazonSimpleNotificationService client, CancellationToken cancellationToken)
Expand All @@ -27,7 +27,7 @@ public TopicCache(IAmazonSimpleNotificationService client, CancellationToken can

_cache = ClientContextCacheDefaults.CreateCache<string, TopicInfo>();

_loadExistingTopics = new Lazy<Task>(() => LoadExistingTopics(cancellationToken));
ResetLoadExistingTopics();

_durableTopics = new Dictionary<string, TopicInfo>();
}
Expand All @@ -51,7 +51,7 @@ public async ValueTask DisposeAsync()
public async Task<TopicInfo> Get(Topology.Topic topic)
{
if (!_topicsLoaded)
await _loadExistingTopics.Value.ConfigureAwait(false);
await LoadExistingTopics().ConfigureAwait(false);

lock (_durableTopics)
{
Expand All @@ -65,7 +65,7 @@ public async Task<TopicInfo> Get(Topology.Topic topic)
public async Task<TopicInfo> GetByName(string entityName)
{
if (!_topicsLoaded)
await _loadExistingTopics.Value.ConfigureAwait(false);
await LoadExistingTopics().ConfigureAwait(false);

lock (_durableTopics)
{
Expand Down Expand Up @@ -117,7 +117,28 @@ async Task<TopicInfo> CreateMissingTopic(Topology.Topic topic)
return missingTopic;
}

async Task LoadExistingTopics(CancellationToken token)
Lazy<Task> ResetLoadExistingTopics()
{
return _loadExistingTopics = new Lazy<Task>(() => LoadExistingTopicsLazy(_cancellationToken));
}

Task LoadExistingTopics()
{
var result = _loadExistingTopics.Value;
if (result.IsFaulted || result.IsCanceled)
{
lock (this)
{
result = _loadExistingTopics.Value;
if (result.IsFaulted || result.IsCanceled)
result = ResetLoadExistingTopics().Value;
}
}

return result;
}

async Task LoadExistingTopicsLazy(CancellationToken token)
{
var cursor = string.Empty;
do
Expand Down

0 comments on commit 863d71d

Please sign in to comment.