From 8329f7d61be365b14fda2a6f8737c5a94a8d6487 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:50:46 +0100 Subject: [PATCH 1/2] adds finalize message to support multiple ReactionMessage per worker Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> --- .../models/messages/ReactionMessage.java | 2 + .../namespaces/ActionReactionMessage.java | 2 +- .../specific/CollectColumnValuesJob.java | 1 + .../specific/FinalizeReactionMessage.java | 37 +++++++++++++++++++ .../specific/RegisterColumnValues.java | 7 ++++ .../conquery/models/worker/WorkerHandler.java | 12 ++++-- 6 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java index 7f58d0f1bc..41569d3f40 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java @@ -15,4 +15,6 @@ public interface ReactionMessage { UUID getCallerId(); WorkerId getWorkerId(); + + boolean lastMessageFromWorker(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java index 52bfd2564d..2fe4044ee8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java @@ -6,7 +6,7 @@ import com.bakdata.conquery.models.worker.WorkerHandler; /** - * Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with a {@link ReactionMessage}. + * Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with possibly multiple {@link ReactionMessage} that are not final and a single {@link com.bakdata.conquery.models.messages.namespaces.specific.FinalizeReactionMessage}. */ public interface ActionReactionMessage { diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java index 1a11689abe..3f999cf8df 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java @@ -94,6 +94,7 @@ public void react(Worker context) throws Exception { } } log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray())); + context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId())); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java new file mode 100644 index 0000000000..fb383ccacc --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java @@ -0,0 +1,37 @@ +package com.bakdata.conquery.models.messages.namespaces.specific; + +import java.util.UUID; + +import com.bakdata.conquery.io.cps.CPSType; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.ReactionMessage; +import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage; +import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; +import com.bakdata.conquery.models.worker.DistributedNamespace; +import com.fasterxml.jackson.annotation.JsonCreator; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@CPSType(id = "FINALIZE_REACTION_MESSAGE", base = NamespacedMessage.class) +@AllArgsConstructor(onConstructor_ = @JsonCreator) +@Getter +@Slf4j +@ToString +public class FinalizeReactionMessage extends NamespaceMessage implements ReactionMessage { + + private UUID callerId; + + private WorkerId workerId; + + @Override + public boolean lastMessageFromWorker() { + return true; + } + + @Override + public void react(DistributedNamespace context) throws Exception { + log.debug("Received finalize message from caller '{}' workerId '{}'", callerId, workerId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java index c10e0dc88a..d23e3d22a9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -24,6 +25,7 @@ @AllArgsConstructor(onConstructor_ = @JsonCreator) @Getter @Slf4j +@ToString public class RegisterColumnValues extends NamespaceMessage implements ReactionMessage { private UUID callerId; @@ -42,4 +44,9 @@ public void react(DistributedNamespace context) throws Exception { } context.getFilterSearch().registerValues(column, values); } + + @Override + public boolean lastMessageFromWorker() { + return false; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index 3ce4929110..d01c3d2080 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -49,7 +49,7 @@ public IdMap getWorkers() { return this.workers; } - public Map pendingReactions = new HashMap<>(); + private Map pendingReactions = new HashMap<>(); public void sendToAll(WorkerMessage msg) { if (workers.isEmpty()) { @@ -76,7 +76,7 @@ public void handleReactionMessage(ReactionMessage message) { throw new IllegalStateException(String.format("No pending action registered (anymore) for caller id %s from reaction message: %s", callerId, message)); } - if (pendingReaction.checkoffWorker(message.getWorkerId())) { + if (pendingReaction.checkoffWorker(message)) { log.debug("Removing pending reaction '{}' as last pending message was received.", callerId); pendingReactions.remove(callerId); } @@ -189,7 +189,12 @@ private record PendingReaction(UUID callerId, Set pendingWorkers, Runn /** * Marks the given worker as not pending. If the last pending worker checks off the afterAllReaction is executed. */ - public synchronized boolean checkoffWorker(WorkerId workerId) { + public synchronized boolean checkoffWorker(ReactionMessage message) { + final WorkerId workerId = message.getWorkerId(); + if (!message.lastMessageFromWorker()) { + log.trace("Received reacting message, but was not the last one: {}", message); + return false; + } if (!pendingWorkers.remove(workerId)) { throw new IllegalStateException(String.format("Could not check off worker %s for action-reaction message '%s'. Worker was not checked in.", workerId, callerId)); } @@ -199,6 +204,7 @@ public synchronized boolean checkoffWorker(WorkerId workerId) { } log.debug("Checked off last worker '{}' for action-reaction message '{}'. Calling hook", workerId, callerId); + afterAllHook.run(); return true; From 989f9e7ec653c22c697afccb1d94e97d08be0bd7 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Tue, 19 Mar 2024 12:44:14 +0100 Subject: [PATCH 2/2] apply intellij suggestions Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> --- .../com/bakdata/conquery/models/worker/WorkerHandler.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index d01c3d2080..902dba5c97 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -36,6 +37,7 @@ public class WorkerHandler { /** * All known {@link Worker}s that are part of this Namespace. */ + @Getter private final IdMap workers = new IdMap<>(); /** @@ -45,11 +47,7 @@ public class WorkerHandler { private final NamespaceStorage storage; - public IdMap getWorkers() { - return this.workers; - } - - private Map pendingReactions = new HashMap<>(); + private final Map pendingReactions = new HashMap<>(); public void sendToAll(WorkerMessage msg) { if (workers.isEmpty()) {