Skip to content

Commit

Permalink
explicit standalone shard port config
Browse files Browse the repository at this point in the history
Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com>
  • Loading branch information
thoniTUB committed Apr 2, 2024
1 parent 1ae944d commit 2fc2bea
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void run(Bootstrap<ConqueryConfig> bootstrap, Namespace namespace, Conque
@NotNull
private Bootstrap<ConqueryConfig> getShardBootstrap(ConqueryConfig configuration, int id) {
final Bootstrap<ConqueryConfig> bootstrapShard = new Bootstrap<>(conquery);
ConqueryConfig clone = configuration;
ConqueryConfig clone;

if (configuration.getStorage() instanceof XodusStoreFactory) {
final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("shard-node" + id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.WorkerInformation;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.resources.admin.ShutdownTask;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class ShardNode extends ServerCommand<ConqueryConfig> implements IoHandle
private Workers workers;
@Setter
private ScheduledExecutorService scheduler;
ShutdownTask shutdown;


public ShardNode(Conquery conquery) {
Expand Down Expand Up @@ -140,6 +142,10 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig

log.info("All Workers loaded: {}", this.workers.getWorkers().size());

shutdown = new ShutdownTask();
environment.admin().addTask(shutdown);
environment.lifecycle().addServerLifecycleListener(shutdown);

super.run(environment, namespace, config);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.bakdata.conquery.models.messages.namespaces.specific;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Collections;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.messages.network.MessageToShardNode;
import com.bakdata.conquery.models.messages.network.NetworkMessage;
Expand All @@ -21,7 +25,7 @@ public void react(NetworkMessageContext.ShardNodeNetworkContext context) throws
@Override
public void run() {
try {
context.getShardNode().stop();
context.getShardNode().getShutdown().execute(Collections.emptyMap(), new PrintWriter(OutputStream.nullOutputStream(), false));
}
catch (Exception e) {
log.error("Failed while shutting down Shard", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.bakdata.conquery.util.support;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.XodusStoreFactory;
Expand All @@ -14,13 +19,13 @@

/**
* This interface allows to override the configuration used in tests.
*
*/
@TestInstance(Lifecycle.PER_CLASS)
public interface ConfigOverride {

/**
* Is called upon initialization of the test instance of Conquery.
*
* @param config The configuration that is initialized with the defaults.
*/
void override(ConqueryConfig config);
Expand All @@ -41,11 +46,19 @@ static ConqueryConfig defaultConfig(File workDir) {
@SneakyThrows
static void configureRandomPorts(ConqueryConfig config) {

try (ServerSocket s0 = new ServerSocket(0); ServerSocket s1 = new ServerSocket(0); ServerSocket s2 = new ServerSocket(0)) {
try (
ClosableSocketSupplier sockets = new ClosableSocketSupplier()
) {
// set random open ports
((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getAdminConnectors().get(0)).setPort(s0.getLocalPort());
((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getApplicationConnectors().get(0)).setPort(s1.getLocalPort());
config.getCluster().setPort(s2.getLocalPort());
((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getAdminConnectors().get(0)).setPort(sockets.get().getLocalPort());
((HttpConnectorFactory) ((DefaultServerFactory) config.getServerFactory()).getApplicationConnectors().get(0)).setPort(sockets.get().getLocalPort());
config.getCluster().setPort(sockets.get().getLocalPort());

config.getStandalone().getShards().stream()
.flatMap(shard -> Stream.concat(
shard.getAdminConnectors().stream(),
shard.getApplicationConnectors().stream()
)).forEach(c -> ((HttpConnectorFactory) c).setPort(sockets.get().getLocalPort()));
}
}

Expand All @@ -55,4 +68,35 @@ static void configureStoragePath(ConqueryConfig config, Path workdir) {
}
}

/**
* Small helper to find open ports by opening random ports together in one context.
* A previous implementation opened and closed ports individually which could cause a port binding collision
* much easier.
*/
static class ClosableSocketSupplier implements Supplier<ServerSocket>, AutoCloseable {

private final List<ServerSocket> openSockets = new ArrayList<>();

@Override
public void close() {
openSockets.forEach((s) -> {
try {
s.close();
}
catch (IOException e) {
throw new IllegalStateException(e);
}
});
openSockets.clear();
}

@Override
@SneakyThrows
public ServerSocket get() {
final ServerSocket serverSocket = new ServerSocket(0);
openSockets.add(serverSocket);
return serverSocket;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private StandaloneSupport buildSupport(DatasetId datasetId, String name, Standal
File localTmpDir = new File(tmpDir, "tmp_" + name);

if (!localTmpDir.exists()) {
if (!localTmpDir.mkdir()) {
if (!localTmpDir.mkdirs()) {
throw new IllegalStateException("Could not create directory for Support:" + localTmpDir);
}
}
Expand Down

0 comments on commit 2fc2bea

Please sign in to comment.