Skip to content

Commit

Permalink
[SDCISA-15833] Introduce parallel limitter aka UpperBoundParallel)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 3, 2024
1 parent 68a2a77 commit 9120074
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 93 deletions.
206 changes: 159 additions & 47 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.redisques;

import io.vertx.core.*;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand All @@ -13,22 +12,77 @@
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.*;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.util.*;

import java.util.*;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.util.DefaultMemoryUsageProvider;
import org.swisspush.redisques.util.DefaultRedisProvider;
import org.swisspush.redisques.util.DefaultRedisquesConfigurationProvider;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.MemoryUsageProvider;
import org.swisspush.redisques.util.QueueActionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
import org.swisspush.redisques.util.RedisQuesTimer;
import org.swisspush.redisques.util.RedisUtils;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.RedisquesConfigurationProvider;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*;
import java.util.function.BiConsumer;

import static java.lang.System.currentTimeMillis;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR;
import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.OPERATION;
import static org.swisspush.redisques.util.RedisquesAPI.PAYLOAD;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.addQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteQueues;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkPutLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllQueueItems;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.enqueue;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getAllLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getConfiguration;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItems;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItemsCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueues;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesItemsCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesSpeed;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesStatistics;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.lockedEnqueue;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.putLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.replaceQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.setConfiguration;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;

public class RedisQues extends AbstractVerticle {

Expand All @@ -37,6 +91,7 @@ public static class RedisQuesBuilder {
private MemoryUsageProvider memoryUsageProvider;
private RedisquesConfigurationProvider configurationProvider;
private RedisProvider redisProvider;
private Semaphore redisMonitoringReqLimit;

private RedisQuesBuilder() {
// Private, as clients should use "RedisQues.builder()" and not this class here directly.
Expand All @@ -57,8 +112,17 @@ public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) {
return this;
}

public RedisQuesBuilder withRedisMonitoringReqLimit(Semaphore limit) {
this.redisMonitoringReqLimit = limit;
return this;
}

public RedisQues build() {
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider);
if (redisMonitoringReqLimit == null) {
redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
log.warn("No redis request limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
}
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, redisMonitoringReqLimit);
}
}

