diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java index 7f58d0f1bc..41569d3f40 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/ReactionMessage.java @@ -15,4 +15,6 @@ public interface ReactionMessage { UUID getCallerId(); WorkerId getWorkerId(); + + boolean lastMessageFromWorker(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java index 52bfd2564d..2fe4044ee8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/ActionReactionMessage.java @@ -6,7 +6,7 @@ import com.bakdata.conquery.models.worker.WorkerHandler; /** - * Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with a {@link ReactionMessage}. + * Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with possibly multiple {@link ReactionMessage} that are not final and a single {@link com.bakdata.conquery.models.messages.namespaces.specific.FinalizeReactionMessage}. */ public interface ActionReactionMessage { diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java index 1a11689abe..3f999cf8df 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java @@ -94,6 +94,7 @@ public void react(Worker context) throws Exception { } } log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray())); + context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId())); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java new file mode 100644 index 0000000000..fb383ccacc --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/FinalizeReactionMessage.java @@ -0,0 +1,37 @@ +package com.bakdata.conquery.models.messages.namespaces.specific; + +import java.util.UUID; + +import com.bakdata.conquery.io.cps.CPSType; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.ReactionMessage; +import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage; +import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; +import com.bakdata.conquery.models.worker.DistributedNamespace; +import com.fasterxml.jackson.annotation.JsonCreator; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@CPSType(id = "FINALIZE_REACTION_MESSAGE", base = NamespacedMessage.class) +@AllArgsConstructor(onConstructor_ = @JsonCreator) +@Getter +@Slf4j +@ToString +public class FinalizeReactionMessage extends NamespaceMessage implements ReactionMessage { + + private UUID callerId; + + private WorkerId workerId; + + @Override + public boolean lastMessageFromWorker() { + return true; + } + + @Override + public void react(DistributedNamespace context) throws Exception { + log.debug("Received finalize message from caller '{}' workerId '{}'", callerId, workerId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java index c10e0dc88a..d23e3d22a9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/RegisterColumnValues.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -24,6 +25,7 @@ @AllArgsConstructor(onConstructor_ = @JsonCreator) @Getter @Slf4j +@ToString public class RegisterColumnValues extends NamespaceMessage implements ReactionMessage { private UUID callerId; @@ -42,4 +44,9 @@ public void react(DistributedNamespace context) throws Exception { } context.getFilterSearch().registerValues(column, values); } + + @Override + public boolean lastMessageFromWorker() { + return false; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index 235cfc3cd4..fc51ca115b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -38,6 +39,7 @@ public class WorkerHandler { /** * All known {@link Worker}s that are part of this Namespace. */ + @Getter private final IdMap workers = new IdMap<>(); /** @@ -47,7 +49,9 @@ public class WorkerHandler { private final NamespaceStorage storage; - @NotNull + private final Map pendingReactions = new HashMap<>(); + + @NotNull public Set getAllWorkerIds() { return getWorkers().stream() .map(WorkerInformation::getId) @@ -58,8 +62,7 @@ public IdMap getWorkers() { return this.workers; } - public Map pendingReactions = new HashMap<>(); - + public void sendToAll(WorkerMessage msg) { if (workers.isEmpty()) { throw new IllegalStateException("There are no workers yet"); @@ -85,7 +88,7 @@ public void handleReactionMessage(ReactionMessage message) { throw new IllegalStateException(String.format("No pending action registered (anymore) for caller id %s from reaction message: %s", callerId, message)); } - if (pendingReaction.checkoffWorker(message.getWorkerId())) { + if (pendingReaction.checkoffWorker(message)) { log.debug("Removing pending reaction '{}' as last pending message was received.", callerId); pendingReactions.remove(callerId); } @@ -198,7 +201,12 @@ private record PendingReaction(UUID callerId, Set pendingWorkers, Runn /** * Marks the given worker as not pending. If the last pending worker checks off the afterAllReaction is executed. */ - public synchronized boolean checkoffWorker(WorkerId workerId) { + public synchronized boolean checkoffWorker(ReactionMessage message) { + final WorkerId workerId = message.getWorkerId(); + if (!message.lastMessageFromWorker()) { + log.trace("Received reacting message, but was not the last one: {}", message); + return false; + } if (!pendingWorkers.remove(workerId)) { throw new IllegalStateException(String.format("Could not check off worker %s for action-reaction message '%s'. Worker was not checked in.", workerId, callerId)); } @@ -208,6 +216,7 @@ public synchronized boolean checkoffWorker(WorkerId workerId) { } log.debug("Checked off last worker '{}' for action-reaction message '{}'. Calling hook", workerId, callerId); + afterAllHook.run(); return true;