From 5f6f84e938ab5aaab1c27adf43b1bc7def21f133 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Mon, 1 Jul 2024 12:00:57 +0200 Subject: [PATCH] [SDCISA-16293] Continue full re-impl --- .../util/QueueStatisticsCollector.java | 95 +++++++++++++------ 1 file changed, 68 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index c0aa5a2..c7f91fb 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; +import static io.vertx.core.Future.failedFuture; import static java.lang.Math.max; import static java.lang.System.currentTimeMillis; import static org.slf4j.LoggerFactory.getLogger; @@ -35,6 +36,7 @@ import static io.vertx.core.Future.succeededFuture; import static java.lang.Thread.currentThread; import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; +import static org.swisspush.redisques.util.RedisquesAPI.OK; import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; import static org.swisspush.redisques.util.RedisquesAPI.STATUS; @@ -63,6 +65,7 @@ public class QueueStatisticsCollector { private final ObjectMapper objectMapper = new ObjectMapper(); private final Lock mutx = new ReentrantLock(); private Thread mutxOwner; + /** key=QueueName */ private final Map queues = new HashMap<>(); private Set hasLocalChanges = new HashSet<>(); @@ -89,27 +92,31 @@ private void publishLocalChanges(long nonsense) { } var buf = new ByteArrayOutputStream(64); var writer = new OutputStreamWriter(buf); - encode(copyToPublish, writer); - try { - writer.close(); - } catch (IOException e) { - throw new UncheckedIOException("TODO", e); - } - evBus.publish(queueUpdatesAddr, buf.toByteArray()); - vertx.setTimer(publishEveryMs, this::publishLocalChanges); + encode(copyToPublish, writer, writer).compose((OutputStreamWriter writer1) -> { + try { + writer1.close(); + } catch (IOException ex) { + return failedFuture(ex); + } + evBus.publish(queueUpdatesAddr, buf.toByteArray()); + vertx.setTimer(publishEveryMs, this::publishLocalChanges); + return succeededFuture(); + }); } private void onUpdateFromCluster(Message evBusMsg) { - Set newQueues = decode((byte[]) evBusMsg.body()); - mutx.lock(); - try { - /*TODO any way to move this loop outside lock?*/ - for (QueueInternal newQueue : newQueues) { - queues.put(newQueue.name, newQueue); + decode((byte[]) evBusMsg.body()).compose((Set newQueues) -> { + mutx.lock(); + try { + /*TODO any way to move this loop outside lock?*/ + for (QueueInternal newQueue : newQueues) { + queues.put(newQueue.name, newQueue); + } + } finally { + mutx.unlock(); } - } finally { - mutx.unlock(); - } + return succeededFuture(); + }); } public void resetQueueFailureStatistics(String queueName, BiConsumer onDone) { @@ -187,7 +194,7 @@ public void setQueueBackPressureTime(String queueName, long delayReplyMs) { */ @Deprecated public Future getQueueStatistics(List queueNames) { - return getQueueStatistics2(queueNames).compose(queues -> vertx.executeBlocking(() -> { + return getQueueStatistics2(queueNames).compose((Set queues) -> vertx.executeBlocking(() -> { JsonArray superfluousJsonArray = new JsonArray(); for (Queue queue : queues) { JsonObject queueJson = new JsonObject(); @@ -196,14 +203,35 @@ public Future getQueueStatistics(List queueNames) { superfluousJsonArray.add(queueJson); } JsonObject superfluousJsonObj = new JsonObject(); - superfluousJsonObj.put(STATUS, "OK"); + superfluousJsonObj.put(STATUS, OK); superfluousJsonObj.put(QUEUES, superfluousJsonArray); return superfluousJsonObj; })); } public Future> getQueueStatistics2(List queueNames) { - throw new UnsupportedOperationException/*TODO*/("not impl yet"); + Object[] workArr; + workArr = new Object[queueNames.size()]; + mutx.lock(); + try { + // Grab references of the requested entries. + int i = -1; + for (String queueName : queueNames) { + i += 1; + workArr[i] = queues.get(queueName); + } + } finally { + mutx.unlock(); + } + // Map those entries over to the return type IN-PLACE. + for (int i = 0; i < workArr.length; ++i) { + var q = Queue.of((QueueInternal)workArr[i]); + workArr[i] = q; + } + // Map our "work array" to a Set for return. + var ret = new HashSet(workArr.length); + for (Object queue : workArr) ret.add((Queue) queue); + return succeededFuture(ret); } public void getQueuesSpeed(Message ev, List queueNames) { @@ -225,22 +253,25 @@ private QueueInternal getQueue(String queueName) { return queue; } - private void encode(Set queues, Writer dst) { + private Future encode(Set queues, Ctx ctx, Writer dst) { ObjectMapper objectMapper = new ObjectMapper(); /*TODO move*/ try { objectMapper.writeValue(dst, queues); - } catch (IOException e) { - throw new UncheckedIOException("TODO", e); + } catch (IOException ex) { + return failedFuture(ex); } + return succeededFuture(ctx); } - private Set decode(byte[] body) { + private Future> decode(byte[] body) { ObjectMapper objectMapper = new ObjectMapper()/*TODO*/; + Set queues; try { - return objectMapper.readValue(body, new TypeReference>() {}); - } catch (IOException e) { - throw new UncheckedIOException("TODO", e); + queues = objectMapper.readValue(body, new TypeReference>() {}); + } catch (IOException ex) { + return failedFuture(ex); } + return succeededFuture(queues); } @@ -258,6 +289,16 @@ public static class Queue { public long newestFailureEpchMs = -1; public int slowdownTimeSec = -1; public long backpressureDelayMs = -1; + + public static Object of(QueueInternal queueInternal) { + var that = new Queue(); + that.name = queueInternal.name; + that.newestSuccessEpchMs = queueInternal.newestSuccessEpchMs; + that.newestFailureEpchMs = queueInternal.newestFailureEpchMs; + that.slowdownTimeSec = queueInternal.slowdownTimeSec; + that.backpressureDelayMs = queueInternal.backpressureDelayMs; + return that; + } } }