Skip to content

Commit

Permalink
Use quota to limit parallelity to publish queue statistics.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 28, 2024
1 parent 53a830c commit b1faef1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 45 deletions.
71 changes: 28 additions & 43 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,26 +438,6 @@ private void initialize() {
registerQueueCheck();
}

class Task {
private final String queueName;
private final DequeueStatistic dequeueStatistic;
Task(String queueName, DequeueStatistic dequeueStatistic) {
this.queueName = queueName;
this.dequeueStatistic = dequeueStatistic;
}
Future<Void> execute() {
// switch to a worker thread
return vertx.executeBlocking(promise -> {
dequeueStatisticCollector.setDequeueStatistic(queueName, dequeueStatistic).onComplete(event -> {
if (event.failed()) {
log.error("Future that should always succeed has failed, ignore it", event.cause());
}
promise.complete();
});
});
}
}

private Runnable newDequeueStatisticPublisher() {
return new Runnable() {
final AtomicBoolean isRunning = new AtomicBoolean();
Expand Down Expand Up @@ -499,32 +479,37 @@ public void run() {
void resume() {
// here we are executing in an event loop thread
try {
List<Task> entryList = new ArrayList<>();
while (iter.hasNext()) {
var entry = iter.next();
var queueName = entry.getKey();
var dequeueStatistic = entry.getValue();
entryList.add(new Task(queueName, dequeueStatistic));
}

Future<List<Void>> startFuture = Future.succeededFuture(new ArrayList<>());
// chain the futures sequentially to execute tasks
Future<List<Void>> resultFuture = entryList.stream()
.reduce(startFuture, (future, task) -> future.compose(previousResults -> {
// perform asynchronous task
return task.execute().compose(taskResult -> {
// append task result to previous results
previousResults.add(taskResult);
upperBoundParallel.request(redisMonitoringReqQuota, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
if (iter.hasNext()) {
var entry = iter.next();
String queueName = entry.getKey();
DequeueStatistic dequeueStatistic = entry.getValue();
vertx.<Void>executeBlocking(onBlockingDone -> {
dequeueStatisticCollector.setDequeueStatistic(queueName, dequeueStatistic).onComplete(event -> {
if (event.failed()) {
log.error("Future that should always succeed has failed, ignore it",
exceptionFactory.newException(event.cause()));
}
onBlockingDone.complete();
});
}, (AsyncResult<Void> ev) -> {
i.incrementAndGet();
return Future.succeededFuture(previousResults);
onDone.accept(ev.cause(), ev.result());
});
}), (a,b) -> Future.succeededFuture());
resultFuture.onComplete(event -> {
if (event.failed()) {
log.error("publishing dequeue statistics not complete, just continue", event.cause());
}
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx) {
isRunning.set(false);
log.error("publishing dequeue statistics not complete, just continue", ex);
return false;
}
@Override public void onDone(Void ctx) {
isRunning.set(false);
log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
assert false : "TODO";
}
log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
isRunning.set(false);
});
} catch (Throwable ex) {
isRunning.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ public static class Measurement {
* requested information is not (or not yet) available.</p>
*
* <p>Example 1: If gcFrac05 is 0.25, this means that one fourth of the
* execution time got into garbage collection.</p>
* execution time got into garbage collection during the past 5
* minutes.</p>
*
* <p>Example 2: If gcFrac15 is 0.03, this means that 3 percent of
* execution time got into garbage collection.</p>
* execution time got into garbage collection during the past 15
* minutes.</p>
*/
public float gcFrac01 = NaN, gcFrac05 = NaN, gcFrac15 = NaN;

Expand Down

0 comments on commit b1faef1

Please sign in to comment.