Skip to content

Commit

Permalink
Merge pull request #3272 from ingef/feature/undo-gzip-messages
Browse files Browse the repository at this point in the history
undo gzipping WorkerMessages as it was causing ImportJob to slow down significantly
  • Loading branch information
awildturtok authored Jan 22, 2024
2 parents be18f0a + 1b38713 commit 272d555
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@
import lombok.Getter;
import lombok.Setter;

@Getter @Setter
@Getter
@Setter
public class ClusterConfig extends Configuration {
@PortRange
private int port = 16170;
@Valid @NotNull
@Valid
@NotNull
private InetAddress managerURL = InetAddress.getLoopbackAddress();
@Valid @NotNull
@Valid
@NotNull
private MinaConfig mina = new MinaConfig();
@Min(1)
private int entityBucketSize = 1000;

/**
* Amount of backpressure before jobs can volunteer to block to send messages to their shards.
*
* <p>
* Mostly {@link com.bakdata.conquery.models.jobs.ImportJob} is interested in this. Note that an average import should create more than #Entities / {@linkplain #entityBucketSize} jobs (via {@link com.bakdata.conquery.models.jobs.CalculateCBlocksJob}) in short succession, which will cause it to sleep. This field helps alleviate memory pressure on the Shards by slowing down the Manager, should it be sending too fast.
*/
@Min(0)
private int backpressure = 3000;
private int backpressure = 1500;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package com.bakdata.conquery.models.messages.network.specific;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
Expand Down Expand Up @@ -60,16 +55,11 @@ public static ForwardToWorker create(WorkerId worker, WorkerMessage message, Obj

@SneakyThrows(IOException.class)
private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream outputStream = new GZIPOutputStream(baos)) {
writer.writeValue(outputStream, message);
}

return baos.toByteArray();
return writer.writeValueAsBytes(message);
}

private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException {
return mapper.readerFor(WorkerMessage.class).readValue(new GZIPInputStream(new ByteArrayInputStream(messageRaw)));
return mapper.readerFor(WorkerMessage.class).readValue(messageRaw);
}

@Override
Expand Down

0 comments on commit 272d555

Please sign in to comment.