Skip to content

Commit

Permalink
uses ThreadPool for reading instead of mixing Reader and Thread
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Feb 20, 2025
1 parent e119a55 commit b29ff70
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -28,14 +30,23 @@
public class PipedJacksonProtocolFilter extends IoFilterAdapter {
private static final AttributeKey READER_KEY = new AttributeKey(PipedJacksonProtocolFilter.class, "reader");

@ToString.Include
private final String name;
private final ObjectMapper mapper;

private final ExecutorService readerThreads = Executors.newCachedThreadPool(
(runnable) -> {
Thread thread = Thread.ofVirtual()
.name("%s#reader".formatted(getClass().getSimpleName()))
.unstarted(runnable);

thread.setDaemon(true);
return thread;
}
);


@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
final IoBuffer buffer = (IoBuffer) message;

while (buffer.hasRemaining()) {
synchronized (session) {
Reader asyncReader = getAsyncReader(session);
Expand Down Expand Up @@ -65,9 +76,9 @@ private Reader getAsyncReader(IoSession session) {
@NotNull
private Reader newAsyncReader(int remaining, NextFilter nextFilter, IoSession session) {
final Reader reader = new Reader(remaining, mapper, nextFilter, session);
reader.start();

reader.setName("%s.%s/%s".formatted(getClass().getSimpleName(), name, session.toString()));
readerThreads.submit(reader::read);

session.setAttribute(READER_KEY, reader);
return reader;
}
Expand Down Expand Up @@ -99,7 +110,7 @@ public synchronized void filterWrite(NextFilter nextFilter, IoSession session, W
}

@Data
private class Reader extends Thread {
private class Reader {

private final ObjectMapper mapper;
private final PipedOutputStream outputStream;
Expand All @@ -115,44 +126,36 @@ public Reader(int remaining, ObjectMapper mapper, NextFilter nextFilter, IoSessi
this.session = session;
this.remaining = remaining;
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(outputStream, ((int) DataSize.megabytes(1).toBytes()));
setDaemon(true);
inputStream = new PipedInputStream(outputStream, ((int) DataSize.kilobytes(64).toBytes()));
}

public boolean receive(IoBuffer buffer) throws IOException {
if (remaining > buffer.remaining()) {
remaining -= buffer.remaining();
final int reading = Math.min(buffer.remaining(), remaining);

try (InputStream inputStream = buffer.asInputStream()) {
inputStream.transferTo(getOutputStream());
}

return false;
}
// else remaining <= buffer.remaining()

final IoBuffer slice = buffer.getSlice(remaining);
final IoBuffer slice = buffer.getSlice(reading);
try (InputStream inputStream = slice.asInputStream()) {
// This stupid method mutates BOTH buffer and slice WTF.
final long written = inputStream.transferTo(getOutputStream());
remaining -= (int) written;
}

getOutputStream().close();
return true;
if (remaining <= 0) {
getOutputStream().close();
return true;
}

return false;
}

@Override
public void run() {
public void read() {
try {
// It's important to not reuse the parsers!
final NetworkMessage parsed = mapper.readValue(inputStream, NetworkMessage.class);
log.trace("{} Received {}", getName(), parsed);
log.trace("Received {}", parsed);

nextFilter.messageReceived(session, parsed);
}
catch (IOException e) {
log.error("Something went wrong", e);
log.error("Something went wrong reading from {}", session, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.bakdata.conquery.io.mina.MdcFilter;
import com.bakdata.conquery.io.mina.PipedJacksonProtocolFilter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.Configuration;
import io.dropwizard.util.Duration;
Expand Down Expand Up @@ -88,7 +87,7 @@ public NioSocketConnector getClusterConnector(ObjectMapper om, IoHandler ioHandl
final NioSocketConnector connector = new NioSocketConnector();


IoFilter codecFilter = new PipedJacksonProtocolFilter("shard_" + mdcLocation, om);
IoFilter codecFilter = new PipedJacksonProtocolFilter(om);

connector.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation));
connector.getFilterChain().addLast("codec", codecFilter);
Expand All @@ -104,7 +103,7 @@ public NioSocketAcceptor getClusterAcceptor(ObjectMapper om, IoHandler ioHandler

NioSocketAcceptor acceptor = new NioSocketAcceptor();

IoFilter codecFilter = new PipedJacksonProtocolFilter("manager" + mdcLocation, om);
IoFilter codecFilter = new PipedJacksonProtocolFilter(om);

acceptor.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation));
acceptor.getFilterChain().addLast("codec", codecFilter);
Expand Down

0 comments on commit b29ff70

Please sign in to comment.