Skip to content

Commit

Permalink
[SDCISA-15833, swisspost#170] Redis Request Quota for GetQueuesItemsC…
Browse files Browse the repository at this point in the history
…ountHandler
  • Loading branch information
hiddenalpha committed May 7, 2024
1 parent 53733bd commit 1e6f046
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 199 deletions.
64 changes: 47 additions & 17 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.redisques;

import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
Expand All @@ -11,6 +12,8 @@
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
Expand All @@ -37,41 +40,68 @@ public class QueueStatsService {
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;
private final DequeueStatisticCollector dequeueStatisticCollector;
private final Semaphore incomingRequestLimit;

public QueueStatsService(Vertx vertx, EventBus eventBus, String redisquesAddress, QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector) {
public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
Semaphore incomingRequestLimit
) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.incomingRequestLimit = incomingRequestLimit;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
var req0 = new GetQueueStatsRequest<CTX>();
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; }
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; }
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; }
req3.mentor.onQueueStatistics(req3.queues, req3.mCtx);
if (!incomingRequestLimit.tryAcquire()) {
vertx.runOnContext(v -> {
var ex = new RuntimeException("Server too busy to handle yet-another-queue-stats-request now");
mentor.onError(ex, mCtx);
});
return;
} else try {
var req0 = new GetQueueStatsRequest<CTX>();
AtomicBoolean isCompleted = new AtomicBoolean();
BiConsumer<Throwable, List<Queue>> onDone = (Throwable ex, List<Queue> ans) -> {
if (!isCompleted.compareAndSet(false, true))
return/*TODO maybe throw here*/;
incomingRequestLimit.release();
if (ex != null) mentor.onError(ex, mCtx);
else mentor.onQueueStatistics(ans, mCtx);
};
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { onDone.accept(ex1, null); return; }
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
});
});
});
} catch (Exception ex) {
incomingRequestLimit.release();
throw ex;
}
}

private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
String filter = req.mentor.filter(req.mCtx);
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
onDone.accept(new Exception("eventBus.request()", ev.cause()), null);
onDone.accept(new Exception("eventBus.request()", ev.cause()), req);
return;
}
Message<JsonObject> msg = ev.result();
Expand Down
Loading

0 comments on commit 1e6f046

Please sign in to comment.