Expand Down Expand Up @@ -106,15 +170,23 @@ private enum QueueState {

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
private boolean dequeueStatisticEnabled = false;
private final Semaphore redisMonitoringReqLimit;

public RedisQues() {
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
this.redisMonitoringReqLimit = new Semaphore(Integer.MAX_VALUE);
}

public RedisQues(MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider) {
public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider,
Semaphore redisMonitoringReqLimit
) {
this.memoryUsageProvider = memoryUsageProvider;
this.configurationProvider = configurationProvider;
this.redisProvider = redisProvider;
this.redisMonitoringReqLimit = redisMonitoringReqLimit;
}

public static RedisQuesBuilder builder() {
Expand Down Expand Up @@ -210,8 +282,10 @@ public void start(Promise<Void> promise) {

private void initialize() {
RedisquesConfiguration configuration = configurationProvider.configuration();
this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider,
queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec());
UpperBoundParallel upperBoundParallel = new UpperBoundParallel(vertx);
this.queueStatisticsCollector = new QueueStatisticsCollector(
redisProvider, queuesPrefix, vertx, redisMonitoringReqLimit,
configuration.getQueueSpeedIntervalSec());

RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector);

Expand Down Expand Up @@ -369,37 +443,69 @@ void resume() {

private void registerActiveQueueRegistrationRefresh() {
// Periodic refresh of my registrations on active queues.
vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, event -> {
// Check if I am still the registered consumer
myQueues.entrySet().stream().filter(entry -> entry.getValue() == QueueState.CONSUMING).
forEach(entry -> {
final String queue = entry.getKey();
// Check if I am still the registered consumer
String consumerKey = consumersPrefix + queue;
if (log.isTraceEnabled()) {
log.trace("RedisQues refresh queues get: {}", consumerKey);
var upperBoundParallelity = new UpperBoundParallel/*TODO maybe do elsewhere*/(vertx);
vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, new Handler<Long>() {
Iterator<Map.Entry<String, QueueState>> iter;
@Override public void handle(Long timerId) {
// Need a copy to prevent concurrent modification issuses.
iter = new HashMap<>(myQueues).entrySet().iterator();
// Trigger only a limitted amount of requests in parallel.
upperBoundParallelity.request(redisMonitoringReqLimit, iter, new UpperBoundParallel.Mentor<>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Iterator<Map.Entry<String, QueueState>> iter) {
handleNextQueueOfInterest(onDone);
return iter.hasNext();
}
@Override public boolean onError(Throwable ex, Iterator<Map.Entry<String, QueueState>> iter) {
log.warn("TODO error handling", ex);
return false;
}
@Override public void onDone(Iterator<Map.Entry<String, QueueState>> iter) {/*no-op*/}
});
}
void handleNextQueueOfInterest(BiConsumer<Throwable, Void> onDone) {
while (iter.hasNext()) {
var entry = iter.next();
if (entry.getValue() != QueueState.CONSUMING) continue;
checkIfImStillTheRegisteredConsumer(entry.getKey(), onDone);
return;
}
// no entry found. we're done.
onDone.accept(null, null);
}
void checkIfImStillTheRegisteredConsumer(String queue, BiConsumer<Throwable, Void> onDone) {
// Check if I am still the registered consumer
String consumerKey = consumersPrefix + queue;
log.trace("RedisQues refresh queues get: {}", consumerKey);
redisProvider.redis().onComplete( ev1 -> {
if (ev1.failed()) {
onDone.accept(ev1.cause(), null);
return;
}
var redisAPI = ev1.result();
redisAPI.get(consumerKey, getConsumerEvent -> {
if (getConsumerEvent.failed()) {
onDone.accept(new RuntimeException("Failed to get queue consumer for queue '{}'", getConsumerEvent.cause()), null);
return;
}
final String consumer = Objects.toString(getConsumerEvent.result(), "");
if (uid.equals(consumer)) {
log.debug("RedisQues Periodic consumer refresh for active queue {}", queue);
refreshRegistration(queue, ev -> {
if (ev.failed()) {
onDone.accept(new RuntimeException("TODO error handling", ev.cause()), null);
return;
}
updateTimestamp(queue, ev3 -> onDone.accept(ev3.failed() ? ev3.cause() : null, null));
});
} else {
log.debug("RedisQues Removing queue {} from the list", queue);
myQueues.remove(queue);
queueStatisticsCollector.resetQueueFailureStatistics(queue,
(ex, v) -> onDone.accept(ex, null));
}
redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getConsumerEvent -> {
if (getConsumerEvent.failed()) {
log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queue, getConsumerEvent.cause());
// We should return here. See: "https://softwareengineering.stackexchange.com/a/190535"
}
final String consumer = Objects.toString(getConsumerEvent.result(), "");
if (uid.equals(consumer)) {
log.debug("RedisQues Periodic consumer refresh for active queue {}", queue);
refreshRegistration(queue, ev -> {
if (ev.failed())
log.warn("TODO error handling", new Exception(ev.cause()));
updateTimestamp(queue, null);
});
} else {
log.debug("RedisQues Removing queue {} from the list", queue);
myQueues.remove(queue);
queueStatisticsCollector.resetQueueFailureStatistics(queue);
}
}))
.onFailure(throwable -> log.error("Redis: Failed to registerActiveQueueRegistrationRefresh", throwable));
});
});
}
});
}

Expand Down Expand Up @@ -438,7 +544,9 @@ private Handler<Message<JsonObject>> operationsHandler() {

int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean sendSuccess) {
if (sendSuccess) {
queueStatisticsCollector.queueMessageSuccess(queueName);
queueStatisticsCollector.queueMessageSuccess(queueName, (ex, v) -> {
if (ex != null) log.warn("TODO_3q98hq3 error handling", ex);
});
return 0;
} else {
// update the failure count
Expand Down Expand Up @@ -979,12 +1087,16 @@ private Future<Void> checkQueues() {
removeOldQueues(limit).onComplete(removeOldQueuesEvent -> {
if( removeOldQueuesEvent.failed() )
log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause()));
queueStatisticsCollector.resetQueueFailureStatistics(queueName);
promise.complete();
queueStatisticsCollector.resetQueueFailureStatistics(queueName, (ex, v) -> {
if (ex != null) promise.fail(ex);
else promise.complete();
});
});
} else {
queueStatisticsCollector.resetQueueFailureStatistics(queueName);
promise.complete();
queueStatisticsCollector.resetQueueFailureStatistics(queueName, (ex, v) -> {
if (ex != null) promise.fail(ex);
else promise.complete();
});
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public void execute(Message<JsonObject> event) {
}
var p = redisProvider.redis();
p.onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> {
queueStatisticsCollector.resetQueueStatistics(queues);
queueStatisticsCollector.resetQueueStatistics(queues, (Throwable ex, Void v) -> {
if (ex != null) log.warn("TODO_q93258hu38 error handling", ex);
});
if (delManyReply.succeeded()) {
event.reply(createOkReply().put(VALUE, delManyReply.result().toLong()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public void execute(Message<JsonObject> event) {
// 1st: We don't, to keep backward compatibility
// 2nd: We don't, to may unlock below.
}
queueStatisticsCollector.resetQueueFailureStatistics(queue);
queueStatisticsCollector.resetQueueFailureStatistics(queue, (Throwable ex, Void v) -> {
if (ex != null) log.warn("TODO_2958iouhj error handling", ex);
});
if (unlock) {
redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> {
if (unlockReply.failed()) {
Expand Down
Loading

0 comments on commit 9120074

Please sign in to comment.