Skip to content

Commit

Permalink
Merge pull request #3357 from ingef/fix/action-reaction-multiple-mess…
Browse files Browse the repository at this point in the history
…ages-handling

adds finalize message to support multiple ReactionMessage per worker
  • Loading branch information
awildturtok authored Mar 21, 2024
2 parents 82e6829 + 6f053f2 commit a3c39f4
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface ReactionMessage {
UUID getCallerId();

WorkerId getWorkerId();

boolean lastMessageFromWorker();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.bakdata.conquery.models.worker.WorkerHandler;

/**
* Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with a {@link ReactionMessage}.
* Interface for {@link WorkerMessage}s that require postprocessing on the manager, after all workers responded with possibly multiple {@link ReactionMessage} that are not final and a single {@link com.bakdata.conquery.models.messages.namespaces.specific.FinalizeReactionMessage}.
*/
public interface ActionReactionMessage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void react(Worker context) throws Exception {
}
}
log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray()));
context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.bakdata.conquery.models.messages.namespaces.specific;

import java.util.UUID;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.bakdata.conquery.models.messages.ReactionMessage;
import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage;
import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.fasterxml.jackson.annotation.JsonCreator;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@CPSType(id = "FINALIZE_REACTION_MESSAGE", base = NamespacedMessage.class)
@AllArgsConstructor(onConstructor_ = @JsonCreator)
@Getter
@Slf4j
@ToString
public class FinalizeReactionMessage extends NamespaceMessage implements ReactionMessage {

private UUID callerId;

private WorkerId workerId;

@Override
public boolean lastMessageFromWorker() {
return true;
}

@Override
public void react(DistributedNamespace context) throws Exception {
log.debug("Received finalize message from caller '{}' workerId '{}'", callerId, workerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -24,6 +25,7 @@
@AllArgsConstructor(onConstructor_ = @JsonCreator)
@Getter
@Slf4j
@ToString
public class RegisterColumnValues extends NamespaceMessage implements ReactionMessage {

private UUID callerId;
Expand All @@ -42,4 +44,9 @@ public void react(DistributedNamespace context) throws Exception {
}
context.getFilterSearch().registerValues(column, values);
}

@Override
public boolean lastMessageFromWorker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -38,6 +39,7 @@ public class WorkerHandler {
/**
* All known {@link Worker}s that are part of this Namespace.
*/
@Getter
private final IdMap<WorkerId, WorkerInformation> workers = new IdMap<>();

/**
Expand All @@ -47,7 +49,9 @@ public class WorkerHandler {

private final NamespaceStorage storage;

@NotNull
private final Map<UUID, PendingReaction> pendingReactions = new HashMap<>();

@NotNull
public Set<WorkerId> getAllWorkerIds() {
return getWorkers().stream()
.map(WorkerInformation::getId)
Expand All @@ -58,8 +62,7 @@ public IdMap<WorkerId, WorkerInformation> getWorkers() {
return this.workers;
}

public Map<UUID, PendingReaction> pendingReactions = new HashMap<>();


public void sendToAll(WorkerMessage msg) {
if (workers.isEmpty()) {
throw new IllegalStateException("There are no workers yet");
Expand All @@ -85,7 +88,7 @@ public void handleReactionMessage(ReactionMessage message) {
throw new IllegalStateException(String.format("No pending action registered (anymore) for caller id %s from reaction message: %s", callerId, message));
}

if (pendingReaction.checkoffWorker(message.getWorkerId())) {
if (pendingReaction.checkoffWorker(message)) {
log.debug("Removing pending reaction '{}' as last pending message was received.", callerId);
pendingReactions.remove(callerId);
}
Expand Down Expand Up @@ -198,7 +201,12 @@ private record PendingReaction(UUID callerId, Set<WorkerId> pendingWorkers, Runn
/**
* Marks the given worker as not pending. If the last pending worker checks off the afterAllReaction is executed.
*/
public synchronized boolean checkoffWorker(WorkerId workerId) {
public synchronized boolean checkoffWorker(ReactionMessage message) {
final WorkerId workerId = message.getWorkerId();
if (!message.lastMessageFromWorker()) {
log.trace("Received reacting message, but was not the last one: {}", message);
return false;
}
if (!pendingWorkers.remove(workerId)) {
throw new IllegalStateException(String.format("Could not check off worker %s for action-reaction message '%s'. Worker was not checked in.", workerId, callerId));
}
Expand All @@ -208,6 +216,7 @@ public synchronized boolean checkoffWorker(WorkerId workerId) {
}

log.debug("Checked off last worker '{}' for action-reaction message '{}'. Calling hook", workerId, callerId);

afterAllHook.run();
return true;

Expand Down

0 comments on commit a3c39f4

Please sign in to comment.