Skip to content

Commit

Permalink
[SDCISA-15833, swisspost#170] Add some doc for quota.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 16, 2024
1 parent 3634ad8 commit cf5902f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>3.1.5-SNAPSHOT</version>
<version>0.0.0-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
27 changes: 15 additions & 12 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -49,38 +48,39 @@ public class QueueStatsService {
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;
private final DequeueStatisticCollector dequeueStatisticCollector;
private final Semaphore incomingRequestLimit;
private final Semaphore incomingRequestQuota;

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
Semaphore incomingRequestLimit
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.incomingRequestLimit = incomingRequestLimit;
this.incomingRequestQuota = incomingRequestQuota;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
if (!incomingRequestLimit.tryAcquire()) {
if (!incomingRequestQuota.tryAcquire()) {
var ex = new RuntimeException("Server too busy to handle yet-another-queue-stats-request now");
vertx.runOnContext(v -> mentor.onError(ex, mCtx));
return;
} else try {
}
AtomicBoolean isCompleted = new AtomicBoolean();
try {
var req0 = new GetQueueStatsRequest<CTX>();
AtomicBoolean isCompleted = new AtomicBoolean();
BiConsumer<Throwable, List<Queue>> onDone = (Throwable ex, List<Queue> ans) -> {
if (!isCompleted.compareAndSet(false, true)) {
if (log.isInfoEnabled()) log.info("", new RuntimeException("onDone MUST be called ONCE only", ex));
return;
}
incomingRequestLimit.release();
incomingRequestQuota.release();
if (ex != null) mentor.onError(ex, mCtx);
else mentor.onQueueStatistics(ans, mCtx);
};
Expand All @@ -100,8 +100,12 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
});
});
} catch (Exception ex) {
incomingRequestLimit.release();
throw ex;
if (!isCompleted.compareAndSet(false, true)) {
if (log.isInfoEnabled()) log.info("onDone MUST be called ONCE only", ex);
return;
}
incomingRequestQuota.release();
vertx.runOnContext(v -> mentor.onError(ex, mCtx));
}
}

