From b29ff709b1b67d5ccfba159a8090ec6210bccf4a Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Thu, 20 Feb 2025 09:42:51 +0100 Subject: [PATCH] uses ThreadPool for reading instead of mixing Reader and Thread --- .../io/mina/PipedJacksonProtocolFilter.java | 55 ++++++++++--------- .../conquery/models/config/ClusterConfig.java | 5 +- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/PipedJacksonProtocolFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/PipedJacksonProtocolFilter.java index 40690f0228..420052501b 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/PipedJacksonProtocolFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/PipedJacksonProtocolFilter.java @@ -5,6 +5,8 @@ import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import com.bakdata.conquery.models.messages.network.NetworkMessage; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,14 +30,23 @@ public class PipedJacksonProtocolFilter extends IoFilterAdapter { private static final AttributeKey READER_KEY = new AttributeKey(PipedJacksonProtocolFilter.class, "reader"); - @ToString.Include - private final String name; private final ObjectMapper mapper; + private final ExecutorService readerThreads = Executors.newCachedThreadPool( + (runnable) -> { + Thread thread = Thread.ofVirtual() + .name("%s#reader".formatted(getClass().getSimpleName())) + .unstarted(runnable); + + thread.setDaemon(true); + return thread; + } + ); + + @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { final IoBuffer buffer = (IoBuffer) message; - while (buffer.hasRemaining()) { synchronized (session) { Reader asyncReader = getAsyncReader(session); @@ -65,9 +76,9 @@ private Reader getAsyncReader(IoSession session) { @NotNull private Reader newAsyncReader(int remaining, NextFilter nextFilter, IoSession session) { final Reader reader = new Reader(remaining, mapper, nextFilter, session); - reader.start(); - reader.setName("%s.%s/%s".formatted(getClass().getSimpleName(), name, session.toString())); + readerThreads.submit(reader::read); + session.setAttribute(READER_KEY, reader); return reader; } @@ -99,7 +110,7 @@ public synchronized void filterWrite(NextFilter nextFilter, IoSession session, W } @Data - private class Reader extends Thread { + private class Reader { private final ObjectMapper mapper; private final PipedOutputStream outputStream; @@ -115,44 +126,36 @@ public Reader(int remaining, ObjectMapper mapper, NextFilter nextFilter, IoSessi this.session = session; this.remaining = remaining; outputStream = new PipedOutputStream(); - inputStream = new PipedInputStream(outputStream, ((int) DataSize.megabytes(1).toBytes())); - setDaemon(true); + inputStream = new PipedInputStream(outputStream, ((int) DataSize.kilobytes(64).toBytes())); } public boolean receive(IoBuffer buffer) throws IOException { - if (remaining > buffer.remaining()) { - remaining -= buffer.remaining(); + final int reading = Math.min(buffer.remaining(), remaining); - try (InputStream inputStream = buffer.asInputStream()) { - inputStream.transferTo(getOutputStream()); - } - - return false; - } - // else remaining <= buffer.remaining() - - final IoBuffer slice = buffer.getSlice(remaining); + final IoBuffer slice = buffer.getSlice(reading); try (InputStream inputStream = slice.asInputStream()) { // This stupid method mutates BOTH buffer and slice WTF. final long written = inputStream.transferTo(getOutputStream()); remaining -= (int) written; } - getOutputStream().close(); - return true; + if (remaining <= 0) { + getOutputStream().close(); + return true; + } + + return false; } - @Override - public void run() { + public void read() { try { - // It's important to not reuse the parsers! final NetworkMessage parsed = mapper.readValue(inputStream, NetworkMessage.class); - log.trace("{} Received {}", getName(), parsed); + log.trace("Received {}", parsed); nextFilter.messageReceived(session, parsed); } catch (IOException e) { - log.error("Something went wrong", e); + log.error("Something went wrong reading from {}", session, e); } } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index a5574994ba..b5c1402a28 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -11,7 +11,6 @@ import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.PipedJacksonProtocolFilter; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.core.Configuration; import io.dropwizard.util.Duration; @@ -88,7 +87,7 @@ public NioSocketConnector getClusterConnector(ObjectMapper om, IoHandler ioHandl final NioSocketConnector connector = new NioSocketConnector(); - IoFilter codecFilter = new PipedJacksonProtocolFilter("shard_" + mdcLocation, om); + IoFilter codecFilter = new PipedJacksonProtocolFilter(om); connector.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); connector.getFilterChain().addLast("codec", codecFilter); @@ -104,7 +103,7 @@ public NioSocketAcceptor getClusterAcceptor(ObjectMapper om, IoHandler ioHandler NioSocketAcceptor acceptor = new NioSocketAcceptor(); - IoFilter codecFilter = new PipedJacksonProtocolFilter("manager" + mdcLocation, om); + IoFilter codecFilter = new PipedJacksonProtocolFilter(om); acceptor.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); acceptor.getFilterChain().addLast("codec", codecFilter);