Skip to content

Commit

Permalink
[SDCISA-16293] Continue full re-impl. Not much impl yet.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jun 28, 2024
1 parent ee568cb commit fa37e98
Showing 1 changed file with 116 additions and 34 deletions.
150 changes: 116 additions & 34 deletions src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.redisques.util;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonBooleanFormatVisitor;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -10,6 +13,11 @@
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -20,10 +28,15 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

import static java.lang.Math.max;
import static java.lang.System.currentTimeMillis;
import static org.slf4j.LoggerFactory.getLogger;

import static io.vertx.core.Future.succeededFuture;
import static java.lang.Thread.currentThread;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;

/**
* Class StatisticsCollector helps collecting statistics information about queue handling and
Expand All @@ -42,38 +55,61 @@
*/
public class QueueStatisticsCollector {

private static final String queueUpdatesAddr = "redisques." + QueueStatisticsCollector.class.getSimpleName() + ".jAQCAO0YAgDrSQIA";
private final long publishEveryMs = 10_000; /*TODO ctor-inject*/
private static final Logger log = getLogger(QueueStatisticsCollector.class);
private final Vertx vertx;
private final EventBus evBus;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Lock mutx = new ReentrantLock();
private Thread mutxOwner;
private final Map<String, Queue> queues = new HashMap<>();
private final Set<Queue> hasLocalChanges = new HashSet<>();
private final Map<String, QueueInternal> queues = new HashMap<>();
private Set<QueueInternal> hasLocalChanges = new HashSet<>();

public QueueStatisticsCollector(
RedisProvider redisProvider, String queuesPrefix, Vertx vertx, RedisQuesExceptionFactory exceptionFactory,
Semaphore redisMonitoringReqQuota, int queueSpeedIntervalSec
RedisProvider redisProvider, String queuesPrefix, Vertx vertx, RedisQuesExceptionFactory exceptionFactory,
Semaphore redisMonitoringReqQuota, int queueSpeedIntervalSec
) {
long publishEveryMs = 1000;
String evBusAddr = QueueStatisticsCollector.class.getCanonicalName();
evBus = vertx.eventBus();
evBus.consumer(evBusAddr, this::onUpdateFromClusterPeer);
vertx.setTimer(publishEveryMs, this::publishLocalChanges);
this.vertx = vertx;
this.evBus = vertx.eventBus();
this.evBus.consumer(queueUpdatesAddr, this::onUpdateFromCluster);
this.vertx.setTimer(publishEveryMs, this::publishLocalChanges);
}

private void onUpdateFromClusterPeer(Message<Object> evBusMsg) {
Object body = evBusMsg.body();
assert false : "TODO";
private void publishLocalChanges(long nonsense) {
Set<QueueInternal> copyToPublish;
// TODO make 'hasLocalchanges.size()' thread-save.
Set<QueueInternal> theNewInstance = new HashSet<>(max(16, hasLocalChanges.size() / 2));
mutx.lock();
try {
copyToPublish = hasLocalChanges;
hasLocalChanges = theNewInstance;
} finally {
mutx.unlock();
}
var buf = new ByteArrayOutputStream(64);
var writer = new OutputStreamWriter(buf);
encode(copyToPublish, writer);
try {
writer.close();
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
}
evBus.publish(queueUpdatesAddr, buf.toByteArray());
vertx.setTimer(publishEveryMs, this::publishLocalChanges);
}

private void publishLocalChanges(long nonsense) {
Set<Queue> localCopy;
private void onUpdateFromCluster(Message<Object> evBusMsg) {
Set<QueueInternal> newQueues = decode((byte[]) evBusMsg.body());
mutx.lock();
try {
localCopy = hasLocalChanges;
/*TODO any way to move this loop outside lock?*/
for (QueueInternal newQueue : newQueues) {
queues.put(newQueue.name, newQueue);
}
} finally {
mutx.unlock();
}
evBus.publish(, );
}

public void resetQueueFailureStatistics(String queueName, BiConsumer<Throwable, Void> onDone) {
Expand All @@ -86,8 +122,8 @@ public void queueMessageSuccess(String queueName, BiConsumer<Throwable, Void> on
try {
assert mutxOwner == null;
mutxOwner = currentThread();
Queue queue = getQueue(queueName);
assert queue.name.equals(queueName) : queue.name +", "+ queueName;
QueueInternal queue = getQueue(queueName);
assert queue.name.equals(queueName) : queue.name + ", " + queueName;
queue.newestSuccessEpchMs = now;
hasLocalChanges.add(queue);
} finally {
Expand All @@ -106,7 +142,7 @@ public long queueMessageFailed(String queueName) {
try {
assert mutxOwner == null;
mutxOwner = currentThread();
Queue queue = getQueue(queueName);
QueueInternal queue = getQueue(queueName);
queue.newestFailureEpchMs = now;
hasLocalChanges.add(queue);
} finally {
Expand All @@ -121,7 +157,7 @@ public void setQueueSlowDownTime(String queueName, int retryDelaySec) {
try {
assert mutxOwner == null;
mutxOwner = currentThread();
Queue queue = getQueue(queueName);
QueueInternal queue = getQueue(queueName);
assert queue.name.equals(queueName);
queue.slowdownTimeSec = retryDelaySec;
hasLocalChanges.add(queue);
Expand All @@ -136,7 +172,7 @@ public void setQueueBackPressureTime(String queueName, long delayReplyMs) {
assert mutxOwner == null;
mutxOwner = currentThread();
try {
Queue queue = getQueue(queueName);
QueueInternal queue = getQueue(queueName);
queue.backpressureDelayMs = delayReplyMs;
hasLocalChanges.add(queue);
} finally {
Expand All @@ -145,10 +181,29 @@ public void setQueueBackPressureTime(String queueName, long delayReplyMs) {
}
}

// TODO get rid of ubly 'JsonObject' in return
/**
* @deprecated use {link #getQueueStatistics2}. WARN: This API bleeds
* weakly-typed stuff! The compiler will FAIL to tell you about bugs.
*/
@Deprecated
public Future<JsonObject> getQueueStatistics(List<String> queueNames) {
log.trace("TODO getQueueStatistics({})", queueNames);
return Promise.<JsonObject>promise().future(/*TODO*/);
return getQueueStatistics2(queueNames).compose(queues -> vertx.executeBlocking(() -> {
JsonArray superfluousJsonArray = new JsonArray();
for (Queue queue : queues) {
JsonObject queueJson = new JsonObject();
queueJson.put(MONITOR_QUEUE_NAME, queue.name);
/*queueJson.put(TODO, queue.TODO);*/
superfluousJsonArray.add(queueJson);
}
JsonObject superfluousJsonObj = new JsonObject();
superfluousJsonObj.put(STATUS, "OK");
superfluousJsonObj.put(QUEUES, superfluousJsonArray);
return superfluousJsonObj;
}));
}

public Future<Set<Queue>> getQueueStatistics2(List<String> queueNames) {
throw new UnsupportedOperationException/*TODO*/("not impl yet");
}

public void getQueuesSpeed(Message<JsonObject> ev, List<String> queueNames) {
Expand All @@ -159,23 +214,50 @@ public void resetQueueStatistics(JsonArray queues, BiConsumer<Throwable, Void> t
throw new UnsupportedOperationException/*TODO*/("not impl yet");
}

private Queue getQueue(String queueName) {
private QueueInternal getQueue(String queueName) {
assert mutxOwner == currentThread();
Queue queue = queues.get(queueName);
if(queue == null){
queue = new Queue();
QueueInternal queue = queues.get(queueName);
if (queue == null) {
queue = new QueueInternal();
queue.name = queueName;
queues.put(queue.name, queue);
}
return queue;
}

private static class Queue {
private String name;
private long newestSuccessEpchMs;
private long newestFailureEpchMs;
private int slowdownTimeSec;
private long backpressureDelayMs;
private void encode(Set<QueueInternal> queues, Writer dst) {
ObjectMapper objectMapper = new ObjectMapper(); /*TODO move*/
try {
objectMapper.writeValue(dst, queues);
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
}
}

private Set<QueueInternal> decode(byte[] body) {
ObjectMapper objectMapper = new ObjectMapper()/*TODO*/;
try {
return objectMapper.readValue(body, new TypeReference<Set<QueueInternal>>() {});
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
}
}


private static class QueueInternal {
public String name;
public long newestSuccessEpchMs = -1;
public long newestFailureEpchMs = -1;
public int slowdownTimeSec = -1;
public long backpressureDelayMs = -1;
}

public static class Queue {
public String name;
public long newestSuccessEpchMs = -1;
public long newestFailureEpchMs = -1;
public int slowdownTimeSec = -1;
public long backpressureDelayMs = -1;
}

}

0 comments on commit fa37e98

Please sign in to comment.