Skip to content

Commit

Permalink
adds soft pool cleaner, configuration and allocation limiter for Coll…
Browse files Browse the repository at this point in the history
…ectColumnValuesJob (#3618)

* adds soft pool cleaner, configuration and allocation limiter for CollectColumnValuesJob
* use 4 kibibytes as default chunk size for NetworkSession
* Reduce queue size of NetworkSession and make configurable.

---------

Co-authored-by: awildturtok <1553491+awildturtok@users.noreply.github.com>
  • Loading branch information
thoniTUB and awildturtok authored Dec 2, 2024
1 parent 8ead51d commit 9e4c021
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 60 deletions.
34 changes: 17 additions & 17 deletions backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import java.io.OutputStream;
import java.util.UUID;

import com.bakdata.conquery.models.config.ClusterConfig;
import com.bakdata.conquery.util.SoftPool;
import com.google.common.primitives.Ints;
import io.dropwizard.util.DataSize;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
Expand All @@ -23,13 +21,15 @@ public class ChunkWriter extends ProtocolEncoderAdapter {
public static final int HEADER_SIZE = Integer.BYTES + Byte.BYTES + 2 * Long.BYTES;
public static final byte LAST_MESSAGE = 1;
public static final byte CONTINUED_MESSAGE = 0;

@Getter
@Setter
private int bufferSize = Ints.checkedCast(DataSize.megabytes(2).toBytes());
private final SoftPool<IoBuffer> bufferPool = new SoftPool<>(() -> IoBuffer.allocate(bufferSize));
@SuppressWarnings("rawtypes")
private final CQCoder coder;
private final SoftPool<IoBuffer> bufferPool;

/**
 * Creates a writer that chunks encoded messages into pooled {@link IoBuffer}s.
 *
 * @param config cluster configuration supplying the chunk size and pool settings
 * @param coder  codec used to encode outgoing messages
 */
public ChunkWriter(ClusterConfig config, CQCoder coder) {
	this.coder = coder;
	// Fix the buffer capacity once, up front; checkedCast guards against a configured size > Integer.MAX_VALUE.
	final int chunkBytes = Ints.checkedCast(config.getMessageChunkSize().toBytes());
	this.bufferPool = new SoftPool<>(config, () -> IoBuffer.allocate(chunkBytes));
}

@SuppressWarnings("unchecked")
@Override
Expand All @@ -47,6 +47,15 @@ private class ChunkOutputStream extends OutputStream {
private IoBuffer buffer = null;
private boolean closed = false;

/**
 * Writes a single byte into the current chunk buffer, allocating a fresh
 * buffer first if the current one is absent or full.
 *
 * @param b the byte to write (low 8 bits are used)
 * @throws IllegalStateException if this stream was already closed
 */
@Override
public void write(int b) throws IOException {
	if (closed) {
		// Give the caller a hint instead of a bare, message-less exception.
		throw new IllegalStateException("Stream is already closed");
	}
	// Ensure room for exactly one byte before appending it.
	newBuffer(Byte.BYTES);
	buffer.put((byte) b);
}

private void newBuffer(int required) {
if (buffer == null || buffer.remaining() < required) {
if (buffer != null) {
Expand Down Expand Up @@ -75,15 +84,6 @@ private void finishBuffer(boolean end) {
buffer = null;
}

@Override
public void write(int b) throws IOException {
if (closed) {
throw new IllegalStateException();
}
newBuffer(1);
buffer.put((byte) b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,40 @@

import com.bakdata.conquery.models.messages.network.NetworkMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;

@RequiredArgsConstructor
@Slf4j
public class NetworkSession implements MessageSender<NetworkMessage<?>> {
public static final int MAX_MESSAGE_LENGTH = 30;
public static final int MAX_QUEUE_LENGTH = 20;
@Getter
private final IoSession session;
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages = new LinkedBlockingQueue<>(MAX_QUEUE_LENGTH);
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages;

/**
 * Wraps a mina {@link IoSession} with a bounded outgoing-message queue.
 *
 * @param session        the underlying mina session messages are written to
 * @param maxQueueLength capacity of the queue; senders block once it is full
 */
public NetworkSession(IoSession session, int maxQueueLength) {
	this.session = session;
	this.queuedMessages = new LinkedBlockingQueue<>(maxQueueLength);
}


@Override
public WriteFuture send(final NetworkMessage<?> message) {
try {
while (!queuedMessages.offer(message, 2, TimeUnit.MINUTES)) {
log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}",
message,
log.isTraceEnabled()
? new ArrayList<>(queuedMessages).stream()
.map(Objects::toString)
.map(NetworkSession::shorten)
.collect(Collectors.joining("\n\t\t- "))
: "%s messages".formatted(queuedMessages.size())
);
logWaitingMessages(message);
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);

future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
if (f instanceof WriteFuture writeFuture && !writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
Expand All @@ -55,6 +51,21 @@ public WriteFuture send(final NetworkMessage<?> message) {
return future;
}

/**
 * Logs that {@code message} is waiting on a full write queue.
 * At trace level every queued message is listed (shortened); otherwise only the queue size is reported.
 */
private void logWaitingMessages(NetworkMessage<?> message) {
	// Snapshot the queue before streaming, so concurrent offers/removals can't disturb iteration.
	final String queueDescription = log.isTraceEnabled()
			? new ArrayList<>(queuedMessages).stream()
											 .map(Objects::toString)
											 .map(NetworkSession::shorten)
											 .collect(Collectors.joining("\n\t\t- "))
			: "%s messages".formatted(queuedMessages.size());

	log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}", message, queueDescription);
}

@NotNull
private static String shorten(String desc) {
if (desc.length() <= MAX_MESSAGE_LENGTH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void messageReceived(IoSession session, Object message) {

if (shardNodeInformation == null) {
// In case the shard is not yet registered, we wont have a shardNodeInformation to pull the session from
nwSession = new NetworkSession(session);
nwSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength());
}
else {
nwSession = shardNodeInformation.getSession();
Expand Down Expand Up @@ -111,7 +111,7 @@ public void start() throws IOException {
final ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry);

final BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om);
acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om)));
acceptor.setHandler(this);
acceptor.getSessionConfig().setAll(config.getCluster().getMina());
acceptor.bind(new InetSocketAddress(config.getCluster().getPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void sessionCreated(IoSession session) {

@Override
public void sessionOpened(IoSession session) {
NetworkSession networkSession = new NetworkSession(session);
NetworkSession networkSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength());

// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
Expand All @@ -76,7 +76,7 @@ public void sessionOpened(IoSession session) {
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
w.setSession(networkSession);
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
Expand Down Expand Up @@ -173,7 +173,7 @@ private NioSocketConnector getClusterConnector(ShardWorkers workers) {

final BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, environment.getValidator(), om);
connector.getFilterChain().addFirst("mdc", new MdcFilter("Shard[%s]"));
connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om)));
connector.setHandler(this);
connector.getSessionConfig().setAll(config.getCluster().getMina());
return connector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, M
namespaceData.getFilterSearch(),
new ClusterEntityResolver(),
namespaceData.getInjectables(),
workerHandler
workerHandler,
config.getCluster()
);

for (ShardNodeInformation node : clusterState.getShardNodes().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import jakarta.validation.constraints.NotNull;

import io.dropwizard.core.Configuration;
import io.dropwizard.util.DataSize;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.PortRange;
import lombok.Getter;
Expand All @@ -29,6 +30,41 @@ public class ClusterConfig extends Configuration {
private Duration heartbeatTimeout = Duration.minutes(1);
private Duration connectRetryTimeout = Duration.seconds(30);

/**
* @see com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob
*
Number of values to batch per chunk of unique column-values. Lower numbers reduce throughput but also lower memory demand, helping to avoid OOM issues.
*/
private int columnValuesPerChunk = 1000;

/**
* @see com.bakdata.conquery.io.mina.NetworkSession
*
* Maximum number of messages allowed to wait for writing before writer-threads are blocked.
*/
private int networkSessionMaxQueueLength = 5;

/**
* Size of the {@link org.apache.mina.core.buffer.IoBuffer} that Mina allocates.
* We assume a page size of 4096 bytes == 4 kibibytes.
*/
@NotNull
@Valid
private DataSize messageChunkSize = DataSize.kibibytes(4);

/**
* How long the soft pool cleaner waits before reducing the pool size down to softPoolBaselineSize.
*/
@NotNull
@Valid
private Duration softPoolCleanerPause = Duration.seconds(10);

/**
* The number of soft references the soft pool should retain after cleaning.
* The actual number of retained {@link org.apache.mina.core.buffer.IoBuffer} instances may temporarily exceed this baseline between cleaner runs. (NOTE: the original sentence was truncated — confirm the intended meaning.)
*/
private long softPoolBaselineSize = 100;

/**
* Amount of backpressure before jobs can volunteer to block to send messages to their shards.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -27,13 +28,15 @@
import com.bakdata.conquery.models.worker.Worker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.WriteFuture;

/**
* This Job collects the distinct values in the given columns and returns a {@link RegisterColumnValues} message for each column to the namespace on the manager.
Expand All @@ -43,6 +46,12 @@
@CPSType(id = "COLLECT_COLUMN_VALUES", base = NamespacedMessage.class)
public class CollectColumnValuesJob extends WorkerMessage implements ActionReactionMessage {

/**
* Trying to rate-limit how many threads are actively allocating column-values.
*/
private final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 5);

public final int columValueChunkSize;
@Getter
private final Set<ColumnId> columns;

Expand All @@ -58,26 +67,42 @@ public void react(Worker context) throws Exception {
final Map<TableId, List<Bucket>> table2Buckets = context.getStorage().getAllBuckets()
.collect(Collectors.groupingBy(Bucket::getTable));


final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(context.getJobsExecutorService());
final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS));

final AtomicInteger done = new AtomicInteger();


final List<? extends ListenableFuture<?>> futures =
columns.stream()
.filter(column -> table2Buckets.get(column.getTable()) != null)
.map(ColumnId::resolve)
.map(column ->
jobsExecutorService.submit(() -> {
final List<Bucket> buckets = table2Buckets.get(column.getTable().getId());

final Set<String> values = buckets.stream()
.flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues())
.collect(Collectors.toSet());
context.send(new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), values));
log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet());
})
.map(column -> {
// Acquire before submitting, so we don't spam the executor with waiting threads
return jobsExecutorService.submit(() -> {
final List<Bucket> buckets = table2Buckets.get(column.getTable().getId());

final Set<String> values = buckets.stream()
.flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues())
.collect(Collectors.toSet());

log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet());

// Chunk values, to produce smaller messages
Iterable<List<String>> partition = Iterables.partition(values, columValueChunkSize);

log.trace("BEGIN Sending column values for {}. {} total values in {} sized batches",
column.getId(), values.size(), columValueChunkSize
);

for (List<String> chunk : partition) {
// Send values to manager
RegisterColumnValues message =
new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), chunk);
WriteFuture send = context.send(message);

send.awaitUninterruptibly();
}
});
}
)
.collect(Collectors.toList());

Expand All @@ -97,6 +122,9 @@ public void react(Worker context) throws Exception {
}
}

// We may do this, because we own this specific ExecutorService.
jobsExecutorService.shutdown();

log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray()));
context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.WriteFuture;

/**
* For each {@link com.bakdata.conquery.models.query.queryplan.specific.ConceptNode} calculate the number of matching events and the span of date-ranges.
Expand Down Expand Up @@ -78,7 +79,8 @@ public void execute() throws Exception {

calculateConceptMatches(resolved, matchingStats, worker);

worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats));
final WriteFuture writeFuture = worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats));
writeFuture.awaitUninterruptibly();

progressReporter.report(1);
}, worker.getJobsExecutorService())
Expand Down
Loading

0 comments on commit 9e4c021

Please sign in to comment.