diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index e7273b8860..83b11c88a9 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -84,7 +84,7 @@ public void run(Bootstrap bootstrap, Namespace namespace, Conque @NotNull private Bootstrap getShardBootstrap(ConqueryConfig configuration, int id) { final Bootstrap bootstrapShard = new Bootstrap<>(conquery); - ConqueryConfig clone = configuration; + ConqueryConfig clone; if (configuration.getStorage() instanceof XodusStoreFactory) { final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("shard-node" + id); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index ab7999c5ab..c6711aa7b8 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -36,6 +36,7 @@ import com.bakdata.conquery.models.worker.Worker; import com.bakdata.conquery.models.worker.WorkerInformation; import com.bakdata.conquery.models.worker.Workers; +import com.bakdata.conquery.resources.admin.ShutdownTask; import com.bakdata.conquery.util.io.ConqueryMDC; import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; @@ -74,6 +75,7 @@ public class ShardNode extends ServerCommand implements IoHandle private Workers workers; @Setter private ScheduledExecutorService scheduler; + ShutdownTask shutdown; public ShardNode(Conquery conquery) { @@ -140,6 +142,10 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig log.info("All Workers loaded: {}", this.workers.getWorkers().size()); + shutdown = new ShutdownTask(); + environment.admin().addTask(shutdown); + environment.lifecycle().addServerLifecycleListener(shutdown); + super.run(environment, namespace, config); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java index 4990f3464a..b24e8aadb8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ShutdownShard.java @@ -1,5 +1,9 @@ package com.bakdata.conquery.models.messages.namespaces.specific; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.Collections; + import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.messages.network.MessageToShardNode; import com.bakdata.conquery.models.messages.network.NetworkMessage; @@ -21,7 +25,7 @@ public void react(NetworkMessageContext.ShardNodeNetworkContext context) throws @Override public void run() { try { - context.getShardNode().stop(); + context.getShardNode().getShutdown().execute(Collections.emptyMap(), new PrintWriter(OutputStream.nullOutputStream(), false)); } catch (Exception e) { log.error("Failed while shutting down Shard", e); diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/ConfigOverride.java b/backend/src/test/java/com/bakdata/conquery/util/support/ConfigOverride.java index 3c67c5ddbc..e59f00b7d8 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/ConfigOverride.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/ConfigOverride.java @@ -1,8 +1,13 @@ package com.bakdata.conquery.util.support; import java.io.File; +import java.io.IOException; import java.net.ServerSocket; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.XodusStoreFactory; @@ -14,13 +19,13 @@ /** * This interface allows to override the configuration used in tests. - * */ @TestInstance(Lifecycle.PER_CLASS) public interface ConfigOverride { /** * Is called upon initialization of the test instance of Conquery. + * * @param config The configuration that is initialized with the defaults. */ void override(ConqueryConfig config); @@ -41,11 +46,19 @@ static ConqueryConfig defaultConfig(File workDir) { @SneakyThrows static void configureRandomPorts(ConqueryConfig config) { - try (ServerSocket s0 = new ServerSocket(0); ServerSocket s1 = new ServerSocket(0); ServerSocket s2 = new ServerSocket(0)) { + try ( + ClosableSocketSupplier sockets = new ClosableSocketSupplier() + ) { // set random open ports - ((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getAdminConnectors().get(0)).setPort(s0.getLocalPort()); - ((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getApplicationConnectors().get(0)).setPort(s1.getLocalPort()); - config.getCluster().setPort(s2.getLocalPort()); + ((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getAdminConnectors().get(0)).setPort(sockets.get().getLocalPort()); + ((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getApplicationConnectors().get(0)).setPort(sockets.get().getLocalPort()); + config.getCluster().setPort(sockets.get().getLocalPort()); + + config.getStandalone().getShards().stream() + .flatMap(shard -> Stream.concat( + shard.getAdminConnectors().stream(), + shard.getApplicationConnectors().stream() + )).forEach(c -> ((HttpConnectorFactory) c).setPort(sockets.get().getLocalPort())); } } @@ -55,4 +68,35 @@ static void configureStoragePath(ConqueryConfig config, Path workdir) { } } + /** + * Small helper to find open ports by opening random ports together in one context. + * A previous implementation opened and closed ports individually which could cause a port binding collision + * much easier. + */ + static class ClosableSocketSupplier implements Supplier, AutoCloseable { + + private final List openSockets = new ArrayList<>(); + + @Override + public void close() { + openSockets.forEach((s) -> { + try { + s.close(); + } + catch (IOException e) { + throw new IllegalStateException(e); + } + }); + openSockets.clear(); + } + + @Override + @SneakyThrows + public ServerSocket get() { + final ServerSocket serverSocket = new ServerSocket(0); + openSockets.add(serverSocket); + return serverSocket; + } + } + } diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java index 44c4f49b1f..9da293ab83 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java @@ -227,7 +227,7 @@ private StandaloneSupport buildSupport(DatasetId datasetId, String name, Standal File localTmpDir = new File(tmpDir, "tmp_" + name); if (!localTmpDir.exists()) { - if (!localTmpDir.mkdir()) { + if (!localTmpDir.mkdirs()) { throw new IllegalStateException("Could not create directory for Support:" + localTmpDir); } }