Expand All @@ -110,8 +114,7 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
Throwable ex = ev.cause();
onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ex), req);
onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ev.cause()), req);
return;
}
JsonObject body = ev.result().body();
Expand Down
83 changes: 50 additions & 33 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public static class RedisQuesBuilder {
private MemoryUsageProvider memoryUsageProvider;
private RedisquesConfigurationProvider configurationProvider;
private RedisProvider redisProvider;
private Semaphore redisMonitoringReqLimit;
private Semaphore checkQueueRequestsLimit;/*TODO maybe should be removed*/
private Semaphore queueStatsRequestLimit;
private Semaphore redisMonitoringReqQuota;
private Semaphore checkQueueRequestsQuota;
private Semaphore queueStatsRequestQuota;
private Semaphore getQueuesItemsCountRedisRequestQuota;

private RedisQuesBuilder() {
Expand All @@ -118,45 +118,62 @@ public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) {
return this;
}

public RedisQuesBuilder withRedisMonitoringReqLimit(Semaphore limit) {
this.redisMonitoringReqLimit = limit;
/**
* How many redis requests monitoring related component will trigger
* simultaneously. One of those components for example is
* {@link QueueStatisticsCollector}.
*/
public RedisQuesBuilder withRedisMonitoringReqQuota(Semaphore quota) {
this.redisMonitoringReqQuota = quota;
return this;
}

public RedisQuesBuilder withCheckQueueRequestsLimit(Semaphore limit) {
this.checkQueueRequestsLimit = limit;
/**
* How many redis requests {@link RedisQues#checkQueues()} will trigger
* simultaneously.
*/
public RedisQuesBuilder withCheckQueueRequestsQuota(Semaphore quota) {
this.checkQueueRequestsQuota = quota;
return this;
}

public RedisQuesBuilder withQueueStatsRequestLimit(Semaphore limit) {
this.queueStatsRequestLimit = limit;
/**
* How many incoming requests {@link QueueStatsService} will accept
* simultaneously.
*/
public RedisQuesBuilder withQueueStatsRequestQuota(Semaphore quota) {
this.queueStatsRequestQuota = quota;
return this;
}

public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore limit) {
this.getQueuesItemsCountRedisRequestQuota = limit;
/**
* How many simultaneous redis requests will be performed maximally for
* {@link org.swisspush.redisques.handler.GetQueuesItemsCountHandler} requests.
*/
public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore quota) {
this.getQueuesItemsCountRedisRequestQuota = quota;
return this;
}

public RedisQues build() {
if (redisMonitoringReqLimit == null) {
redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
if (redisMonitoringReqQuota == null) {
redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis request limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
if (checkQueueRequestsLimit == null) {
checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE);
if (checkQueueRequestsQuota == null) {
checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis check queue limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
if (queueStatsRequestLimit == null) {
queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE);
if (queueStatsRequestQuota == null) {
queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis queue stats limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
if (getQueuesItemsCountRedisRequestQuota == null) {
getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis getQueueItemsCount quota provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider,
redisMonitoringReqLimit, checkQueueRequestsLimit, queueStatsRequestLimit,
redisMonitoringReqQuota, checkQueueRequestsQuota, queueStatsRequestQuota,
getQueuesItemsCountRedisRequestQuota);
}
}
Expand Down Expand Up @@ -207,34 +224,34 @@ private enum QueueState {
private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
private boolean dequeueStatisticEnabled = false;
private PeriodicSkipScheduler periodicSkipScheduler;
private final Semaphore redisMonitoringReqLimit;
private final Semaphore checkQueueRequestsLimit;
private final Semaphore queueStatsRequestLimit;
private final Semaphore redisMonitoringReqQuota;
private final Semaphore checkQueueRequestsQuota;
private final Semaphore queueStatsRequestQuota;
private final Semaphore getQueuesItemsCountRedisRequestQuota;

public RedisQues() {
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
this.redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
this.checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE);
this.queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE);
this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
}

public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider,
Semaphore redisMonitoringReqLimit,
Semaphore checkQueueRequestsLimit,
Semaphore queueStatsRequestLimit,
Semaphore redisMonitoringReqQuota,
Semaphore checkQueueRequestsQuota,
Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
) {
this.memoryUsageProvider = memoryUsageProvider;
this.configurationProvider = configurationProvider;
this.redisProvider = redisProvider;
this.redisMonitoringReqLimit = redisMonitoringReqLimit;
this.checkQueueRequestsLimit = checkQueueRequestsLimit;
this.queueStatsRequestLimit = queueStatsRequestLimit;
this.redisMonitoringReqQuota = redisMonitoringReqQuota;
this.checkQueueRequestsQuota = checkQueueRequestsQuota;
this.queueStatsRequestQuota = queueStatsRequestQuota;
this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota;
}

Expand Down Expand Up @@ -338,11 +355,11 @@ public void start(Promise<Void> promise) {
private void initialize() {
RedisquesConfiguration configuration = configurationProvider.configuration();
this.queueStatisticsCollector = new QueueStatisticsCollector(
redisProvider, queuesPrefix, vertx, redisMonitoringReqLimit,
redisProvider, queuesPrefix, vertx, redisMonitoringReqQuota,
configuration.getQueueSpeedIntervalSec());

RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector,
dequeueStatisticCollector, queueStatsRequestLimit);
dequeueStatisticCollector, queueStatsRequestQuota);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand Down Expand Up @@ -1085,7 +1102,7 @@ private Future<Void> checkQueues() {
// MUST pre-initialize ALL futures, so that our 'Future.all()' call knows
log.trace("RedisQues update queues: {}", ctx.counter);
var p = Promise.<Void>promise();
upperBoundParallel.request(checkQueueRequestsLimit, null, new UpperBoundParallel.Mentor<Void>() {
upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx_) {
if (ctx.iter.hasNext()) {
var queueObject = ctx.iter.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,20 @@ public class QueueStatisticsCollector {
private final RedisProvider redisProvider;
private final String queuePrefix;
private final Vertx vertx;
private final Semaphore redisRequestLimit;
private final Semaphore redisRequestQuota;
private final UpperBoundParallel upperBoundParallel;

public QueueStatisticsCollector(
RedisProvider redisProvider,
String queuePrefix,
Vertx vertx,
Semaphore redisRequestLimit,
Semaphore redisRequestQuota,
int speedIntervalSec
) {
this.redisProvider = redisProvider;
this.queuePrefix = queuePrefix;
this.vertx = vertx;
this.redisRequestLimit = redisRequestLimit;
this.redisRequestQuota = redisRequestQuota;
this.upperBoundParallel = new UpperBoundParallel(vertx);
speedStatisticsScheduler(speedIntervalSec);
}
Expand Down Expand Up @@ -175,7 +175,7 @@ public void resetQueueStatistics(JsonArray queues, BiConsumer<Throwable, Void> o
onDone.accept(null, null);
return;
}
upperBoundParallel.request(redisRequestLimit, null, new UpperBoundParallel.Mentor<Void>() {
upperBoundParallel.request(redisRequestQuota, null, new UpperBoundParallel.Mentor<Void>() {
int i = 0;
final int size = queues.size();
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx) {
Expand Down

0 comments on commit cf5902f

Please sign in to comment.