diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index 60064ff0fa..1a9a0ab165 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -4,11 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Vector; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import com.bakdata.conquery.Conquery; import com.bakdata.conquery.mode.cluster.ClusterManager; @@ -84,7 +80,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque .setNameFormat("ShardNode Storage Loader %d") .setUncaughtExceptionHandler((t, e) -> { ConqueryMDC.setLocation(t.getName()); - log.error(t.getName() + " failed to init storage of ShardNode", e); + log.error("{} failed to init storage of ShardNode", t.getName(), e); }) .build() ); @@ -121,6 +117,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque for (Future f : tasks) { try { f.get(); + } catch (ExecutionException e) { log.error("during ShardNodes creation", e); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java index c0cd3886e8..004048186d 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java @@ -1,7 +1,6 @@ package com.bakdata.conquery.commands; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -9,6 +8,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import jakarta.validation.Validator; import com.bakdata.conquery.io.cps.CPSTypeIdResolver; import com.bakdata.conquery.io.jackson.MutableInjectableValues; @@ -31,7 +31,6 @@ import com.bakdata.conquery.tasks.PermissionCleanupTask; import com.bakdata.conquery.tasks.QueryCleanupTask; import com.bakdata.conquery.tasks.ReloadMetaStorageTask; -import com.bakdata.conquery.util.io.ConqueryMDC; import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationConfig; @@ -39,16 +38,11 @@ import io.dropwizard.core.setup.Environment; import io.dropwizard.jersey.DropwizardResourceConfig; import io.dropwizard.lifecycle.Managed; -import jakarta.validation.Validator; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; import lombok.experimental.Delegate; import lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.service.IoHandlerAdapter; -import org.apache.mina.core.session.IdleStatus; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.FilterEvent; import org.glassfish.jersey.internal.inject.AbstractBinder; /** @@ -58,7 +52,7 @@ */ @Slf4j @Getter -public class ManagerNode extends IoHandlerAdapter implements Managed { +public class ManagerNode implements Managed { public static final String DEFAULT_NAME = "manager"; @@ -281,45 +275,4 @@ public void stop() throws Exception { } } - - private void setLocation(IoSession session) { - final String loc = session.getLocalAddress().toString(); - ConqueryMDC.setLocation(loc); - } - - @Override - public void sessionClosed(IoSession session) { - setLocation(session); - log.info("Disconnected."); - } - - @Override - public void sessionCreated(IoSession session) { - setLocation(session); - log.debug("Session created."); - } - - @Override - public void sessionIdle(IoSession session, IdleStatus status) { - setLocation(session); - log.warn("Session idle {}. Last read: {}. Last write: {}.", status, Instant.ofEpochMilli(session.getLastReadTime()), Instant.ofEpochMilli(session.getLastWriteTime())); - } - - @Override - public void messageSent(IoSession session, Object message) { - setLocation(session); - log.trace("Message sent: {}", message); - } - - @Override - public void inputClosed(IoSession session) { - setLocation(session); - log.info("Session closed."); - } - - @Override - public void event(IoSession session, FilterEvent event) throws Exception { - setLocation(session); - log.trace("Event handled: {}", event); - } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java index 20d4ff0c6e..c25f63a08e 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java @@ -7,8 +7,6 @@ import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.worker.DatasetRegistry; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.LocalNamespace; import com.bakdata.conquery.models.worker.Namespace; import io.dropwizard.core.setup.Environment; @@ -29,26 +27,10 @@ static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig return new InternalObjectMapperCreator(config, validator); } - static DatasetRegistry createDistributedDatasetRegistry( - NamespaceHandler namespaceHandler, - ConqueryConfig config, - InternalObjectMapperCreator creator - ) { - return createDatasetRegistry(namespaceHandler, creator, config); - } - - static DatasetRegistry createLocalDatasetRegistry( - NamespaceHandler namespaceHandler, + static DatasetRegistry createDatasetRegistry( + NamespaceHandler namespaceHandler, ConqueryConfig config, InternalObjectMapperCreator creator - ) { - return createDatasetRegistry(namespaceHandler, creator, config); - } - - private static DatasetRegistry createDatasetRegistry( - NamespaceHandler namespaceHandler, - InternalObjectMapperCreator creator, - ConqueryConfig config ) { final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel()); DatasetRegistry datasetRegistry = new DatasetRegistry<>( diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java index 9a2340c5db..04f622708c 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java @@ -2,28 +2,22 @@ import java.io.IOException; import java.net.InetSocketAddress; +import jakarta.validation.Validator; import com.bakdata.conquery.io.jackson.View; -import com.bakdata.conquery.io.mina.BinaryJacksonCoder; -import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; -import com.bakdata.conquery.io.mina.ChunkReader; -import com.bakdata.conquery.io.mina.ChunkWriter; -import com.bakdata.conquery.io.mina.MinaAttributes; -import com.bakdata.conquery.io.mina.NetworkSession; +import com.bakdata.conquery.io.mina.*; import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.Job; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.jobs.ReactingJob; import com.bakdata.conquery.models.messages.SlowMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.ShutdownShard; import com.bakdata.conquery.models.messages.network.MessageToManagerNode; import com.bakdata.conquery.models.messages.network.NetworkMessageContext; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.util.io.ConqueryMDC; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.validation.Validator; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -108,8 +102,6 @@ public void start() throws IOException { } public void stop() { - clusterState.getShardNodes().forEach(((socketAddress, shardNodeInformation) -> shardNodeInformation.send(new ShutdownShard()))); - try { acceptor.dispose(); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java index 81dca7d407..1579acf868 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java @@ -4,12 +4,7 @@ import java.util.List; import java.util.function.Supplier; -import com.bakdata.conquery.mode.DelegateManager; -import com.bakdata.conquery.mode.ImportHandler; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; -import com.bakdata.conquery.mode.ManagerProvider; -import com.bakdata.conquery.mode.NamespaceHandler; -import com.bakdata.conquery.mode.StorageListener; +import com.bakdata.conquery.mode.*; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.worker.ClusterHealthCheck; @@ -27,7 +22,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator()); final ClusterState clusterState = new ClusterState(); final NamespaceHandler namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator); - final DatasetRegistry datasetRegistry = ManagerProvider.createDistributedDatasetRegistry(namespaceHandler, config, creator); + final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); creator.init(datasetRegistry); final ClusterConnectionManager connectionManager = diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java index a6d5f63277..762a754eb8 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java @@ -34,7 +34,7 @@ public DelegateManager provideManager(ConqueryConfig config, Env InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator()); NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory); - DatasetRegistry datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator); + DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); creator.init(datasetRegistry); return new DelegateManager<>( diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java deleted file mode 100644 index 4990f3464a..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.bakdata.conquery.models.messages.namespaces.specific; - -import com.bakdata.conquery.io.cps.CPSType; -import com.bakdata.conquery.models.messages.network.MessageToShardNode; -import com.bakdata.conquery.models.messages.network.NetworkMessage; -import com.bakdata.conquery.models.messages.network.NetworkMessageContext; -import com.fasterxml.jackson.annotation.JsonCreator; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@CPSType(id = "SHUTDOWN_SHARD", base = NetworkMessage.class) -@Slf4j -@RequiredArgsConstructor(onConstructor_ = @JsonCreator) -@Getter -public class ShutdownShard extends MessageToShardNode.Slow { - - @Override - public void react(NetworkMessageContext.ShardNodeNetworkContext context) throws Exception { - final Thread shutdownDaemon = new Thread("shard shutdown waiter thread") { - @Override - public void run() { - try { - context.getShardNode().stop(); - } - catch (Exception e) { - log.error("Failed while shutting down Shard", e); - } - } - }; - - shutdownDaemon.setDaemon(true); - shutdownDaemon.start(); - } -}