diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 4aa209e6..ad71eadb 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1,6 +1,5 @@ package org.swisspush.redisques; -import io.vertx.core.*; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; @@ -13,22 +12,77 @@ import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.*; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.swisspush.redisques.action.QueueAction; import org.swisspush.redisques.handler.RedisquesHttpRequestHandler; -import org.swisspush.redisques.util.*; - -import java.util.*; +import org.swisspush.redisques.performance.UpperBoundParallel; +import org.swisspush.redisques.util.DefaultMemoryUsageProvider; +import org.swisspush.redisques.util.DefaultRedisProvider; +import org.swisspush.redisques.util.DefaultRedisquesConfigurationProvider; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.DequeueStatisticCollector; +import org.swisspush.redisques.util.MemoryUsageProvider; +import org.swisspush.redisques.util.QueueActionFactory; +import org.swisspush.redisques.util.QueueConfiguration; +import org.swisspush.redisques.util.QueueStatisticsCollector; +import org.swisspush.redisques.util.RedisProvider; +import org.swisspush.redisques.util.RedisQuesTimer; +import org.swisspush.redisques.util.RedisUtils; +import org.swisspush.redisques.util.RedisquesConfiguration; +import org.swisspush.redisques.util.RedisquesConfigurationProvider; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import static org.swisspush.redisques.util.RedisquesAPI.*; -import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*; +import java.util.function.BiConsumer; import static java.lang.System.currentTimeMillis; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.OPERATION; +import static org.swisspush.redisques.util.RedisquesAPI.PAYLOAD; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.addQueueItem; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteLocks; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteQueues; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkPutLocks; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllLocks; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllQueueItems; +import static 
org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteLock; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteQueueItem; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.enqueue; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getAllLocks; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getConfiguration; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getLock; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItem; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItems; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItemsCount; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueues; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesCount; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesItemsCount; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesSpeed; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesStatistics; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.lockedEnqueue; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.putLock; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.replaceQueueItem; +import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.setConfiguration; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; public class RedisQues extends AbstractVerticle { @@ -37,6 +91,7 @@ public static class RedisQuesBuilder { private MemoryUsageProvider memoryUsageProvider; private RedisquesConfigurationProvider configurationProvider; private RedisProvider redisProvider; + private Semaphore redisMonitoringReqLimit; private RedisQuesBuilder() { // Private, as clients should use "RedisQues.builder()" and not this class here directly. @@ -57,8 +112,17 @@ public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) { return this; } + public RedisQuesBuilder withRedisMonitoringReqLimit(Semaphore limit) { + this.redisMonitoringReqLimit = limit; + return this; + } + public RedisQues build() { - return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider); + if (redisMonitoringReqLimit == null) { + redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE); + log.warn("No redis request limit provided. 
Fallback to legacy behavior of {}.", Integer.MAX_VALUE); + } + return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, redisMonitoringReqLimit); } } @@ -106,15 +170,23 @@ private enum QueueState { private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; + private final Semaphore redisMonitoringReqLimit; public RedisQues() { + log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); + this.redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE); } - public RedisQues(MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider configurationProvider, - RedisProvider redisProvider) { + public RedisQues( + MemoryUsageProvider memoryUsageProvider, + RedisquesConfigurationProvider configurationProvider, + RedisProvider redisProvider, + Semaphore redisMonitoringReqLimit + ) { this.memoryUsageProvider = memoryUsageProvider; this.configurationProvider = configurationProvider; this.redisProvider = redisProvider; + this.redisMonitoringReqLimit = redisMonitoringReqLimit; } public static RedisQuesBuilder builder() { @@ -210,8 +282,10 @@ public void start(Promise<Void> promise) { private void initialize() { RedisquesConfiguration configuration = configurationProvider.configuration(); - this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider, - queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec()); + UpperBoundParallel upperBoundParallel = new UpperBoundParallel(vertx); + this.queueStatisticsCollector = new QueueStatisticsCollector( + redisProvider, queuesPrefix, vertx, redisMonitoringReqLimit, + configuration.getQueueSpeedIntervalSec()); RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector); @@ -369,37 +443,69 @@ void resume() { private void registerActiveQueueRegistrationRefresh() { // Periodic refresh of my registrations on active queues. - vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, event -> { - // Check if I am still the registered consumer - myQueues.entrySet().stream().filter(entry -> entry.getValue() == QueueState.CONSUMING). forEach(entry -> { - final String queue = entry.getKey(); - // Check if I am still the registered consumer - String consumerKey = consumersPrefix + queue; - if (log.isTraceEnabled()) { - log.trace("RedisQues refresh queues get: {}", consumerKey); + var upperBoundParallelity = new UpperBoundParallel/*TODO maybe do elsewhere*/(vertx); + vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, new Handler<Long>() { + Iterator<Map.Entry<String, QueueState>> iter; + @Override public void handle(Long timerId) { + // Need a copy to prevent concurrent modification issues. + iter = new HashMap<>(myQueues).entrySet().iterator(); + // Trigger only a limited number of requests in parallel.
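+ // The Mentor callbacks below drive the iterator: 'runOneMore' is invoked once for each permit
+ // that can be acquired from 'redisMonitoringReqLimit', each invocation reports completion through
+ // the supplied BiConsumer<Throwable, Void>, and 'onDone' fires once the iterator is exhausted and
+ // all in-flight refresh requests have completed. This caps the number of concurrent redis requests.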
+ upperBoundParallelity.request(redisMonitoringReqLimit, iter, new UpperBoundParallel.Mentor<>() { + @Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Iterator<Map.Entry<String, QueueState>> iter) { + handleNextQueueOfInterest(onDone); + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Iterator<Map.Entry<String, QueueState>> iter) { + log.warn("TODO error handling", ex); + return false; + } + @Override public void onDone(Iterator<Map.Entry<String, QueueState>> iter) {/*no-op*/} + }); + } + void handleNextQueueOfInterest(BiConsumer<Throwable, Void> onDone) { + while (iter.hasNext()) { + var entry = iter.next(); + if (entry.getValue() != QueueState.CONSUMING) continue; + checkIfImStillTheRegisteredConsumer(entry.getKey(), onDone); + return; + } + // No entry found; we're done. + onDone.accept(null, null); + } + void checkIfImStillTheRegisteredConsumer(String queue, BiConsumer<Throwable, Void> onDone) { + // Check if I am still the registered consumer + String consumerKey = consumersPrefix + queue; + log.trace("RedisQues refresh queues get: {}", consumerKey); + redisProvider.redis().onComplete(ev1 -> { + if (ev1.failed()) { + onDone.accept(ev1.cause(), null); + return; + } + var redisAPI = ev1.result(); + redisAPI.get(consumerKey, getConsumerEvent -> { + if (getConsumerEvent.failed()) { + onDone.accept(new RuntimeException("Failed to get queue consumer for queue '" + queue + "'", getConsumerEvent.cause()), null); + return; + } + final String consumer = Objects.toString(getConsumerEvent.result(), ""); + if (uid.equals(consumer)) { + log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); + refreshRegistration(queue, ev -> { + if (ev.failed()) { + onDone.accept(new RuntimeException("TODO error handling", ev.cause()), null); + return; + } + updateTimestamp(queue, ev3 -> onDone.accept(ev3.failed() ? ev3.cause() : null, null)); + }); + } else { + log.debug("RedisQues Removing queue {} from the list", queue); + myQueues.remove(queue); + queueStatisticsCollector.resetQueueFailureStatistics(queue, + (ex, v) -> onDone.accept(ex, null)); } - redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getConsumerEvent -> { - if (getConsumerEvent.failed()) { - log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queue, getConsumerEvent.cause()); - // We should return here.
See: "https://softwareengineering.stackexchange.com/a/190535" - } - final String consumer = Objects.toString(getConsumerEvent.result(), ""); - if (uid.equals(consumer)) { - log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); - refreshRegistration(queue, ev -> { - if (ev.failed()) - log.warn("TODO error handling", new Exception(ev.cause())); - updateTimestamp(queue, null); - }); - } else { - log.debug("RedisQues Removing queue {} from the list", queue); - myQueues.remove(queue); - queueStatisticsCollector.resetQueueFailureStatistics(queue); - } - })) - .onFailure(throwable -> log.error("Redis: Failed to registerActiveQueueRegistrationRefresh", throwable)); }); + }); + } }); } @@ -438,7 +544,9 @@ private Handler> operationsHandler() { int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean sendSuccess) { if (sendSuccess) { - queueStatisticsCollector.queueMessageSuccess(queueName); + queueStatisticsCollector.queueMessageSuccess(queueName, (ex, v) -> { + if (ex != null) log.warn("TODO_3q98hq3 error handling", ex); + }); return 0; } else { // update the failure count @@ -979,12 +1087,16 @@ private Future checkQueues() { removeOldQueues(limit).onComplete(removeOldQueuesEvent -> { if( removeOldQueuesEvent.failed() ) log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause())); - queueStatisticsCollector.resetQueueFailureStatistics(queueName); - promise.complete(); + queueStatisticsCollector.resetQueueFailureStatistics(queueName, (ex, v) -> { + if (ex != null) promise.fail(ex); + else promise.complete(); + }); }); } else { - queueStatisticsCollector.resetQueueFailureStatistics(queueName); - promise.complete(); + queueStatisticsCollector.resetQueueFailureStatistics(queueName, (ex, v) -> { + if (ex != null) promise.fail(ex); + else promise.complete(); + }); } } }); diff --git a/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java b/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java index 83c50a89..6c13733f 100644 --- a/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java +++ b/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java @@ -43,7 +43,9 @@ public void execute(Message event) { } var p = redisProvider.redis(); p.onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> { - queueStatisticsCollector.resetQueueStatistics(queues); + queueStatisticsCollector.resetQueueStatistics(queues, (Throwable ex, Void v) -> { + if (ex != null) log.warn("TODO_q93258hu38 error handling", ex); + }); if (delManyReply.succeeded()) { event.reply(createOkReply().put(VALUE, delManyReply.result().toLong())); } else { diff --git a/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java b/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java index 035cda2e..169dd762 100644 --- a/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java +++ b/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java @@ -42,7 +42,9 @@ public void execute(Message event) { // 1st: We don't, to keep backward compatibility // 2nd: We don't, to may unlock below. 
} - queueStatisticsCollector.resetQueueFailureStatistics(queue); + queueStatisticsCollector.resetQueueFailureStatistics(queue, (Throwable ex, Void v) -> { + if (ex != null) log.warn("TODO_2958iouhj error handling", ex); + }); if (unlock) { redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> { if (unlockReply.failed()) { diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java new file mode 100644 index 00000000..b5baf628 --- /dev/null +++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java @@ -0,0 +1,196 @@ +package org.swisspush.redisques.performance; + +import io.vertx.core.Vertx; +import org.slf4j.Logger; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; + +import static java.lang.Thread.currentThread; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Level 1: KISS says do everything sequentially. Stop here! You're + * done! "Premature optimization is the root of all evil", you know? Happy + * you. So just do NOT use this class if that is the case for your toy use case. + * + * Level 2: There are cases where we do not have the time to do + * everything sequentially. The next step is to go parallel and/or + * concurrent (I don't care which term you think is the correct one. + * Just take the one that fits your opinion better). But by going + * parallel, we just assume we have an infinite amount of resources + * available (e.g. sockets, memory, CPU time, ...), so we can happily fill + * queues with an infinite number of entries without ever running into + * trouble. In case your resources are infinite, happy you, you're done. + * YOU HAVE NO NEED TO USE THIS CLASS! + * + * Level 3: Welcome to my world, where reality starts to hit you sooner or + * later and you'll realize that no matter how many of those "infinite cloud + * resources" and fancy black-magic frameworks you throw at your problem, + * they won't solve the issue (performance-is-not-an-issue? Please, just go + * back to "Level 1" and be happy there). + * + * This class is for "Level 3" users only. We can still utilize parallelism + * without assuming an infinite amount of resources. A KISS approach to do + * this is to apply an upper bound to what we do in parallel, and that's + * what this class tries to assist with. It wants to be the tool that + * allows parallelism while maintaining an upper bound. For stone-age + * programmers: it's nothing else than a semaphore, really. + */ +public class UpperBoundParallel { + + private static final Logger log = getLogger(UpperBoundParallel.class); + private static final long RETRY_DELAY_IF_LIMIT_REACHED_MS = 8; + private final Vertx vertx; + + public UpperBoundParallel(Vertx vertx) { + assert vertx != null; + this.vertx = vertx; + } + + public <Ctx> void request(Semaphore limit, Ctx ctx, Mentor<Ctx> mentor) { + var req = new Request<>(ctx, mentor, limit); + resume(req); + } + + private <Ctx> void resume(Request<Ctx> req) { + if (!req.lock.tryLock()) { + log.trace("Some other thread already working here"); + return; + } else try { + Thread ourself = currentThread(); + if (req.worker == null) { + log.trace("worker := ourself"); + req.worker = ourself; + } else if (req.worker != ourself) { + log.trace("Another thread is already working here"); + return; + } + // Enqueue as much as we can.
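+ // The loop below keeps starting work until a fatal error was seen, the mentor reports that
+ // no more work is available, or the semaphore has no free permit left. Every successful
+ // 'tryAcquire' accounts for one in-flight task ('numInProgress'), and the lock is released
+ // around each mentor callback so a slow mentor cannot stall other threads entering 'resume()'.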
+ while (true) { + if (req.isFatalError) { + log.debug("return from 'resume()' because isFatalError"); + req.limit.release(); + return; + } + if (!req.hasMore) { + if (req.numInProgress == 0 && !req.isDoneCalled) { + req.isDoneCalled = true; + // Give up lock to prevent starvation in case mentor is wasting time. + log.debug("call 'mentor.onDone()'"); + req.lock.unlock(); + try { + req.mentor.onDone(req.ctx); + } finally { + req.lock.lock(); // MUST get back our lock RIGHT NOW. + } + } else { + log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress); + } + return; + } + if (!req.limit.tryAcquire()) { + log.debug("redis request limit reached. Need to pause now."); + break; // Go to end of loop to schedule a run later. + } + req.numInProgress += 1; + boolean hasMore = true; + try { + // We MUST give up our lock while calling mentor. We cannot know how long + // mentor is going to block (which would then cascade to all threads + // waiting for our lock). + req.lock.unlock(); + log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress); + hasMore = req.mentor.runOneMore(req::onOneDone_, req.ctx); + } catch (RuntimeException ex) { + onOneDone(req, ex); + } finally { + // We MUST get back our lock right NOW. No way to just 'try'. + log.trace("mentor.runOneMore() -> hasMore={}", hasMore); + req.lock.lock(); + req.hasMore = hasMore; + } + } + assert req.numInProgress >= 0 : req.numInProgress; + if (req.numInProgress == 0) { + // Looks like we could not even fire a single event. Need to try later. + vertx.setTimer(RETRY_DELAY_IF_LIMIT_REACHED_MS, nonsense -> resume(req)); + } + } finally { + req.worker = null; + req.lock.unlock(); + } + } + + private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) { + req.lock.lock(); + try { + req.limit.release(); + req.numInProgress -= 1; + log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress); + boolean isFatalError = true; + if (ex != null) try { + // Unlock, to prevent thread stalls as we don't know for how long mentor + // is going to block. + req.lock.unlock(); + log.debug("mentor.onError({}: {})", ex.getClass().getName(), ex.getMessage()); + isFatalError = req.mentor.onError(ex, req.ctx); + } finally { + req.lock.lock(); // Need our lock back. + req.isFatalError = isFatalError; + } + } finally { + req.lock.unlock(); + vertx.runOnContext(nonsense -> resume(req)); + } + } + + private final class Request<Ctx> { + private final Ctx ctx; + private final Mentor<Ctx> mentor; + private final Lock lock = new ReentrantLock(); + private final Semaphore limit; + private Thread worker = null; + private int numInProgress = 0; + private boolean hasMore = true; + private boolean isFatalError = false; + private boolean isDoneCalled = false; + + private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore limit) { + this.ctx = ctx; + this.mentor = mentor; + this.limit = limit; + } + + public void onOneDone_(Throwable ex, Void result) { + onOneDone(this, ex); + } + } + + + public static interface Mentor<Ctx> { + + /** + * @return true if more elements have to be processed. False if the + * iteration source has reached its end. + */ + boolean runOneMore(BiConsumer<Throwable, Void> onDone, Ctx ctx); + + /** + * @return true if iteration should continue with other elements. False + * if no more elements should be processed. + */ + boolean onError(Throwable ex, Ctx ctx); + + /** + * Called once as soon as the iteration has ended SUCCESSFULLY. It is NOT + * called if {@link #onError(Throwable, Object)} requested to STOP the + * iteration, for example.
+ */ + void onDone(Ctx ctx); + } + + +} diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index c6f3c62b..b21ec57c 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -7,10 +7,15 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.*; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; import io.vertx.redis.client.impl.types.NumberType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.redisques.performance.UpperBoundParallel; import java.util.HashMap; import java.util.Iterator; @@ -19,11 +24,21 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import static java.lang.System.currentTimeMillis; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.QUEUENAME; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_BACKPRESSURE; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_FAILURES; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SLOWDOWN; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SPEED; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; /** * Class StatisticsCollector helps collecting statistics information about queue handling and @@ -57,12 +72,21 @@ public class QueueStatisticsCollector { private final RedisProvider redisProvider; private final String queuePrefix; private final Vertx vertx; - - public QueueStatisticsCollector(RedisProvider redisProvider, - String queuePrefix, Vertx vertx, int speedIntervalSec) { + private final Semaphore redisRequestLimit; + private final UpperBoundParallel upperBoundParallel; + + public QueueStatisticsCollector( + RedisProvider redisProvider, + String queuePrefix, + Vertx vertx, + Semaphore redisRequestLimit, + int speedIntervalSec + ) { this.redisProvider = redisProvider; this.queuePrefix = queuePrefix; this.vertx = vertx; + this.redisRequestLimit = redisRequestLimit; + this.upperBoundParallel = new UpperBoundParallel(vertx); speedStatisticsScheduler(speedIntervalSec); } @@ -122,7 +146,7 @@ private void speedStatisticsScheduler(int speedIntervalSec) { * * @param queueName The queue name for which the statistic values must be reset. 
*/ - public void resetQueueFailureStatistics(String queueName) { + public void resetQueueFailureStatistics(String queueName, BiConsumer<Throwable, Void> onDone) { AtomicLong failureCount = queueFailureCount.remove(queueName); queueSlowDownTime.remove(queueName); queueBackpressureTime.remove(queueName); @@ -130,7 +154,9 @@ public void resetQueueFailureStatistics(String queueName) { // there was a real failure before, therefore we will execute this // cleanup as well on Redis itself as we would like to do redis operations // only if necessary of course. - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, onDone); + } else { + vertx.runOnContext(nonsense -> onDone.accept(null, null)); } } @@ -140,14 +166,27 @@ public void resetQueueFailureStatistics(String queueName) { * * @param queues The list of queue names for which the statistic values must be reset. */ - public void resetQueueStatistics(JsonArray queues) { + public void resetQueueStatistics(JsonArray queues, BiConsumer<Throwable, Void> onDone) { if (queues == null || queues.isEmpty()) { + onDone.accept(null, null); return; } - final int size = queues.size(); - for (int i = 0; i < size; i++) { - resetQueueFailureStatistics(queues.getString(i)); - } + upperBoundParallel.request(redisRequestLimit, null, new UpperBoundParallel.Mentor<Void>() { + int i = 0; + final int size = queues.size(); + @Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx) { + String queueName = queues.getString(i++); + resetQueueFailureStatistics(queueName, onDone); + return i < size; + } + @Override public boolean onError(Throwable ex, Void ctx) { + onDone.accept(ex, null); + return false; + } + @Override public void onDone(Void ctx) { + onDone.accept(null, null); + } + }); } /** @@ -156,14 +195,14 @@ public void resetQueueStatistics(JsonArray queues) { * * @param queueName The name of the queue for which success must be processed. */ - public void queueMessageSuccess(String queueName) { + public void queueMessageSuccess(String queueName, BiConsumer<Throwable, Void> onDone) { // count the number of messages per queue for interval speed evaluation.
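+ // putIfAbsent returns the previously mapped counter, or null if the new AtomicLong(1) was stored;
+ // a freshly stored counter already accounts for this message, an existing one is incremented below.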
AtomicLong messageCtr = queueMessageSpeedCtr.putIfAbsent(queueName, new AtomicLong(1)); if (messageCtr != null) { messageCtr.incrementAndGet(); } // whenever there is a message successfully sent, our failure statistics could be reset as well - resetQueueFailureStatistics(queueName); + resetQueueFailureStatistics(queueName, onDone); } /** @@ -200,7 +239,9 @@ public long queueMessageFailed(String queueName) { if (failureCount != null) { newFailureCount = failureCount.addAndGet(1); } - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, (ex, nothing) -> { + if (ex != null) log.warn("TODO error handling", ex); + }); return newFailureCount; } @@ -233,11 +274,15 @@ public long getQueueFailureCount(String queueName) { public void setQueueBackPressureTime(String queueName, long time) { if (time > 0) { queueBackpressureTime.put(queueName, time); - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, (ex, v) -> { + if (ex != null) log.warn("TODO error handling (findme_q39h8ugjoh)", ex); + }); } else { Long lastTime = queueBackpressureTime.remove(queueName); if (lastTime != null) { - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, (ex, v) -> { + if (ex != null) log.warn("TODO error handling (findme_39587zg)", ex); + }); } } } @@ -264,13 +309,16 @@ private long getQueueBackPressureTime(String queueName) { * @param time The slowdown time in ms */ public void setQueueSlowDownTime(String queueName, long time) { + BiConsumer onDone = (ex, v) -> { + if (ex != null) log.warn("TODO_q9587hg3otuhj error handling", ex); + }; if (time > 0) { queueSlowDownTime.put(queueName, time); - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, onDone); } else { Long lastTime = queueSlowDownTime.remove(queueName); if (lastTime != null) { - updateStatisticsInRedis(queueName); + updateStatisticsInRedis(queueName, onDone); } } } @@ -295,31 +343,42 @@ private long getQueueSlowDownTime(String queueName) { * If there are no valid useful data available eg. 
all 0, the corresponding * statistics entry is removed from redis */ - private void updateStatisticsInRedis(String queueName) { - long failures = getQueueFailureCount(queueName); - long slowDownTime = getQueueSlowDownTime(queueName); - long backpressureTime = getQueueBackPressureTime(queueName); - if (failures > 0 || slowDownTime > 0 || backpressureTime > 0) { - JsonObject obj = new JsonObject(); - obj.put(QUEUENAME, queueName); - obj.put(QUEUE_FAILURES, failures); - obj.put(QUEUE_SLOWDOWNTIME, slowDownTime); - obj.put(QUEUE_BACKPRESSURE, backpressureTime); - redisProvider.redis() - .onSuccess(redisAPI -> { - redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> { - if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause())); - }); - }) - .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); - } else { - redisProvider.redis() - .onSuccess(redisAPI -> { - redisAPI.hdel(List.of(STATSKEY, queueName), ev -> { - if (ev.failed()) log.warn("TODO error handling", new Exception(ev.cause())); - }); - }) - .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); + private void updateStatisticsInRedis(String queueName, BiConsumer<Throwable, Void> onDone) { + try { + long failures = getQueueFailureCount(queueName); + long slowDownTime = getQueueSlowDownTime(queueName); + long backpressureTime = getQueueBackPressureTime(queueName); + if (failures > 0 || slowDownTime > 0 || backpressureTime > 0) { + JsonObject obj = new JsonObject(); + obj.put(QUEUENAME, queueName); + obj.put(QUEUE_FAILURES, failures); + obj.put(QUEUE_SLOWDOWNTIME, slowDownTime); + obj.put(QUEUE_BACKPRESSURE, backpressureTime); + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> { + if (ev.failed()) { + onDone.accept(ev.cause(), null); + return; + } + onDone.accept(null, null); + }); + }) + .onFailure(ex -> onDone.accept(ex, null)); + } else { + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hdel(List.of(STATSKEY, queueName), ev -> { + if (ev.failed()) { + onDone.accept(ev.cause(), null); + return; + } + onDone.accept(null, null); + }); + }) + .onFailure(ex -> onDone.accept(ex, null)); + } + } catch (RuntimeException ex) { + onDone.accept(ex, null); } } @@ -595,6 +654,9 @@ private static class RequestCtx { private List queueLengthList; private HashMap statistics; // Stats we're going to populate private Response redisFailStats; // failure stats we got from redis. + + private int getQueueSpeed_i; + private Promise getQueueSpeed_promise; } } diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 2e5522f1..b3cfddcc 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Semaphore; import java.util.stream.Collectors; /**