Skip to content

Commit

Permalink
Add Missing Synchronization in Ack Buffer Access (#1038)
Browse files Browse the repository at this point in the history
Fixes #1029

There was one piece of missing synchronization to the Acknowledgement Buffer that led to a racing condition where acknowledgements could be lost.

This commit adds the missing synchronization to fix the issue.
  • Loading branch information
tomazfernandes authored Feb 7, 2024
1 parent 49d7ef4 commit 562b933
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ public void run() {
try {
Message<T> polledMessage = this.acks.poll(1, TimeUnit.SECONDS);
if (polledMessage != null) {
this.acksBuffer.computeIfAbsent(this.messageGroupingFunction.apply(polledMessage),
newGroup -> new LinkedBlockingQueue<>()).add(polledMessage);
addMessageToBuffer(polledMessage);
this.thresholdAcknowledgementExecution.checkAndExecute();
}
}
Expand All @@ -203,6 +202,17 @@ public void run() {
logger.debug("Acknowledgement processor thread stopped");
}

private void addMessageToBuffer(Message<T> polledMessage) {
this.context.lock();
try {
this.acksBuffer.computeIfAbsent(this.messageGroupingFunction.apply(polledMessage),
newGroup -> new LinkedBlockingQueue<>()).add(polledMessage);
}
finally {
this.context.unlock();
}
}

public void waitAcknowledgementsToFinish() {
try {
CompletableFuture.allOf(this.context.runningAcks.toArray(new CompletableFuture[] {}))
Expand Down Expand Up @@ -330,16 +340,11 @@ private boolean isRunning() {
}

private void purgeEmptyBuffers() {
lock();
try {
List<String> emptyAcks = this.acksBuffer.entrySet().stream().filter(entry -> entry.getValue().isEmpty())
.map(Map.Entry::getKey).collect(Collectors.toList());
logger.trace("Removing groups {} from buffer in {}", emptyAcks, this.id);
emptyAcks.forEach(this.acksBuffer::remove);
}
finally {
unlock();
}
verifyLock();
List<String> emptyAcks = this.acksBuffer.entrySet().stream().filter(entry -> entry.getValue().isEmpty())
.map(Map.Entry::getKey).collect(Collectors.toList());
logger.trace("Removing groups {} from buffer in {}", emptyAcks, this.id);
emptyAcks.forEach(this.acksBuffer::remove);
}

private void lock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ abstract class BaseSqsIntegrationTest {

protected static final boolean useLocalStackClient = true;

protected static final boolean purgeQueues = true;
protected static boolean purgeQueues = true;

protected static boolean waitForPurge = true;

private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

Expand Down Expand Up @@ -93,7 +95,14 @@ protected static CompletableFuture<?> createQueue(SqsAsyncClient client, String
if (purgeQueues) {
String queueUrl = v.queueUrl();
logger.debug("Purging queue {}", queueName);
return client.purgeQueue(req -> req.queueUrl(queueUrl).build());
return client.purgeQueue(req -> req.queueUrl(queueUrl).build())
.thenRun(() -> {
if (waitForPurge) {
logger.info("Waiting 30000 seconds to start sending.");
sleep(30000);
logger.info("Done waiting.");
}
});
}
else {
logger.debug("Skipping purge for queue {}", queueName);
Expand All @@ -108,6 +117,15 @@ protected static CompletableFuture<?> createQueue(SqsAsyncClient client, String
});
}

private static void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while sleeping");
}
}

private static CreateQueueRequest getCreateQueueRequest(String queueName,
Map<QueueAttributeName, String> attributes, CreateQueueRequest.Builder builder) {
if (!attributes.isEmpty()) {
Expand Down

0 comments on commit 562b933

Please sign in to comment.