Skip to content

Commit

Permalink
Merge pull request #3469 from ingef/fix/separate-manager-restart
Browse files Browse the repository at this point in the history
Fix/separate manager restart
  • Loading branch information
thoniTUB authored Jun 17, 2024
2 parents d668265 + fee6ba1 commit 9789c12
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.bakdata.conquery.Conquery;
import com.bakdata.conquery.mode.cluster.ClusterManager;
Expand Down Expand Up @@ -84,7 +80,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
.setNameFormat("ShardNode Storage Loader %d")
.setUncaughtExceptionHandler((t, e) -> {
ConqueryMDC.setLocation(t.getName());
log.error(t.getName() + " failed to init storage of ShardNode", e);
log.error("{} failed to init storage of ShardNode", t.getName(), e);
})
.build()
);
Expand Down Expand Up @@ -121,6 +117,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
for (Future<ShardNode> f : tasks) {
try {
f.get();

}
catch (ExecutionException e) {
log.error("during ShardNodes creation", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.bakdata.conquery.commands;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.cps.CPSTypeIdResolver;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
Expand All @@ -31,24 +31,18 @@
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.google.common.base.Throwables;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jersey.DropwizardResourceConfig;
import io.dropwizard.lifecycle.Managed;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.FilterEvent;
import org.glassfish.jersey.internal.inject.AbstractBinder;

/**
Expand All @@ -58,7 +52,7 @@
*/
@Slf4j
@Getter
public class ManagerNode extends IoHandlerAdapter implements Managed {
public class ManagerNode implements Managed {

public static final String DEFAULT_NAME = "manager";

Expand Down Expand Up @@ -281,45 +275,4 @@ public void stop() throws Exception {
}

}

private void setLocation(IoSession session) {
final String loc = session.getLocalAddress().toString();
ConqueryMDC.setLocation(loc);
}

@Override
public void sessionClosed(IoSession session) {
setLocation(session);
log.info("Disconnected.");
}

@Override
public void sessionCreated(IoSession session) {
setLocation(session);
log.debug("Session created.");
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
setLocation(session);
log.warn("Session idle {}. Last read: {}. Last write: {}.", status, Instant.ofEpochMilli(session.getLastReadTime()), Instant.ofEpochMilli(session.getLastWriteTime()));
}

@Override
public void messageSent(IoSession session, Object message) {
setLocation(session);
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
setLocation(session);
log.info("Session closed.");
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
setLocation(session);
log.trace("Event handled: {}", event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import io.dropwizard.core.setup.Environment;

Expand All @@ -29,26 +27,10 @@ static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig
return new InternalObjectMapperCreator(config, validator);
}

static DatasetRegistry<DistributedNamespace> createDistributedDatasetRegistry(
NamespaceHandler<DistributedNamespace> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
) {
return createDatasetRegistry(namespaceHandler, creator, config);
}

static DatasetRegistry<LocalNamespace> createLocalDatasetRegistry(
NamespaceHandler<LocalNamespace> namespaceHandler,
static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
) {
return createDatasetRegistry(namespaceHandler, creator, config);
}

private static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
InternalObjectMapperCreator creator,
ConqueryConfig config
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
DatasetRegistry<N> datasetRegistry = new DatasetRegistry<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,22 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.mina.BinaryJacksonCoder;
import com.bakdata.conquery.io.mina.CQProtocolCodecFilter;
import com.bakdata.conquery.io.mina.ChunkReader;
import com.bakdata.conquery.io.mina.ChunkWriter;
import com.bakdata.conquery.io.mina.MinaAttributes;
import com.bakdata.conquery.io.mina.NetworkSession;
import com.bakdata.conquery.io.mina.*;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.jobs.ReactingJob;
import com.bakdata.conquery.models.messages.SlowMessage;
import com.bakdata.conquery.models.messages.namespaces.specific.ShutdownShard;
import com.bakdata.conquery.models.messages.network.MessageToManagerNode;
import com.bakdata.conquery.models.messages.network.NetworkMessageContext;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -108,8 +102,6 @@ public void start() throws IOException {
}

public void stop() {
clusterState.getShardNodes().forEach(((socketAddress, shardNodeInformation) -> shardNodeInformation.send(new ShutdownShard())));

try {
acceptor.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
import java.util.List;
import java.util.function.Supplier;

import com.bakdata.conquery.mode.DelegateManager;
import com.bakdata.conquery.mode.ImportHandler;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.mode.ManagerProvider;
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.StorageListener;
import com.bakdata.conquery.mode.*;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.ClusterHealthCheck;
Expand All @@ -27,7 +22,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
final ClusterState clusterState = new ClusterState();
final NamespaceHandler<DistributedNamespace> namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDistributedDatasetRegistry(namespaceHandler, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

final ClusterConnectionManager connectionManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env

InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

return new DelegateManager<>(
Expand Down

This file was deleted.

0 comments on commit 9789c12

Please sign in to comment.