Skip to content

Commit

Permalink
[SDCISA-16293] Continue full re-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jul 1, 2024
1 parent fa37e98 commit 5f6f84e
Showing 1 changed file with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

import static io.vertx.core.Future.failedFuture;
import static java.lang.Math.max;
import static java.lang.System.currentTimeMillis;
import static org.slf4j.LoggerFactory.getLogger;

import static io.vertx.core.Future.succeededFuture;
import static java.lang.Thread.currentThread;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;

Expand Down Expand Up @@ -63,6 +65,7 @@ public class QueueStatisticsCollector {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Lock mutx = new ReentrantLock();
private Thread mutxOwner;
/** key=QueueName */
private final Map<String, QueueInternal> queues = new HashMap<>();
private Set<QueueInternal> hasLocalChanges = new HashSet<>();

Expand All @@ -89,27 +92,31 @@ private void publishLocalChanges(long nonsense) {
}
var buf = new ByteArrayOutputStream(64);
var writer = new OutputStreamWriter(buf);
encode(copyToPublish, writer);
try {
writer.close();
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
}
evBus.publish(queueUpdatesAddr, buf.toByteArray());
vertx.setTimer(publishEveryMs, this::publishLocalChanges);
encode(copyToPublish, writer, writer).compose((OutputStreamWriter writer1) -> {
try {
writer1.close();
} catch (IOException ex) {
return failedFuture(ex);
}
evBus.publish(queueUpdatesAddr, buf.toByteArray());
vertx.setTimer(publishEveryMs, this::publishLocalChanges);
return succeededFuture();
});
}

private void onUpdateFromCluster(Message<Object> evBusMsg) {
Set<QueueInternal> newQueues = decode((byte[]) evBusMsg.body());
mutx.lock();
try {
/*TODO any way to move this loop outside lock?*/
for (QueueInternal newQueue : newQueues) {
queues.put(newQueue.name, newQueue);
decode((byte[]) evBusMsg.body()).compose((Set<QueueInternal> newQueues) -> {
mutx.lock();
try {
/*TODO any way to move this loop outside lock?*/
for (QueueInternal newQueue : newQueues) {
queues.put(newQueue.name, newQueue);
}
} finally {
mutx.unlock();
}
} finally {
mutx.unlock();
}
return succeededFuture();
});
}

public void resetQueueFailureStatistics(String queueName, BiConsumer<Throwable, Void> onDone) {
Expand Down Expand Up @@ -187,7 +194,7 @@ public void setQueueBackPressureTime(String queueName, long delayReplyMs) {
*/
@Deprecated
public Future<JsonObject> getQueueStatistics(List<String> queueNames) {
return getQueueStatistics2(queueNames).compose(queues -> vertx.executeBlocking(() -> {
return getQueueStatistics2(queueNames).compose((Set<Queue> queues) -> vertx.executeBlocking(() -> {
JsonArray superfluousJsonArray = new JsonArray();
for (Queue queue : queues) {
JsonObject queueJson = new JsonObject();
Expand All @@ -196,14 +203,35 @@ public Future<JsonObject> getQueueStatistics(List<String> queueNames) {
superfluousJsonArray.add(queueJson);
}
JsonObject superfluousJsonObj = new JsonObject();
superfluousJsonObj.put(STATUS, "OK");
superfluousJsonObj.put(STATUS, OK);
superfluousJsonObj.put(QUEUES, superfluousJsonArray);
return superfluousJsonObj;
}));
}

public Future<Set<Queue>> getQueueStatistics2(List<String> queueNames) {
throw new UnsupportedOperationException/*TODO*/("not impl yet");
Object[] workArr;
workArr = new Object[queueNames.size()];
mutx.lock();
try {
// Grab references of the requested entries.
int i = -1;
for (String queueName : queueNames) {
i += 1;
workArr[i] = queues.get(queueName);
}
} finally {
mutx.unlock();
}
// Map those entries over to the return type IN-PLACE.
for (int i = 0; i < workArr.length; ++i) {
var q = Queue.of((QueueInternal)workArr[i]);
workArr[i] = q;
}
// Map our "work array" to a Set for return.
var ret = new HashSet<Queue>(workArr.length);
for (Object queue : workArr) ret.add((Queue) queue);
return succeededFuture(ret);
}

public void getQueuesSpeed(Message<JsonObject> ev, List<String> queueNames) {
Expand All @@ -225,22 +253,25 @@ private QueueInternal getQueue(String queueName) {
return queue;
}

private void encode(Set<QueueInternal> queues, Writer dst) {
private <Ctx> Future<Ctx> encode(Set<QueueInternal> queues, Ctx ctx, Writer dst) {
ObjectMapper objectMapper = new ObjectMapper(); /*TODO move*/
try {
objectMapper.writeValue(dst, queues);
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
} catch (IOException ex) {
return failedFuture(ex);
}
return succeededFuture(ctx);
}

private Set<QueueInternal> decode(byte[] body) {
private Future<Set<QueueInternal>> decode(byte[] body) {
ObjectMapper objectMapper = new ObjectMapper()/*TODO*/;
Set<QueueInternal> queues;
try {
return objectMapper.readValue(body, new TypeReference<Set<QueueInternal>>() {});
} catch (IOException e) {
throw new UncheckedIOException("TODO", e);
queues = objectMapper.readValue(body, new TypeReference<Set<QueueInternal>>() {});
} catch (IOException ex) {
return failedFuture(ex);
}
return succeededFuture(queues);
}


Expand All @@ -258,6 +289,16 @@ public static class Queue {
public long newestFailureEpchMs = -1;
public int slowdownTimeSec = -1;
public long backpressureDelayMs = -1;

public static Object of(QueueInternal queueInternal) {
var that = new Queue();
that.name = queueInternal.name;
that.newestSuccessEpchMs = queueInternal.newestSuccessEpchMs;
that.newestFailureEpchMs = queueInternal.newestFailureEpchMs;
that.slowdownTimeSec = queueInternal.slowdownTimeSec;
that.backpressureDelayMs = queueInternal.backpressureDelayMs;
return that;
}
}

}

0 comments on commit 5f6f84e

Please sign in to comment.