Skip to content

Commit

Permalink
Merge pull request #3664 from ingef/reintegrate-main
Browse files Browse the repository at this point in the history
Reintegrate Main
  • Loading branch information
awildturtok authored Feb 12, 2025
2 parents b3b417c + ffe734f commit 9cab011
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void disconnectFromCluster() {

@NotNull
private NioSocketConnector getClusterConnector() {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(workers);

return config.getCluster().getClusterConnector(om, this, "Shard");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.bakdata.conquery.mode.cluster;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.View;
Expand All @@ -9,15 +11,17 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.IIdInterner;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import jakarta.validation.Validator;

public record InternalMapperFactory(ConqueryConfig config, Validator validator) {

public ObjectMapper createShardCommunicationMapper() {
return createInternalObjectMapper(View.InternalCommunication.class);
public ObjectMapper createShardCommunicationMapper(ShardWorkers workers) {
ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class);
workers.injectInto(objectMapper);
return objectMapper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public void addBucket(Bucket bucket) {
.filter(connector -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
.forEach(connector -> job.addCBlock(bucket, (ConceptTreeConnector) connector));

if (job.isEmpty()){
return;
}

jobManager.addSlowJob(job);
}

Expand Down Expand Up @@ -340,7 +344,14 @@ public ConceptTreeCache getConceptTreeCache(TreeConcept concept, ImportId imp) {
}

public void removeConceptTreeCacheByImport(ConceptId concept, ImportId imp) {
treeCaches.get(concept).remove(imp);
Map<ImportId, ConceptTreeCache> treeCache = treeCaches.get(concept);

if(treeCache == null) {
// Not all concepts have a cache: only concepts with column-based connectors
return;
}

treeCache.remove(imp);
}

public void removeConceptTreeCacheByConcept(ConceptId concept) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
package com.bakdata.conquery.models.messages.network.specific;

import java.io.IOException;
import java.util.Objects;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.bakdata.conquery.models.jobs.SimpleJob;
import com.bakdata.conquery.models.messages.SlowMessage;
import com.bakdata.conquery.models.messages.namespaces.WorkerMessage;
import com.bakdata.conquery.models.messages.network.MessageToShardNode;
import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.bakdata.conquery.models.messages.network.NetworkMessageContext.ShardNodeNetworkContext;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.ToString;

/**
Expand All @@ -31,51 +25,29 @@
@CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class)
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString(of = {"workerId", "text"})
@ToString(of = {"workerId", "message"})
public class ForwardToWorker extends MessageToShardNode implements SlowMessage {

private final WorkerId workerId;
private final byte[] messageRaw;
// We cache these on the sender side.
private final WorkerMessage message;

@Getter(onMethod_ = @JsonIgnore(false))
private final boolean slowMessage;
private final String text;

@JsonIgnore
@Setter
private ProgressReporter progressReporter;

public static ForwardToWorker create(WorkerId worker, WorkerMessage message, ObjectWriter writer) {
return new ForwardToWorker(
worker,
serializeMessage(message, writer),
true,
message.toString()
);
}

@SneakyThrows(IOException.class)
private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) {
return writer.writeValueAsBytes(message);
}

private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException {
return mapper.readerFor(WorkerMessage.class).readValue(messageRaw);
public static ForwardToWorker create(WorkerId worker, WorkerMessage message) {
return new ForwardToWorker(worker, message, true);
}

@Override
public void react(ShardNodeNetworkContext context) throws Exception {
final Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
ConqueryMDC.setLocation(worker.toString());


// Jobception: this is to ensure that no subsequent message is deserialized before one message is processed
worker.getJobManager().addSlowJob(new SimpleJob("Process %s".formatted(getText()), () -> {

final WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());

message.setProgressReporter(progressReporter);
message.react(worker);
}));
getMessage().setProgressReporter(progressReporter);
getMessage().react(worker);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.bakdata.conquery.models.worker;

import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.io.mina.MessageSender;
import com.bakdata.conquery.models.identifiable.NamedImpl;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
Expand All @@ -10,8 +13,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectWriter;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,6 +55,6 @@ public ShardNodeInformation getMessageParent() {

@Override
public MessageToShardNode transform(WorkerMessage message) {
return ForwardToWorker.create(getId(), message, communicationWriter);
return ForwardToWorker.create(getId(), message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@
"children": []
}
]
},
{
"label": "test_tree_table",
"type": "TREE",
"connectors": [
{
"label": "tree_label",
"name": "test_column",
"table": "table1",
"validityDates": {
"label": "datum",
"column": "table1.datum"
}
}
],
"children": [
]
}
],
"content": {
Expand Down

0 comments on commit 9cab011

Please sign in to comment.