diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index b3828d8..6563cc8 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -438,26 +438,6 @@ private void initialize() { registerQueueCheck(); } - class Task { - private final String queueName; - private final DequeueStatistic dequeueStatistic; - Task(String queueName, DequeueStatistic dequeueStatistic) { - this.queueName = queueName; - this.dequeueStatistic = dequeueStatistic; - } - Future execute() { - // switch to a worker thread - return vertx.executeBlocking(promise -> { - dequeueStatisticCollector.setDequeueStatistic(queueName, dequeueStatistic).onComplete(event -> { - if (event.failed()) { - log.error("Future that should always succeed has failed, ignore it", event.cause()); - } - promise.complete(); - }); - }); - } - } - private Runnable newDequeueStatisticPublisher() { return new Runnable() { final AtomicBoolean isRunning = new AtomicBoolean(); @@ -499,32 +479,37 @@ public void run() { void resume() { // here we are executing in an event loop thread try { - List entryList = new ArrayList<>(); - while (iter.hasNext()) { - var entry = iter.next(); - var queueName = entry.getKey(); - var dequeueStatistic = entry.getValue(); - entryList.add(new Task(queueName, dequeueStatistic)); - } - - Future> startFuture = Future.succeededFuture(new ArrayList<>()); - // chain the futures sequentially to execute tasks - Future> resultFuture = entryList.stream() - .reduce(startFuture, (future, task) -> future.compose(previousResults -> { - // perform asynchronous task - return task.execute().compose(taskResult -> { - // append task result to previous results - previousResults.add(taskResult); + upperBoundParallel.request(redisMonitoringReqQuota, null, new UpperBoundParallel.Mentor() { + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if (iter.hasNext()) { + var entry = iter.next(); + String queueName = entry.getKey(); + DequeueStatistic dequeueStatistic = entry.getValue(); + vertx.executeBlocking(onBlockingDone -> { + dequeueStatisticCollector.setDequeueStatistic(queueName, dequeueStatistic).onComplete(event -> { + if (event.failed()) { + log.error("Future that should always succeed has failed, ignore it", + exceptionFactory.newException(event.cause())); + } + onBlockingDone.complete(); + }); + }, (AsyncResult ev) -> { i.incrementAndGet(); - return Future.succeededFuture(previousResults); + onDone.accept(ev.cause(), ev.result()); }); - }), (a,b) -> Future.succeededFuture()); - resultFuture.onComplete(event -> { - if (event.failed()) { - log.error("publishing dequeue statistics not complete, just continue", event.cause()); + } + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx) { + isRunning.set(false); + log.error("publishing dequeue statistics not complete, just continue", ex); + return false; + } + @Override public void onDone(Void ctx) { + isRunning.set(false); + log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + assert false : "TODO"; } - log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); - isRunning.set(false); }); } catch (Throwable ex) { isRunning.set(false); diff --git a/src/main/java/org/swisspush/redisques/performance/JavaGcStats.java b/src/main/java/org/swisspush/redisques/performance/JavaGcStats.java index 4b11e7f..6be99a8 100644 --- a/src/main/java/org/swisspush/redisques/performance/JavaGcStats.java +++ b/src/main/java/org/swisspush/redisques/performance/JavaGcStats.java @@ -185,10 +185,12 @@ public static class Measurement { * requested information is not (or not yet) available.

* *

Example 1: If gcFrac05 is 0.25, this means that one fourth of the - * execution time got into garbage collection.

+ * execution time got into garbage collection during the past 5 + * minutes.

* *

Example 2: If gcFrac15 is 0.03, this means that 3 percent of - * execution time got into garbage collection.

+ * execution time got into garbage collection during the past 15 + * minutes.

*/ public float gcFrac01 = NaN, gcFrac05 = NaN, gcFrac15 = NaN;