Skip to content

Commit

Permalink
makes logging more consistent by propagating the log factory
Browse files Browse the repository at this point in the history
Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com>
  • Loading branch information
thoniTUB committed Apr 2, 2024
1 parent 2fc2bea commit 632ac0f
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ private Bootstrap<ConqueryConfig> getShardBootstrap(ConqueryConfig configuration
serverFactory.setApplicationConnectors(standaloneShardConfig.getApplicationConnectors());
clone.setServerFactory(serverFactory);

// Lombok's With does not cover super class members
clone.setLoggingFactory(configuration.getLoggingFactory());


final ConqueryConfig finalClone = clone;

Expand Down Expand Up @@ -136,7 +139,6 @@ public List<ShardNode> getShardNodes() {

public void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
// start ManagerNode
ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting ManagerNode");

ConqueryConfig managerConfig = config;
Expand All @@ -150,7 +152,6 @@ public void run(Environment environment, Namespace namespace, ConqueryConfig con

managerNode.run(manager);

ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting REST Server");
ConqueryMDC.setLocation(null);
super.run(environment, namespace, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
Expand Down Expand Up @@ -79,6 +80,7 @@ public ManagerNode(@NonNull String name) {
}

public void run(Manager manager) throws InterruptedException {
ConqueryMDC.setNode(getName());
Environment environment = manager.getEnvironment();
ConqueryConfig config = manager.getConfig();
validator = environment.getValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.bakdata.conquery.io.mina.ChunkWriter;
import com.bakdata.conquery.io.mina.NetworkSession;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.mode.cluster.ClusterConnectionManager;
import com.bakdata.conquery.mode.cluster.ClusterMetrics;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
Expand Down Expand Up @@ -49,6 +50,7 @@
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
Expand Down Expand Up @@ -88,6 +90,7 @@ public ShardNode(Conquery conquery, String name) {

@Override
protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
ConqueryMDC.setNode(getName());

connector = new NioSocketConnector();

Expand Down Expand Up @@ -119,6 +122,7 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
Queue<Worker> workersDone = new ConcurrentLinkedQueue<>();
for (WorkerStorage workerStorage : workerStorages) {
loaders.submit(() -> {
ConqueryMDC.setNode(getName());
try {
workersDone.add(workers.createWorker(workerStorage, config.isFailOnError()));
}
Expand Down Expand Up @@ -270,7 +274,9 @@ public void start() throws Exception {
ObjectMapper om = createInternalObjectMapper(View.InternalCommunication.class);

BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, validator, om);
connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
final DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
filterChain.addFirst("mdc", new ClusterConnectionManager.ConqueryMdcFilter(ConqueryMDC.getNode()));
filterChain.addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
connector.setHandler(this);
final ConqueryConfig configuration = getConfiguration();
connector.getSessionConfig().setAll(configuration.getCluster().getMina());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public void loadData() {
}
}
});
log.debug("\tloaded store {}: {} entries, {} within {}",
log.debug(
"Loaded store {}: {} entries, {} within {}",
this,
cache.values().size(),
BinaryByteUnit.format(totalSize.sum()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
public interface ManagerProvider {

String JOB_MANAGER_NAME = "ManagerNode";
String JOB_MANAGER_NAME = "manager";

Manager provideManager(ConqueryConfig config, Environment environment);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterEvent;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.util.CommonEventFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
Expand All @@ -52,27 +55,23 @@ public class ClusterConnectionManager extends IoHandlerAdapter {

@Override
public void sessionOpened(IoSession session) {
ConqueryMDC.setLocation("ManagerNode[" + session.getLocalAddress().toString() + "]");
log.info("New client {} connected, waiting for identity", session.getRemoteAddress());

SharedMetricRegistries.getDefault().registerAll(new ClusterMetrics(session));
}

@Override
public void sessionClosed(IoSession session) {
ConqueryMDC.setLocation("ManagerNode[" + session.getLocalAddress().toString() + "]");
log.info("Client '{}' disconnected ", session.getAttribute(MinaAttributes.IDENTIFIER));
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) {
ConqueryMDC.setLocation("ManagerNode[" + session.getLocalAddress().toString() + "]");
log.error("caught exception", cause);
}

@Override
public void messageReceived(IoSession session, Object message) {
ConqueryMDC.setLocation("ManagerNode[" + session.getLocalAddress().toString() + "]");
if (message instanceof MessageToManagerNode toManagerNode) {

log.trace("ManagerNode received {} from {}", message.getClass().getSimpleName(), session.getRemoteAddress());
Expand Down Expand Up @@ -104,11 +103,13 @@ public void start() throws IOException {
ObjectMapper om = internalObjectMapperCreator.createInternalObjectMapper(View.InternalCommunication.class);
config.configureObjectMapper(om);
BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om);
acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
final DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
filterChain.addFirst("mdc", new ConqueryMdcFilter(ConqueryMDC.getNode()));
filterChain.addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
acceptor.setHandler(this);
acceptor.getSessionConfig().setAll(config.getCluster().getMina());
acceptor.bind(new InetSocketAddress(config.getCluster().getPort()));
log.info("Started ManagerNode @ {}", acceptor.getLocalAddress());
log.info("Started Cluster Socket @ {}", acceptor.getLocalAddress());
}

public void stop() {
Expand All @@ -130,4 +131,53 @@ public void stop() {
}

}

@RequiredArgsConstructor
public static class ConqueryMdcFilter extends CommonEventFilter {
final private String node;

private ThreadLocal<Integer> callDepth = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

/**
* Adapted from {@link org.apache.mina.filter.logging.MdcInjectionFilter}
*/
@Override
protected void filter(IoFilterEvent event) throws Exception {

// since this method can potentially call into itself
// we need to check the call depth before clearing the MDC
int currentCallDepth = callDepth.get();
callDepth.set(currentCallDepth + 1);

if (currentCallDepth == 0) {
/* copy context to the MDC when necessary. */
ConqueryMDC.setNode(node);
ConqueryMDC.setLocation(event.getSession().getLocalAddress().toString());
}

try {
/* propagate event down the filter chain */
event.fire();
}
finally {
if (currentCallDepth == 0) {
/* remove context from the MDC */
ConqueryMDC.clearNode();
ConqueryMDC.clearLocation();

callDepth.remove();
}
else {
callDepth.set(currentCallDepth);
}
}


}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ public class JobExecutor extends Thread {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final boolean failOnError;

public JobExecutor(String name, boolean failOnError) {
private final String node;

public JobExecutor(String name, String node, boolean failOnError) {
super(name);

this.node = node;
this.failOnError = failOnError;
JobMetrics.createJobQueueGauge(name, jobs);
}
Expand Down Expand Up @@ -95,6 +98,7 @@ public void close() {
@Override
@SneakyThrows // If failOnError is true
public void run() {
ConqueryMDC.setNode(node);
ConqueryMDC.setLocation(this.getName());

while(!closed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public class JobManager implements Closeable {

public JobManager(String name, boolean failOnError) {

slowExecutor = new JobExecutor("Job Manager slow " + name, failOnError);
fastExecutor = new JobExecutor("Job Manager fast " + name, failOnError);
slowExecutor = new JobExecutor("Job Manager slow", name, failOnError);
fastExecutor = new JobExecutor("Job Manager fast", name, failOnError);

slowExecutor.setUncaughtExceptionHandler(notifyExecutorDied);
fastExecutor.setUncaughtExceptionHandler(notifyExecutorDied);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.bakdata.conquery.models.messages.network.NetworkMessageContext.ManagerNodeNetworkContext;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -27,7 +26,6 @@ public class ForwardToNamespace extends MessageToManagerNode implements SlowMess
@Override
public void react(ManagerNodeNetworkContext context) throws Exception {
DistributedNamespace ns = Objects.requireNonNull(context.getDatasetRegistry().get(datasetId), () -> String.format("Missing dataset `%s`", datasetId));
ConqueryMDC.setLocation(ns.getStorage().getDataset().toString());
message.react(ns);
if (message instanceof ReactionMessage reactionMessage) {
ns.getWorkerHandler().handleReactionMessage(reactionMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,28 @@
public class ConqueryMDC {

private static final String LOCATION = "location";
private static final String NODE = "node";

public static void setLocation(String location) {
MDC.put(LOCATION, location);
}

/**
* Use to set the node name (e.g. manager, shard-0, ...) in log message
*/
public static void setNode(String node) {
MDC.put(NODE, node);
}

public static void clearLocation() {
MDC.remove(LOCATION);
}

public static void clearNode() {
MDC.remove(NODE);
}

public static String getNode() {
return MDC.get(NODE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import com.bakdata.conquery.TestTags;
import com.bakdata.conquery.integration.json.JsonIntegrationTest;
import com.bakdata.conquery.integration.json.TestDataImporter;
Expand All @@ -30,9 +32,11 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.Dialect;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.support.ConfigOverride;
import com.bakdata.conquery.util.support.StandaloneSupport;
import com.bakdata.conquery.util.support.TestConquery;
import com.bakdata.conquery.util.support.TestLoggingFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Strings;
Expand All @@ -45,6 +49,7 @@
import org.junit.jupiter.api.DynamicContainer;
import org.junit.jupiter.api.DynamicNode;
import org.junit.jupiter.api.DynamicTest;
import org.slf4j.LoggerFactory;

@Slf4j
public class IntegrationTests {
Expand All @@ -56,6 +61,13 @@ public class IntegrationTests {

static {

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
final Logger logger = lc.getLogger(Logger.ROOT_LOGGER_NAME);

logger.detachAppender("console");
logger.addAppender(TestLoggingFactory.getConsoleAppender("console", lc));
ConqueryMDC.setNode("test");

final ObjectMapper mapper = Jackson.MAPPER.copy();

MAPPER = mapper.setConfig(mapper.getDeserializationConfig().withView(View.Persistence.class))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.bakdata.conquery.util.support;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import lombok.SneakyThrows;

/**
* Small helper to find open ports by opening random ports together in one context.
* A previous implementation opened and closed ports individually which could cause a port binding collision
* much easier.
*/
public class ClosableSocketSupplier implements Supplier<ServerSocket>, AutoCloseable {

private final List<ServerSocket> openSockets = new ArrayList<>();

@Override
public void close() {
openSockets.forEach((s) -> {
try {
s.close();
}
catch (IOException e) {
throw new IllegalStateException(e);
}
});
openSockets.clear();
}

@Override
@SneakyThrows
public ServerSocket get() {
final ServerSocket serverSocket = new ServerSocket(0);
openSockets.add(serverSocket);
return serverSocket;
}
}
Loading

0 comments on commit 632ac0f

Please sign in to comment.