diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 07d38ce..1f9ab99 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -1,5 +1,6 @@ package org.swisspush.redisques; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; @@ -11,6 +12,8 @@ import org.swisspush.redisques.util.QueueStatisticsCollector; import java.util.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import static java.lang.Long.compare; @@ -37,33 +40,60 @@ public class QueueStatsService { private final String redisquesAddress; private final QueueStatisticsCollector queueStatisticsCollector; private final DequeueStatisticCollector dequeueStatisticCollector; + private final Semaphore incomingRequestLimit; - public QueueStatsService(Vertx vertx, EventBus eventBus, String redisquesAddress, QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector) { + public QueueStatsService( + Vertx vertx, + EventBus eventBus, + String redisquesAddress, + QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, + Semaphore incomingRequestLimit + ) { this.vertx = vertx; this.eventBus = eventBus; this.redisquesAddress = redisquesAddress; this.queueStatisticsCollector = queueStatisticsCollector; this.dequeueStatisticCollector = dequeueStatisticCollector; + this.incomingRequestLimit = incomingRequestLimit; } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { - var req0 = new GetQueueStatsRequest(); - req0.mCtx = mCtx; - req0.mentor = mentor; - fetchQueueNamesAndSize(req0, (ex1, req1) -> { - if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; } - // Prepare a list of queue names as it is needed to fetch retryDetails. - req1.queueNames = new ArrayList<>(req1.queues.size()); - for (Queue q : req1.queues) req1.queueNames.add(q.name); - fetchRetryDetails(req1, (ex2, req2) -> { - if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; } - attachDequeueStats(req2, (ex3, req3) -> { - if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; } - req3.mentor.onQueueStatistics(req3.queues, req3.mCtx); + if (!incomingRequestLimit.tryAcquire()) { + vertx.runOnContext(v -> { + var ex = new RuntimeException("Server too busy to handle yet-another-queue-stats-request now"); + mentor.onError(ex, mCtx); + }); + return; + } else try { + var req0 = new GetQueueStatsRequest(); + AtomicBoolean isCompleted = new AtomicBoolean(); + BiConsumer> onDone = (Throwable ex, List ans) -> { + if (!isCompleted.compareAndSet(false, true)) + return/*TODO maybe throw here*/; + incomingRequestLimit.release(); + if (ex != null) mentor.onError(ex, mCtx); + else mentor.onQueueStatistics(ans, mCtx); + }; + req0.mCtx = mCtx; + req0.mentor = mentor; + fetchQueueNamesAndSize(req0, (ex1, req1) -> { + if (ex1 != null) { onDone.accept(ex1, null); return; } + // Prepare a list of queue names as it is needed to fetch retryDetails. + req1.queueNames = new ArrayList<>(req1.queues.size()); + for (Queue q : req1.queues) req1.queueNames.add(q.name); + fetchRetryDetails(req1, (ex2, req2) -> { + if (ex2 != null) { onDone.accept(ex2, null); return; } + attachDequeueStats(req2, (ex3, req3) -> { + if (ex3 != null) { onDone.accept(ex3, null); return; } + onDone.accept(null, req3.queues); + }); }); }); - }); + } catch (Exception ex) { + incomingRequestLimit.release(); + throw ex; + } } private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { @@ -71,7 +101,7 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { if (ev.failed()) { - onDone.accept(new Exception("eventBus.request()", ev.cause()), null); + onDone.accept(new Exception("eventBus.request()", ev.cause()), req); return; } Message msg = ev.result(); diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 2f4a1b0..0af9f36 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -95,7 +95,9 @@ public static class RedisQuesBuilder { private RedisquesConfigurationProvider configurationProvider; private RedisProvider redisProvider; private Semaphore redisMonitoringReqLimit; - private Semaphore checkQueueRequestsLimit; + private Semaphore checkQueueRequestsLimit;/*TODO maybe should be removed*/ + private Semaphore queueStatsRequestLimit; + private Semaphore getQueuesItemsCountRedisRequestQuota; private RedisQuesBuilder() { // Private, as clients should use "RedisQues.builder()" and not this class here directly. @@ -126,17 +128,36 @@ public RedisQuesBuilder withCheckQueueRequestsLimit(Semaphore limit) { return this; } + public RedisQuesBuilder withQueueStatsRequestLimit(Semaphore limit) { + this.queueStatsRequestLimit = limit; + return this; + } + + public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore limit) { + this.getQueuesItemsCountRedisRequestQuota = limit; + return this; + } + public RedisQues build() { - var farFromCommonSenseLimit = new Semaphore(Integer.MAX_VALUE); if (redisMonitoringReqLimit == null) { - redisMonitoringReqLimit = farFromCommonSenseLimit; + redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE); log.warn("No redis request limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE); } if (checkQueueRequestsLimit == null) { - checkQueueRequestsLimit = farFromCommonSenseLimit; + checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE); log.warn("No redis check queue limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE); } - return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, redisMonitoringReqLimit, checkQueueRequestsLimit); + if (queueStatsRequestLimit == null) { + queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE); + log.warn("No redis queue stats limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE); + } + if (getQueuesItemsCountRedisRequestQuota == null) { + getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE); + log.warn("No redis getQueueItemsCount quota provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE); + } + return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, + redisMonitoringReqLimit, checkQueueRequestsLimit, queueStatsRequestLimit, + getQueuesItemsCountRedisRequestQuota); } } @@ -188,12 +209,15 @@ private enum QueueState { private PeriodicSkipScheduler periodicSkipScheduler; private final Semaphore redisMonitoringReqLimit; private final Semaphore checkQueueRequestsLimit; + private final Semaphore queueStatsRequestLimit; + private final Semaphore getQueuesItemsCountRedisRequestQuota; public RedisQues() { log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); - var farFromCommonSenseLimit = new Semaphore(Integer.MAX_VALUE); - this.redisMonitoringReqLimit = farFromCommonSenseLimit; - this.checkQueueRequestsLimit = farFromCommonSenseLimit; + this.redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE); + this.checkQueueRequestsLimit = new Semaphore(Integer.MAX_VALUE); + this.queueStatsRequestLimit = new Semaphore(Integer.MAX_VALUE); + this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE); } public RedisQues( @@ -201,13 +225,17 @@ public RedisQues( RedisquesConfigurationProvider configurationProvider, RedisProvider redisProvider, Semaphore redisMonitoringReqLimit, - Semaphore checkQueueRequestsLimit + Semaphore checkQueueRequestsLimit, + Semaphore queueStatsRequestLimit, + Semaphore getQueuesItemsCountRedisRequestQuota ) { this.memoryUsageProvider = memoryUsageProvider; this.configurationProvider = configurationProvider; this.redisProvider = redisProvider; this.redisMonitoringReqLimit = redisMonitoringReqLimit; this.checkQueueRequestsLimit = checkQueueRequestsLimit; + this.queueStatsRequestLimit = queueStatsRequestLimit; + this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota; } public static RedisQuesBuilder builder() { @@ -313,7 +341,8 @@ private void initialize() { redisProvider, queuesPrefix, vertx, redisMonitoringReqLimit, configuration.getQueueSpeedIntervalSec()); - RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector); + RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector, + dequeueStatisticCollector, queueStatsRequestLimit); // only initialize memoryUsageProvider when not provided in the constructor if (memoryUsageProvider == null) { @@ -321,9 +350,10 @@ private void initialize() { configurationProvider.configuration().getMemoryUsageCheckIntervalSec()); } + assert getQueuesItemsCountRedisRequestQuota != null; queueActionFactory = new QueueActionFactory(redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueStatisticsCollector, memoryUsageProvider, - configurationProvider); + configurationProvider, getQueuesItemsCountRedisRequestQuota); queueActions.put(addQueueItem, queueActionFactory.buildQueueAction(addQueueItem)); queueActions.put(deleteQueueItem, queueActionFactory.buildQueueAction(deleteQueueItem)); @@ -469,66 +499,61 @@ void resume() { private void registerActiveQueueRegistrationRefresh() { // Periodic refresh of my registrations on active queues. - vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, new Handler() { - Iterator> iter; - @Override public void handle(Long timerId) { - // Need a copy to prevent concurrent modification issuses. - iter = new HashMap<>(myQueues).entrySet().iterator(); - // Trigger only a limitted amount of requests in parallel. - upperBoundParallel.request(redisMonitoringReqLimit, iter, new UpperBoundParallel.Mentor<>() { - @Override public boolean runOneMore(BiConsumer onDone, Iterator> iter) { - handleNextQueueOfInterest(onDone); - return iter.hasNext(); - } - @Override public boolean onError(Throwable ex, Iterator> iter) { - log.warn("TODO error handling", ex); - return false; - } - @Override public void onDone(Iterator> iter) {/*no-op*/} - }); - } - void handleNextQueueOfInterest(BiConsumer onDone) { - while (iter.hasNext()) { - var entry = iter.next(); - if (entry.getValue() != QueueState.CONSUMING) continue; - checkIfImStillTheRegisteredConsumer(entry.getKey(), onDone); - return; - } - // no entry found. we're done. - onDone.accept(null, null); - } - void checkIfImStillTheRegisteredConsumer(String queue, BiConsumer onDone) { + var periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + periodicSkipScheduler.setPeriodic(periodMs, "registerActiveQueueRegistrationRefresh", onDone_ -> { + AtomicInteger numPending = new AtomicInteger(); + Runnable onDone = () -> { + var remaining = numPending.decrementAndGet(); + assert remaining >= 0 : "Why is remaining " + remaining; + if (remaining == 0) onDone_.run(); + }; + boolean foundAtLeastOne = false; + for (Map.Entry entry : myQueues.entrySet()) { + if (entry.getValue() == QueueState.CONSUMING) continue; + foundAtLeastOne = true; + + numPending.incrementAndGet(); + final String queue = entry.getKey(); // Check if I am still the registered consumer String consumerKey = consumersPrefix + queue; - log.trace("RedisQues refresh queues get: {}", consumerKey); - redisProvider.redis().onComplete( ev1 -> { - if (ev1.failed()) { - onDone.accept(ev1.cause(), null); - return; - } - var redisAPI = ev1.result(); - redisAPI.get(consumerKey, getConsumerEvent -> { - if (getConsumerEvent.failed()) { - onDone.accept(new RuntimeException("Failed to get queue consumer for queue '{}'", getConsumerEvent.cause()), null); - return; - } - final String consumer = Objects.toString(getConsumerEvent.result(), ""); - if (uid.equals(consumer)) { - log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); - refreshRegistration(queue, ev -> { - if (ev.failed()) { - onDone.accept(new RuntimeException("TODO error handling", ev.cause()), null); - return; - } - updateTimestamp(queue, ev3 -> onDone.accept(ev3.failed() ? ev3.cause() : null, null)); - }); - } else { - log.debug("RedisQues Removing queue {} from the list", queue); - myQueues.remove(queue); - queueStatisticsCollector.resetQueueFailureStatistics(queue, onDone); - } - }); - }); + if (log.isTraceEnabled()) { + log.trace("RedisQues refresh queues get: {}", consumerKey); + } + redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getConsumerEvent -> { + if (getConsumerEvent.failed()) { + log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queue, getConsumerEvent.cause()); + // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" + } + final String consumer = Objects.toString(getConsumerEvent.result(), ""); + if (uid.equals(consumer)) { + log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); + refreshRegistration(queue, ev -> { + if (ev.failed()) + log.warn("TODO error handling", new Exception(ev.cause())); + updateTimestamp(queue, updateTimestampEv -> { + if (updateTimestampEv.failed()) log.warn("TODO error handling", + new RuntimeException(updateTimestampEv.cause())); + onDone.run(); + }); + }); + } else { + log.debug("RedisQues Removing queue {} from the list", queue); + myQueues.remove(queue); + queueStatisticsCollector.resetQueueFailureStatistics(queue, (Throwable ex, Void v) -> { + if (ex != null) log.warn("TODO error handling", new RuntimeException(ex)); + onDone.run(); + }); + } + })) + .onFailure(ex -> { + log.error("Redis: Failed to registerActiveQueueRegistrationRefresh", ex); + onDone.run(); + }); + + } + if (!foundAtLeastOne) { + numPending.incrementAndGet(); + onDone.run(); } }); } @@ -1041,9 +1066,6 @@ private Future checkQueues() { RedisAPI redisAPI; AtomicInteger counter; Iterator iter; - int iIter; - Promise[] promises; - Future[] futures; }; Future.succeededFuture().compose((Void v) -> { log.debug("Checking queues timestamps"); @@ -1058,29 +1080,15 @@ private Future checkQueues() { }).compose((Response queues) -> { assert ctx.counter == null; assert ctx.iter == null; - assert ctx.promises == null; - assert ctx.futures == null; ctx.counter = new AtomicInteger(queues.size()); ctx.iter = queues.iterator(); - ctx.promises = new Promise[queues.size()]; - ctx.futures = new Future[queues.size()]; // MUST pre-initialize ALL futures, so that our 'Future.all()' call knows - // for how many it has to wait. - for (int i = 0; i < ctx.futures.length; i++) { - ctx.promises[i] = Promise.promise(); - ctx.futures[i] = ctx.promises[i].future(); - } log.trace("RedisQues update queues: {}", ctx.counter); + var p = Promise.promise(); upperBoundParallel.request(checkQueueRequestsLimit, null, new UpperBoundParallel.Mentor() { - @Override public boolean runOneMore(BiConsumer onDone_, Void ctx_) { + @Override public boolean runOneMore(BiConsumer onDone, Void ctx_) { if (ctx.iter.hasNext()) { var queueObject = ctx.iter.next(); - var promise = ctx.promises[ctx.iIter++]; - BiConsumer onDone = (Throwable ex, Void v) -> { - if (ex != null) promise.fail(ex); - else promise.complete(); - onDone_.accept(ex, v); - }; // Check if the inactive queue is not empty (i.e. the key exists) final String queueName = queueObject.toString(); String key = queuesPrefix + queueName; @@ -1153,25 +1161,18 @@ private Future checkQueues() { return true; // true, keep going with other queues. } @Override public void onDone(Void ctx_) { - assert false : "TODO anything to do here?"; + // No longer used, so reduce GC graph traversal effort. + ctx.redisAPI = null; + ctx.counter = null; + ctx.iter = null; + // Mark this composition step as completed. + p.complete(); } }); - var p = Promise.promise(); - Future.all(List.of(ctx.futures)).onComplete(ev -> { - if (ev.failed()) p.fail(ev.cause()); - else if (ev.result().failed()) p.fail(ev.result().cause()); - else p.complete(null); - // No longer used, so reduce GC graph traversal effort. - ctx.redisAPI = null; - ctx.counter = null; - ctx.iter = null; - ctx.promises = null; - ctx.futures = null; - }); return p.future(); }).compose((Void v) -> { ctx.checkQueuesResult.complete(); - return null; + return Future.succeededFuture(); }).onFailure(ex -> { log.debug("Redis: Failed to checkQueues", ex); ctx.checkQueuesResult.fail(ex); diff --git a/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java index 04d8ca5..797050a 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java @@ -31,6 +31,7 @@ public GetQueueItemsCountAction( @Override public void execute(Message event) { + /* TODO this seems the handler that is timing out. */ String queue = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); var p = redisProvider.redis(); p.onSuccess(redisAPI -> redisAPI.llen(queuesPrefix + queue, new GetQueueItemsCountHandler(event))); diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java index 77637b6..94873ee 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.regex.Pattern; import static org.swisspush.redisques.util.RedisquesAPI.*; @@ -18,11 +19,24 @@ */ public class GetQueuesItemsCountAction extends AbstractQueueAction { - public GetQueuesItemsCountAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { + private final Semaphore redisRequestQuota; + + public GetQueuesItemsCountAction( + Vertx vertx, + RedisProvider redisProvider, + String address, + String queuesKey, + String queuesPrefix, + String consumersPrefix, + String locksKey, + List queueConfigurations, + Semaphore redisRequestQuota, + QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log); + this.redisRequestQuota = redisRequestQuota; } @Override @@ -34,8 +48,8 @@ public void execute(Message event) { } else { redisProvider.redis().onSuccess(redisAPI -> redisAPI.zrangebyscore(List.of(queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf"), - new GetQueuesItemsCountHandler(event, filterPattern.getOk(), - queuesPrefix, redisProvider))) + new GetQueuesItemsCountHandler(vertx, event, filterPattern.getOk(), + queuesPrefix, redisProvider, redisRequestQuota))) .onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java index 6f51dd1..6b9b24b 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java @@ -1,103 +1,146 @@ package org.swisspush.redisques.handler; import io.vertx.core.AsyncResult; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Command; import io.vertx.redis.client.Redis; import io.vertx.redis.client.Request; -import io.vertx.redis.client.impl.RequestImpl; -import io.vertx.redis.client.impl.types.NumberType; +import io.vertx.redis.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.vertx.redis.client.Response; +import org.swisspush.redisques.exception.NoStacktraceException; +import org.swisspush.redisques.performance.UpperBoundParallel; +import org.swisspush.redisques.util.HandlerUtil; +import org.swisspush.redisques.util.RedisProvider; + +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.swisspush.redisques.util.HandlerUtil; -import org.swisspush.redisques.util.RedisProvider; -import org.swisspush.redisques.util.RedisquesAPI; - -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static java.lang.System.currentTimeMillis; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; public class GetQueuesItemsCountHandler implements Handler> { private final Logger log = LoggerFactory.getLogger(GetQueuesItemsCountHandler.class); + private final Vertx vertx; private final Message event; private final Optional filterPattern; private final String queuesPrefix; private final RedisProvider redisProvider; + private final UpperBoundParallel upperBoundParallel; + private final Semaphore redisRequestQuota; public GetQueuesItemsCountHandler( + Vertx vertx, Message event, Optional filterPattern, String queuesPrefix, - RedisProvider redisProvider + RedisProvider redisProvider, + Semaphore redisRequestQuota ) { + this.vertx = vertx; this.event = event; this.filterPattern = filterPattern; this.queuesPrefix = queuesPrefix; this.redisProvider = redisProvider; + this.upperBoundParallel = new UpperBoundParallel(vertx); + this.redisRequestQuota = redisRequestQuota; } @Override public void handle(AsyncResult handleQueues) { - if (handleQueues.succeeded()) { - List queues = HandlerUtil.filterByPattern(handleQueues.result(), - filterPattern); - if (queues.isEmpty()) { - log.debug("Queue count evaluation with empty queues"); - event.reply(new JsonObject().put(STATUS, OK).put(QUEUES, new JsonArray())); - return; - } - - redisProvider.connection().onSuccess(conn -> { - List responses = queues.stream() - .map(queue -> conn.send(Request.cmd(Command.LLEN, queuesPrefix + queue))) - .collect(Collectors.toList()); - CompositeFuture.all(responses).onFailure(ex -> { + if (true) throw new NoStacktraceException(/*TODO*/"not impl yet aoqör5hgjuaöoiwetrjaöelk"); + if (!handleQueues.succeeded()) { + log.warn("Concealed error", new Exception(handleQueues.cause())); + event.reply(new JsonObject().put(STATUS, ERROR)); + return; + } + var ctx = new Object() { + Redis redis; + Iterator iter; + /* TODO this filterByPattern is EVIL! Kill it! */ + List queues = HandlerUtil.filterByPattern(handleQueues.result(), filterPattern); + int iNumberResult; + int[] queueLengths; /*TODO consider using primitive type*/ + }; + if (ctx.queues.isEmpty()) { + log.debug("Queue count evaluation with empty queues"); + event.reply(new JsonObject().put(STATUS, OK).put(QUEUES, new JsonArray())); + return; + } + redisProvider.connection().compose((Redis redis_) -> { + assert redis_ != null : "redis_ != null"; + assert ctx.redis == null : "ctx.redis == null"; + assert ctx.iter == null : "ctx.iter == null"; + assert ctx.queueLengths == null : "ctx.queueLengths == null"; + ctx.redis = redis_; + ctx.queueLengths = new int[ctx.queues.size()]; + ctx.iter = ctx.queues.iterator(); + var p = Promise.promise(); + upperBoundParallel.request(redisRequestQuota, null, new UpperBoundParallel.Mentor() { + @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + if (ctx.iter.hasNext()) { + String queue = ctx.iter.next(); + int iNum = ctx.iNumberResult++; + ctx.redis.send(Request.cmd(Command.LLEN, queuesPrefix + queue)).onSuccess((Response rsp) -> { + ctx.queueLengths[iNum] = rsp.toInteger(); + onDone.accept(null, null); + }).onFailure(ex -> { + onDone.accept(ex, null); + }); + } + return ctx.iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Void ctx_) { log.error("Unexpected queue length result", new Exception(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); - }).onSuccess(compositeFuture -> { - List queueLengthList = compositeFuture.list(); - if (queueLengthList == null) { - log.error("Unexpected queue length result null", - new Exception(compositeFuture.cause())); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - if (queueLengthList.size() != queues.size()) { - log.error("Unexpected queue length result with unequal size {} : {}", - queues.size(), queueLengthList.size()); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - JsonArray result = new JsonArray(); - for (int i = 0; i < queues.size(); i++) { - String queueName = queues.get(i); - result.add(new JsonObject() - .put(MONITOR_QUEUE_NAME, queueName) - .put(MONITOR_QUEUE_SIZE, queueLengthList.get(i).toLong())); - } - event.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK) - .put(QUEUES, result)); - }); - }).onFailure(ex -> { - log.warn("Redis: Failed to get queue length.", new Exception(ex)); - event.reply(new JsonObject().put(STATUS, ERROR)); + return false; + } + @Override public void onDone(Void ctx_) { + p.complete(); + } }); - - } else { - log.warn("Concealed error", new Exception(handleQueues.cause())); + return p.future(); + }).compose((Void v) -> { + /*going to waste another threads time to produce those garbage objects*/ + return vertx.executeBlocking((Promise workerPromise) -> { + assert !Thread.currentThread().getName().toUpperCase().contains("EVENTLOOP"); + long beginEpchMs = currentTimeMillis(); + JsonArray result = new JsonArray(); + for (int i = 0; i < ctx.queueLengths.length; ++i) { + String queueName = ctx.queues.get(i); + result.add(new JsonObject() + .put(MONITOR_QUEUE_NAME, queueName) + .put(MONITOR_QUEUE_SIZE, ctx.queueLengths[i])); + } + var obj = new JsonObject().put(STATUS, OK).put(QUEUES, result); + long jsonCreateDurationMs = currentTimeMillis() - beginEpchMs; + log.info("Creating JSON with {} entries took {}ms", ctx.queueLengths.length, jsonCreateDurationMs); + workerPromise.complete(obj); + }, false); + }).onSuccess((JsonObject json) -> { + log.trace("call event.reply(json)"); + event.reply(json); + }).onFailure(ex -> { + log.warn("Redis: Failed to get queue length.", new Exception(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); - } + }); } } diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 587582f..3d0b768 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -33,6 +33,7 @@ import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.stream.Collectors; import static org.swisspush.redisques.util.HttpServerRequestUtil.*; @@ -61,12 +62,7 @@ public class RedisquesHttpRequestHandler implements Handler { private static final String EMPTY_QUEUES_PARAM = "emptyQueues"; private static final String DELETED = "deleted"; - /** - *

For why we should NOT use such date formats, see SDCISA-15311. We really - * should utilize ISO dates and include timezone information.

- * - * @deprecated TODO about date formats - */ + /** @deprecated TODO about date formats */ @Deprecated private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); @@ -79,11 +75,12 @@ public class RedisquesHttpRequestHandler implements Handler { private final GetQueueStatsMentor queueStatsMentor = new MyQueueStatsMentor(); public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector) { + DequeueStatisticCollector dequeueStatisticCollector, Semaphore queueStatsRequestLimit) { log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { - RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector, dequeueStatisticCollector); + RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, + queueStatisticsCollector, dequeueStatisticCollector, queueStatsRequestLimit); // in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default. HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { @@ -113,7 +110,7 @@ private Result checkHttpAuthenticationConfiguration(RedisquesCo } private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector) { + DequeueStatisticCollector dequeueStatisticCollector, Semaphore queueStatsRequestLimit) { this.vertx = vertx; this.router = Router.router(vertx); this.eventBus = vertx.eventBus(); @@ -122,7 +119,8 @@ private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfi this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding(); this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec(); this.queueStatisticsCollector = queueStatisticsCollector; - this.queueStatsService = new QueueStatsService(vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector); + this.queueStatsService = new QueueStatsService(vertx, eventBus, redisquesAddress, + queueStatisticsCollector, dequeueStatisticCollector, queueStatsRequestLimit); final String prefix = modConfig.getHttpRequestHandlerPrefix(); @@ -569,6 +567,7 @@ public void onQueueStatistics(List queues, RoutingConte public void onError(Throwable ex, RoutingContext ctx) { String exMsg = ex.getMessage(); if (!ctx.response().ended()) { + log.debug("Failed to serve queue stats", ex); respondWith(StatusCode.INTERNAL_SERVER_ERROR, exMsg, ctx.request()); } else { log.warn("_q938hugz_ {}", ctx.request().uri(), ex); diff --git a/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java b/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java index 5b488a2..5963fd6 100644 --- a/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java +++ b/src/main/java/org/swisspush/redisques/scheduling/PeriodicSkipScheduler.java @@ -25,18 +25,19 @@ public PeriodicSkipScheduler(Vertx vertx) { this.vertx = vertx; } - /** Convenience overload for {@link #setPeriodic(long, long, Consumer)}. */ - public Timer setPeriodic(long periodMs, Consumer task) { - return setPeriodic(periodMs, periodMs, task); + /** Convenience overload for {@link #setPeriodic(long, long, String, Consumer)}. */ + public Timer setPeriodic(long periodMs, String dbgHint, Consumer task) { + return setPeriodic(periodMs, periodMs, dbgHint, task); } /** * Same idea as {@link Vertx#setPeriodic(long, long, Handler)}. BUT prevents * tasks which start to overtake themself. */ - public Timer setPeriodic(long initDelayMy, long periodMs, Consumer task) { + public Timer setPeriodic(long initDelayMy, long periodMs, String dbgHint, Consumer task) { var timer = new Timer(task); timer.id = vertx.setPeriodic(initDelayMy, periodMs, timer::onTrigger_); + timer.dbgHint = dbgHint; return timer; } @@ -44,8 +45,8 @@ private void onTrigger(Timer timer) { long now = currentTimeMillis(); boolean isPreviousStillRunning = timer.begEpochMs > timer.endEpochMs; if (isPreviousStillRunning) { - log.debug("Previous task still running. We have to NOT run in this interval. Previous did not respond for {}ms.", - now - timer.begEpochMs); + log.debug("Have to skip run. Previous did not respond for {}ms. ({})", + now - timer.begEpochMs, timer.dbgHint); return; } timer.begEpochMs = currentTimeMillis(); @@ -53,7 +54,7 @@ private void onTrigger(Timer timer) { Promise p = Promise.promise(); var fut = p.future(); fut.onSuccess((Void v) -> timer.onTaskDone_()); - fut.onFailure(ex -> log.error("This is expected to be UNREACHABLE", ex)); + fut.onFailure(ex -> log.error("This is expected to be UNREACHABLE ({})", timer.dbgHint, ex)); timer.task.accept(p::complete); } @@ -69,6 +70,7 @@ private void cancel(Timer timer) { public class Timer { private final Consumer task; private long id; + private String dbgHint; // When the last run has begun and end. private long begEpochMs, endEpochMs; diff --git a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java index 2035cd4..58a08d4 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java +++ b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java @@ -5,6 +5,7 @@ import org.swisspush.redisques.action.*; import java.util.List; +import java.util.concurrent.Semaphore; public class QueueActionFactory { @@ -20,13 +21,16 @@ public class QueueActionFactory { private final QueueStatisticsCollector queueStatisticsCollector; private final int memoryUsageLimitPercent; private final MemoryUsageProvider memoryUsageProvider; + private final Semaphore getQueuesItemsCountRedisRequestQuota; private final RedisquesConfigurationProvider configurationProvider; public QueueActionFactory(RedisProvider redisProvider, Vertx vertx, Logger log, String queuesKey, String queuesPrefix, String consumersPrefix, String locksKey, QueueStatisticsCollector queueStatisticsCollector, MemoryUsageProvider memoryUsageProvider, - RedisquesConfigurationProvider configurationProvider) { + RedisquesConfigurationProvider configurationProvider, + Semaphore getQueuesItemsCountRedisRequestQuota + ) { this.redisProvider = redisProvider; this.vertx = vertx; this.log = log; @@ -41,6 +45,7 @@ public QueueActionFactory(RedisProvider redisProvider, Vertx vertx, Logger log, this.address = configurationProvider.configuration().getAddress(); this.queueConfigurations = configurationProvider.configuration().getQueueConfigurations(); this.memoryUsageLimitPercent = configurationProvider.configuration().getMemoryUsageLimitPercent(); + this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota; } public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){ @@ -77,7 +82,7 @@ public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){ consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log); case getQueuesItemsCount: return new GetQueuesItemsCountAction(vertx, redisProvider, address, queuesKey, queuesPrefix, - consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log); + consumersPrefix, locksKey, queueConfigurations, getQueuesItemsCountRedisRequestQuota, queueStatisticsCollector, log); case enqueue: return new EnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log, memoryUsageProvider,