diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java index 727ebd9e2f..c5e38d316f 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java @@ -4,12 +4,10 @@ import java.io.OutputStream; import java.util.UUID; +import com.bakdata.conquery.models.config.ClusterConfig; import com.bakdata.conquery.util.SoftPool; import com.google.common.primitives.Ints; -import io.dropwizard.util.DataSize; -import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; @@ -23,13 +21,15 @@ public class ChunkWriter extends ProtocolEncoderAdapter { public static final int HEADER_SIZE = Integer.BYTES + Byte.BYTES + 2 * Long.BYTES; public static final byte LAST_MESSAGE = 1; public static final byte CONTINUED_MESSAGE = 0; - - @Getter - @Setter - private int bufferSize = Ints.checkedCast(DataSize.megabytes(2).toBytes()); - private final SoftPool bufferPool = new SoftPool<>(() -> IoBuffer.allocate(bufferSize)); @SuppressWarnings("rawtypes") private final CQCoder coder; + private final SoftPool bufferPool; + + public ChunkWriter(ClusterConfig config, CQCoder coder) { + this.coder = coder; + int bufferSize = Ints.checkedCast(config.getMessageChunkSize().toBytes()); + bufferPool = new SoftPool<>(config, () -> IoBuffer.allocate(bufferSize)); + } @SuppressWarnings("unchecked") @Override @@ -47,6 +47,15 @@ private class ChunkOutputStream extends OutputStream { private IoBuffer buffer = null; private boolean closed = false; + @Override + public void write(int b) throws IOException { + if (closed) { + throw new IllegalStateException(); + } + newBuffer(1); + buffer.put((byte) b); + } + private void newBuffer(int required) { if (buffer == null || buffer.remaining() < 
required) { if (buffer != null) { @@ -75,15 +84,6 @@ private void finishBuffer(boolean end) { buffer = null; } - @Override - public void write(int b) throws IOException { - if (closed) { - throw new IllegalStateException(); - } - newBuffer(1); - buffer.put((byte) b); - } - @Override public void write(byte[] b, int off, int len) throws IOException { if (closed) { diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/NetworkSession.java b/backend/src/main/java/com/bakdata/conquery/io/mina/NetworkSession.java index 52c61a22fa..41ec824edf 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/NetworkSession.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/NetworkSession.java @@ -9,35 +9,30 @@ import com.bakdata.conquery.models.messages.network.NetworkMessage; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.future.DefaultWriteFuture; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.session.IoSession; import org.jetbrains.annotations.NotNull; -@RequiredArgsConstructor @Slf4j public class NetworkSession implements MessageSender> { public static final int MAX_MESSAGE_LENGTH = 30; - public static final int MAX_QUEUE_LENGTH = 20; @Getter private final IoSession session; - private final LinkedBlockingQueue> queuedMessages = new LinkedBlockingQueue<>(MAX_QUEUE_LENGTH); + private final LinkedBlockingQueue> queuedMessages; + + public NetworkSession(IoSession session, int maxQueueLength) { + this.session = session; + queuedMessages = new LinkedBlockingQueue<>(maxQueueLength); + } + @Override public WriteFuture send(final NetworkMessage message) { try { while (!queuedMessages.offer(message, 2, TimeUnit.MINUTES)) { - log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}", - message, - log.isTraceEnabled() - ? 
new ArrayList<>(queuedMessages).stream() - .map(Objects::toString) - .map(NetworkSession::shorten) - .collect(Collectors.joining("\n\t\t- ")) - : "%s messages".formatted(queuedMessages.size()) - ); + logWaitingMessages(message); } } catch (InterruptedException e) { @@ -45,8 +40,9 @@ public WriteFuture send(final NetworkMessage message) { return DefaultWriteFuture.newNotWrittenFuture(session, e); } WriteFuture future = session.write(message); + future.addListener(f -> { - if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) { + if (f instanceof WriteFuture writeFuture && !writeFuture.isWritten()) { log.error("Could not write message: {}", message, writeFuture.getException()); } queuedMessages.remove(message); @@ -55,6 +51,21 @@ public WriteFuture send(final NetworkMessage message) { return future; } + private void logWaitingMessages(NetworkMessage message) { + final String waiting; + if (log.isTraceEnabled()) { + waiting = new ArrayList<>(queuedMessages).stream() + .map(Objects::toString) + .map(NetworkSession::shorten) + .collect(Collectors.joining("\n\t\t- ")); + } + else { + waiting = "%s messages".formatted(queuedMessages.size()); + } + + log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}", message, waiting); + } + @NotNull private static String shorten(String desc) { if (desc.length() <= MAX_MESSAGE_LENGTH) { diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java index 53d0602f52..c9828afb38 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java @@ -76,7 +76,7 @@ public void messageReceived(IoSession session, Object message) { if (shardNodeInformation == null) { // In case the shard is not yet registered, we wont have a shardNodeInformation to pull the 
session from - nwSession = new NetworkSession(session); + nwSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength()); } else { nwSession = shardNodeInformation.getSession(); @@ -111,7 +111,7 @@ public void start() throws IOException { final ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry); final BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om); - acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om))); + acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); acceptor.setHandler(this); acceptor.getSessionConfig().setAll(config.getCluster().getMina()); acceptor.bind(new InetSocketAddress(config.getCluster().getPort())); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index 71c007fdc4..4d108b330a 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -65,7 +65,7 @@ public void sessionCreated(IoSession session) { @Override public void sessionOpened(IoSession session) { - NetworkSession networkSession = new NetworkSession(session); + NetworkSession networkSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength()); // Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending scheduler.schedule(() -> { @@ -76,7 +76,7 @@ public void sessionOpened(IoSession session) { context.send(new AddShardNode()); for (Worker w : workers.getWorkers().values()) { - w.setSession(new NetworkSession(session)); + w.setSession(networkSession); WorkerInformation info = w.getInfo(); 
log.info("Sending worker identity '{}'", info.getName()); networkSession.send(new RegisterWorker(info)); @@ -173,7 +173,7 @@ private NioSocketConnector getClusterConnector(ShardWorkers workers) { final BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, environment.getValidator(), om); connector.getFilterChain().addFirst("mdc", new MdcFilter("Shard[%s]")); - connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om))); + connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); connector.setHandler(this); connector.getSessionConfig().setAll(config.getCluster().getMina()); return connector; diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java index c8bf780127..854b64da25 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java @@ -37,7 +37,8 @@ public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, M namespaceData.getFilterSearch(), new ClusterEntityResolver(), namespaceData.getInjectables(), - workerHandler + workerHandler, + config.getCluster() ); for (ShardNodeInformation node : clusterState.getShardNodes().values()) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index f2f0ea1910..f404aa1a48 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -6,6 +6,7 @@ import jakarta.validation.constraints.NotNull; import io.dropwizard.core.Configuration; +import io.dropwizard.util.DataSize; import 
io.dropwizard.util.Duration; import io.dropwizard.validation.PortRange; import lombok.Getter; @@ -29,6 +30,41 @@ private Duration heartbeatTimeout = Duration.minutes(1); private Duration connectRetryTimeout = Duration.seconds(30); + /** + * @see com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob + * + * Number of values to batch for chunking of unique column-values. Lower numbers reduce relative performance but reduce memory demand, avoiding OOM issues. + */ + private int columnValuesPerChunk = 1000; + + /** + * @see com.bakdata.conquery.io.mina.NetworkSession + * + * Maximum number of messages allowed to wait for writing before writer-threads are blocked. + */ + private int networkSessionMaxQueueLength = 5; + + /** + * {@link org.apache.mina.core.buffer.IoBuffer} size that mina allocates. + * We assume a page size of 4096 bytes == 4 kibibytes + */ + @NotNull + @Valid + private DataSize messageChunkSize = DataSize.kibibytes(4); + + /** + * How long the soft pool cleaner waits before reducing the pool size down to softPoolBaselineSize. + */ + @NotNull + @Valid + private Duration softPoolCleanerPause = Duration.seconds(10); + + /** + * The number of soft references the soft pool should retain after cleaning. + * The actual number of retained {@link org.apache.mina.core.buffer.IoBuffer}s may still fall below this baseline, because soft references can be reclaimed by the garbage collector at any time. + */ + private long softPoolBaselineSize = 100; + /** * Amount of backpressure before jobs can volunteer to block to send messages to their shards. *

diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java index dafbb00495..95ca03ecf4 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +28,7 @@ import com.bakdata.conquery.models.worker.Worker; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -34,6 +36,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.future.WriteFuture; /** * This Job collects the distinct values in the given columns and returns a {@link RegisterColumnValues} message for each column to the namespace on the manager. @@ -43,6 +46,12 @@ @CPSType(id = "COLLECT_COLUMN_VALUES", base = NamespacedMessage.class) public class CollectColumnValuesJob extends WorkerMessage implements ActionReactionMessage { + /** + * Trying to rate-limit how many threads are actively allocating column-values. 
+ */ + private final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 5); + + public final int columValueChunkSize; @Getter private final Set columns; @@ -58,26 +67,42 @@ public void react(Worker context) throws Exception { final Map> table2Buckets = context.getStorage().getAllBuckets() .collect(Collectors.groupingBy(Bucket::getTable)); - - final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(context.getJobsExecutorService()); + final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); final AtomicInteger done = new AtomicInteger(); - final List> futures = columns.stream() .filter(column -> table2Buckets.get(column.getTable()) != null) .map(ColumnId::resolve) - .map(column -> - jobsExecutorService.submit(() -> { - final List buckets = table2Buckets.get(column.getTable().getId()); - - final Set values = buckets.stream() - .flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues()) - .collect(Collectors.toSet()); - context.send(new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), values)); - log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet()); - }) + .map(column -> { + // Acquire before submitting, so we don't spam the executor with waiting threads + return jobsExecutorService.submit(() -> { + final List buckets = table2Buckets.get(column.getTable().getId()); + + final Set values = buckets.stream() + .flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues()) + .collect(Collectors.toSet()); + + log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet()); + + // Chunk values, to produce smaller messages + Iterable> partition = Iterables.partition(values, columValueChunkSize); + + log.trace("BEGIN Sending column values for {}. 
{} total values in {} sized batches", + column.getId(), values.size(), columValueChunkSize + ); + + for (List chunk : partition) { + // Send values to manager + RegisterColumnValues message = + new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), chunk); + WriteFuture send = context.send(message); + + send.awaitUninterruptibly(); + } + }); + } ) .collect(Collectors.toList()); @@ -97,6 +122,9 @@ public void react(Worker context) throws Exception { } } + // We may do this, because we own this specific ExecutorService. + jobsExecutorService.shutdown(); + log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray())); context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId())); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java index 19f54c8d13..94af290f66 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java @@ -31,6 +31,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.future.WriteFuture; /** * For each {@link com.bakdata.conquery.models.query.queryplan.specific.ConceptNode} calculate the number of matching events and the span of date-ranges. 
@@ -78,7 +79,8 @@ public void execute() throws Exception { calculateConceptMatches(resolved, matchingStats, worker); - worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); + final WriteFuture writeFuture = worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); + writeFuture.awaitUninterruptibly(); progressReporter.report(1); }, worker.getJobsExecutorService()) diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java index c018a2a797..a21f894dc8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java @@ -9,6 +9,7 @@ import com.bakdata.conquery.io.jackson.Injectable; import com.bakdata.conquery.io.storage.NamespaceStorage; import com.bakdata.conquery.mode.cluster.ClusterEntityResolver; +import com.bakdata.conquery.models.config.ClusterConfig; import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.concepts.Concept; @@ -36,6 +37,7 @@ public class DistributedNamespace extends Namespace { private final WorkerHandler workerHandler; private final DistributedExecutionManager executionManager; + private final ClusterConfig clusterConfig; public DistributedNamespace( ObjectMapper preprocessMapper, @@ -45,11 +47,12 @@ public DistributedNamespace( FilterSearch filterSearch, ClusterEntityResolver clusterEntityResolver, List injectables, - WorkerHandler workerHandler - ) { + WorkerHandler workerHandler, + ClusterConfig clusterConfig) { super(preprocessMapper, storage, executionManager, jobManager, filterSearch, clusterEntityResolver, injectables); this.executionManager = executionManager; this.workerHandler = workerHandler; + this.clusterConfig = clusterConfig; } @Override @@ 
-64,7 +67,13 @@ void updateMatchingStats() { @Override void registerColumnValuesInSearch(Set columns) { log.trace("Sending columns to collect values on shards: {}", Arrays.toString(columns.toArray())); - getWorkerHandler().sendToAll(new CollectColumnValuesJob(columns.stream().map(Column::getId).collect(Collectors.toSet()), this)); + + final CollectColumnValuesJob columnValuesJob = new CollectColumnValuesJob( + clusterConfig.getColumnValuesPerChunk(), + columns.stream().map(Column::getId).collect(Collectors.toSet()), this + ); + + getWorkerHandler().sendToAll(columnValuesJob); } } diff --git a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java b/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java index 6307c2f86d..9f58bd4d72 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java +++ b/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java @@ -2,15 +2,54 @@ import java.lang.ref.SoftReference; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import lombok.RequiredArgsConstructor; +import com.bakdata.conquery.models.config.ClusterConfig; +import com.google.common.util.concurrent.Uninterruptibles; +import lombok.extern.slf4j.Slf4j; -@RequiredArgsConstructor +@Slf4j public class SoftPool { private final ConcurrentLinkedDeque> pool = new ConcurrentLinkedDeque<>(); + private final AtomicLong poolSize = new AtomicLong(0); private final Supplier supplier; + private final Thread poolCleaner; + private final long softPoolBaselineSize; + private final long cleanerPauseSeconds; + + public SoftPool(ClusterConfig config, Supplier supplier) { + this.supplier = supplier; + + softPoolBaselineSize = config.getSoftPoolBaselineSize(); + cleanerPauseSeconds = config.getSoftPoolCleanerPause().toSeconds(); + + if (softPoolBaselineSize <= 0 || cleanerPauseSeconds <= 0) { + log.debug("Not creating a Cleaner."); + 
poolCleaner = null; + return; + } + + poolCleaner = new Thread(this::cleanPool, "SoftPool Cleaner"); + // Should not prevent the JVM shutdown -> daemon + poolCleaner.setDaemon(true); + poolCleaner.start(); + } + + /** + * Offer/return a reusable object to the pool. + * + * @param v the object to return to the pool. + */ + public void offer(T v) { + pool.addLast(new SoftReference<>(v)); + + final long currentPoolSize = poolSize.incrementAndGet(); + + log.trace("Pool size: {} (offer)", currentPoolSize); + } /** * Returns a reusable element from the pool if available or @@ -18,8 +57,13 @@ public class SoftPool { */ public T borrow() { SoftReference result; + // First check the pool for available/returned elements while ((result = pool.poll()) != null) { + final long currentPoolSize = poolSize.decrementAndGet(); + + log.trace("Pool size: {} (borrow)", currentPoolSize); + // The pool had an element, inspect if it is still valid final T elem = result.get(); if (elem != null) { @@ -33,10 +77,17 @@ public T borrow() { } /** - * Offer/return a reusable object to the pool. - * @param v the object to return to the pool. + * Trims the pool in a custom interval so that soft references get purged earlier */ - public void offer(T v) { - pool.addLast(new SoftReference<>(v)); + private void cleanPool() { + while (true) { + Uninterruptibles.sleepUninterruptibly(cleanerPauseSeconds, TimeUnit.SECONDS); + + log.trace("Running pool cleaner"); + while (poolSize.get() > softPoolBaselineSize) { + // Poll until we reached the baseline + borrow(); + } + } } }