From eaee6f6538c5a795c13600b8f10847cb0339c10b Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Mon, 22 Jan 2024 11:08:13 +0100 Subject: [PATCH 1/2] undo gzipping WorkerMessages as it was causing ImportJob to slow down significantly --- .../conquery/models/config/ClusterConfig.java | 13 ++++++++----- .../messages/network/specific/ForwardToWorker.java | 9 +++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index 47d6756078..9fe3ad7a6b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -11,22 +11,25 @@ import lombok.Getter; import lombok.Setter; -@Getter @Setter +@Getter +@Setter public class ClusterConfig extends Configuration { @PortRange private int port = 16170; - @Valid @NotNull + @Valid + @NotNull private InetAddress managerURL = InetAddress.getLoopbackAddress(); - @Valid @NotNull + @Valid + @NotNull private MinaConfig mina = new MinaConfig(); @Min(1) private int entityBucketSize = 1000; /** * Amount of backpressure before jobs can volunteer to block to send messages to their shards. - * + *
* Mostly {@link com.bakdata.conquery.models.jobs.ImportJob} is interested in this. Note that an average import should create more than #Entities / {@linkplain #entityBucketSize} jobs (via {@link com.bakdata.conquery.models.jobs.CalculateCBlocksJob}) in short succession, which will cause it to sleep. This field helps alleviate memory pressure on the Shards by slowing down the Manager, should it be sending too fast. */ @Min(0) - private int backpressure = 3000; + private int backpressure = 1500; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java index daba9b5fbc..2ab6246667 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java @@ -5,8 +5,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Objects; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; @@ -60,16 +58,15 @@ public static ForwardToWorker create(WorkerId worker, WorkerMessage message, Obj @SneakyThrows(IOException.class) private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (OutputStream outputStream = new GZIPOutputStream(baos)) { + try (OutputStream outputStream = new ByteArrayOutputStream()) { writer.writeValue(outputStream, message); } - return baos.toByteArray(); + return new ByteArrayOutputStream().toByteArray(); } private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException { - return mapper.readerFor(WorkerMessage.class).readValue(new GZIPInputStream(new ByteArrayInputStream(messageRaw))); + return mapper.readerFor(WorkerMessage.class).readValue(new ByteArrayInputStream(messageRaw)); } @Override From 1b38713024d2eef4e9babed8396c258c62728e44 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Mon, 22 Jan 2024 15:38:51 +0100 Subject: [PATCH 2/2] fixes usage of outputStreams --- .../messages/network/specific/ForwardToWorker.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java index 2ab6246667..c118778fb8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java @@ -1,9 +1,6 @@ package com.bakdata.conquery.models.messages.network.specific; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.util.Objects; import com.bakdata.conquery.io.cps.CPSType; @@ -58,15 +55,11 @@ public static ForwardToWorker create(WorkerId worker, WorkerMessage message, Obj @SneakyThrows(IOException.class) private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) { - try (OutputStream outputStream = new ByteArrayOutputStream()) { - writer.writeValue(outputStream, message); - } - - return new ByteArrayOutputStream().toByteArray(); + return writer.writeValueAsBytes(message); } private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException { - return mapper.readerFor(WorkerMessage.class).readValue(new ByteArrayInputStream(messageRaw)); + return mapper.readerFor(WorkerMessage.class).readValue(messageRaw); } @Override