From cf5902f0e8e55139872a09568f08133b6d7445f5 Mon Sep 17 00:00:00 2001
From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com>
Date: Thu, 16 May 2024 15:39:39 +0200
Subject: [PATCH] [SDCISA-15833, #170] Add some doc for quota.
---
pom.xml | 2 +-
.../redisques/QueueStatsService.java | 27 +++---
.../org/swisspush/redisques/RedisQues.java | 83 +++++++++++--------
.../util/QueueStatisticsCollector.java | 8 +-
4 files changed, 70 insertions(+), 50 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9748f06..fa95b0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
4.0.0
org.swisspush
redisques
- 3.1.5-SNAPSHOT
+ 0.0.0-SNAPSHOT
redisques
A highly scalable redis-persistent queuing system for vertx
diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java
index e6f4840..4417dd0 100644
--- a/src/main/java/org/swisspush/redisques/QueueStatsService.java
+++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java
@@ -2,7 +2,6 @@
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
@@ -49,7 +48,7 @@ public class QueueStatsService {
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;
private final DequeueStatisticCollector dequeueStatisticCollector;
- private final Semaphore incomingRequestLimit;
+ private final Semaphore incomingRequestQuota;
public QueueStatsService(
Vertx vertx,
@@ -57,30 +56,31 @@ public QueueStatsService(
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
- Semaphore incomingRequestLimit
+ Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
this.dequeueStatisticCollector = dequeueStatisticCollector;
- this.incomingRequestLimit = incomingRequestLimit;
+ this.incomingRequestQuota = incomingRequestQuota;
}
public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) {
- if (!incomingRequestLimit.tryAcquire()) {
+ if (!incomingRequestQuota.tryAcquire()) {
var ex = new RuntimeException("Server too busy to handle yet-another-queue-stats-request now");
vertx.runOnContext(v -> mentor.onError(ex, mCtx));
return;
- } else try {
+ }
+ AtomicBoolean isCompleted = new AtomicBoolean();
+ try {
var req0 = new GetQueueStatsRequest();
- AtomicBoolean isCompleted = new AtomicBoolean();
BiConsumer> onDone = (Throwable ex, List ans) -> {
if (!isCompleted.compareAndSet(false, true)) {
if (log.isInfoEnabled()) log.info("", new RuntimeException("onDone MUST be called ONCE only", ex));
return;
}
- incomingRequestLimit.release();
+ incomingRequestQuota.release();
if (ex != null) mentor.onError(ex, mCtx);
else mentor.onQueueStatistics(ans, mCtx);
};
@@ -100,8 +100,12 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) {
});
});
} catch (Exception ex) {
- incomingRequestLimit.release();
- throw ex;
+ if (!isCompleted.compareAndSet(false, true)) {
+ if (log.isInfoEnabled()) log.info("onDone MUST be called ONCE only", ex);
+ return;
+ }
+ incomingRequestQuota.release();
+ vertx.runOnContext(v -> mentor.onError(ex, mCtx));
}
}
@@ -110,8 +114,7 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
- Throwable ex = ev.cause();
- onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ex), req);
+ onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ev.cause()), req);
return;
}
JsonObject body = ev.result().body();
diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java
index 0af9f36..f1502fc 100644
--- a/src/main/java/org/swisspush/redisques/RedisQues.java
+++ b/src/main/java/org/swisspush/redisques/RedisQues.java
@@ -94,9 +94,9 @@ public static class RedisQuesBuilder {
private MemoryUsageProvider memoryUsageProvider;
private RedisquesConfigurationProvider configurationProvider;
private RedisProvider redisProvider;
- private Semaphore redisMonitoringReqLimit;
- private Semaphore checkQueueRequestsLimit;/*TODO maybe should be removed*/
- private Semaphore queueStatsRequestLimit;
+ private Semaphore redisMonitoringReqQuota;
+ private Semaphore checkQueueRequestsQuota;
+ private Semaphore queueStatsRequestQuota;
private Semaphore getQueuesItemsCountRedisRequestQuota;
private RedisQuesBuilder() {
@@ -118,37 +118,54 @@ public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) {
return this;
}
- public RedisQuesBuilder withRedisMonitoringReqLimit(Semaphore limit) {
- this.redisMonitoringReqLimit = limit;
+ /**
+ * How many redis requests monitoring related component will trigger
+ * simultaneously. One of those components for example is
+ * {@link QueueStatisticsCollector}.
+ */
+ public RedisQuesBuilder withRedisMonitoringReqQuota(Semaphore quota) {
+ this.redisMonitoringReqQuota = quota;
return this;
}
- public RedisQuesBuilder withCheckQueueRequestsLimit(Semaphore limit) {
- this.checkQueueRequestsLimit = limit;
+ /**
+ * How many redis requests {@link RedisQues#checkQueues()} will trigger
+ * simultaneously.
+ */
+ public RedisQuesBuilder withCheckQueueRequestsQuota(Semaphore quota) {
+ this.checkQueueRequestsQuota = quota;
return this;
}
- public RedisQuesBuilder withQueueStatsRequestLimit(Semaphore limit) {
- this.queueStatsRequestLimit = limit;
+ /**
+ * How many incoming requests {@link QueueStatsService} will accept
+ * simultaneously.
+ */
+ public RedisQuesBuilder withQueueStatsRequestQuota(Semaphore quota) {
+ this.queueStatsRequestQuota = quota;
return this;
}
- public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore limit) {
- this.getQueuesItemsCountRedisRequestQuota = limit;
+ /**
+ * How many simultaneous redis requests will be performed maximally for
+ * {@link org.swisspush.redisques.handler.GetQueuesItemsCountHandler} requests.
+ */
+ public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore quota) {
+ this.getQueuesItemsCountRedisRequestQuota = quota;
return this;
}
public RedisQues build() {
- if (redisMonitoringReqLimit == null) {
- redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
+ if (redisMonitoringReqQuota == null) {
+ redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis request limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
- if (checkQueueRequestsLimit == null) {
- checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE);
+ if (checkQueueRequestsQuota == null) {
+ checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis check queue limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
- if (queueStatsRequestLimit == null) {
- queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE);
+ if (queueStatsRequestQuota == null) {
+ queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis queue stats limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
if (getQueuesItemsCountRedisRequestQuota == null) {
@@ -156,7 +173,7 @@ public RedisQues build() {
log.warn("No redis getQueueItemsCount quota provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider,
- redisMonitoringReqLimit, checkQueueRequestsLimit, queueStatsRequestLimit,
+ redisMonitoringReqQuota, checkQueueRequestsQuota, queueStatsRequestQuota,
getQueuesItemsCountRedisRequestQuota);
}
}
@@ -207,16 +224,16 @@ private enum QueueState {
private Map dequeueStatistic = new ConcurrentHashMap<>();
private boolean dequeueStatisticEnabled = false;
private PeriodicSkipScheduler periodicSkipScheduler;
- private final Semaphore redisMonitoringReqLimit;
- private final Semaphore checkQueueRequestsLimit;
- private final Semaphore queueStatsRequestLimit;
+ private final Semaphore redisMonitoringReqQuota;
+ private final Semaphore checkQueueRequestsQuota;
+ private final Semaphore queueStatsRequestQuota;
private final Semaphore getQueuesItemsCountRedisRequestQuota;
public RedisQues() {
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
- this.redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
- this.checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE);
- this.queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE);
+ this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
+ this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
+ this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
}
@@ -224,17 +241,17 @@ public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider,
- Semaphore redisMonitoringReqLimit,
- Semaphore checkQueueRequestsLimit,
- Semaphore queueStatsRequestLimit,
+ Semaphore redisMonitoringReqQuota,
+ Semaphore checkQueueRequestsQuota,
+ Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
) {
this.memoryUsageProvider = memoryUsageProvider;
this.configurationProvider = configurationProvider;
this.redisProvider = redisProvider;
- this.redisMonitoringReqLimit = redisMonitoringReqLimit;
- this.checkQueueRequestsLimit = checkQueueRequestsLimit;
- this.queueStatsRequestLimit = queueStatsRequestLimit;
+ this.redisMonitoringReqQuota = redisMonitoringReqQuota;
+ this.checkQueueRequestsQuota = checkQueueRequestsQuota;
+ this.queueStatsRequestQuota = queueStatsRequestQuota;
this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota;
}
@@ -338,11 +355,11 @@ public void start(Promise promise) {
private void initialize() {
RedisquesConfiguration configuration = configurationProvider.configuration();
this.queueStatisticsCollector = new QueueStatisticsCollector(
- redisProvider, queuesPrefix, vertx, redisMonitoringReqLimit,
+ redisProvider, queuesPrefix, vertx, redisMonitoringReqQuota,
configuration.getQueueSpeedIntervalSec());
RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector,
- dequeueStatisticCollector, queueStatsRequestLimit);
+ dequeueStatisticCollector, queueStatsRequestQuota);
// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
@@ -1085,7 +1102,7 @@ private Future checkQueues() {
// MUST pre-initialize ALL futures, so that our 'Future.all()' call knows
log.trace("RedisQues update queues: {}", ctx.counter);
var p = Promise.promise();
- upperBoundParallel.request(checkQueueRequestsLimit, null, new UpperBoundParallel.Mentor() {
+ upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor() {
@Override public boolean runOneMore(BiConsumer onDone, Void ctx_) {
if (ctx.iter.hasNext()) {
var queueObject = ctx.iter.next();
diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
index 595c8e3..6e1de9a 100644
--- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
+++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
@@ -72,20 +72,20 @@ public class QueueStatisticsCollector {
private final RedisProvider redisProvider;
private final String queuePrefix;
private final Vertx vertx;
- private final Semaphore redisRequestLimit;
+ private final Semaphore redisRequestQuota;
private final UpperBoundParallel upperBoundParallel;
public QueueStatisticsCollector(
RedisProvider redisProvider,
String queuePrefix,
Vertx vertx,
- Semaphore redisRequestLimit,
+ Semaphore redisRequestQuota,
int speedIntervalSec
) {
this.redisProvider = redisProvider;
this.queuePrefix = queuePrefix;
this.vertx = vertx;
- this.redisRequestLimit = redisRequestLimit;
+ this.redisRequestQuota = redisRequestQuota;
this.upperBoundParallel = new UpperBoundParallel(vertx);
speedStatisticsScheduler(speedIntervalSec);
}
@@ -175,7 +175,7 @@ public void resetQueueStatistics(JsonArray queues, BiConsumer o
onDone.accept(null, null);
return;
}
- upperBoundParallel.request(redisRequestLimit, null, new UpperBoundParallel.Mentor() {
+ upperBoundParallel.request(redisRequestQuota, null, new UpperBoundParallel.Mentor() {
int i = 0;
final int size = queues.size();
@Override public boolean runOneMore(BiConsumer onDone, Void ctx) {