Skip to content

Commit

Permalink
use backported client actions
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelmosmann committed Nov 19, 2023
1 parent f1bbeaf commit d6e62da
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,28 @@
*/
package de.flapdoodle.embed.mongo.spring.autoconfigure;

import com.mongodb.MongoCredential;
import de.flapdoodle.embed.mongo.client.AuthenticationSetup;
import de.flapdoodle.embed.mongo.client.ClientActions;
import de.flapdoodle.embed.mongo.client.ExecuteMongoClientAction;
import de.flapdoodle.embed.mongo.client.UsernamePassword;
import de.flapdoodle.embed.mongo.commands.MongodArguments;
import de.flapdoodle.embed.mongo.commands.ServerAddress;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.packageresolver.Feature;
import de.flapdoodle.embed.mongo.transitions.Mongod;
import de.flapdoodle.embed.mongo.transitions.RunningMongoProcess;
import de.flapdoodle.embed.mongo.transitions.RunningMongodProcess;
import de.flapdoodle.reverse.Listener;
import de.flapdoodle.reverse.StateID;
import de.flapdoodle.types.Try;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public abstract class AbstractServerFactory<C extends Closeable> {
private static Logger logger = LoggerFactory.getLogger(AbstractServerFactory.class);

protected final MongoProperties properties;
protected AbstractServerFactory(MongoProperties properties) {
private final MongoProperties properties;
private final ExecuteMongoClientAction<C> adapter;

protected AbstractServerFactory(MongoProperties properties, ExecuteMongoClientAction<C> adapter) {
this.properties = properties;
this.adapter = adapter;
}

public final MongodWrapper createWrapper(
Expand All @@ -66,123 +57,30 @@ public final MongodWrapper createWrapper(
}

private Listener addAuthUserToDB(MongoProperties properties) {
Listener.TypedListener.Builder typedBuilder = Listener.typedBuilder();
String username = properties.getUsername();
char[] password = properties.getPassword();
String databaseName = properties.getMongoClientDatabase();

if (username !=null && password !=null) {
typedBuilder.onStateReached(StateID.of(RunningMongodProcess.class),
executeClientActions(createAdminUserWithDatabaseAccess(username, password, databaseName)));
typedBuilder.onStateTearDown(StateID.of(RunningMongodProcess.class),
executeClientActions(Collections.singletonList(shutdown(username, password)))
.andThen(RunningMongoProcess::shutDownCommandAlreadyExecuted));
if (username != null && password != null) {
return ClientActions.setupAuthentication(adapter,
databaseName,
AuthenticationSetup.of(UsernamePassword.of(username, password))
);
} else {
return Listener.builder().build();
}
return typedBuilder.build();
}

private Listener initReplicaSet(IFeatureAwareVersion version, MongoProperties properties, MongodArguments mongodArguments) {
Listener.TypedListener.Builder builder = Listener.typedBuilder();
String username = properties.getUsername();
char[] password = properties.getPassword();

Optional<Storage> replication = mongodArguments.replication();

Optional<MongoClientAction.Credentials> credentials = username != null
? Optional.of(MongoClientAction.credentials("admin", username, password))
: Optional.empty();

if (replication.isPresent() && version.enabled(Feature.RS_INITIATE)) {
Consumer<RunningMongodProcess> initReplicaSet = runningMongodProcess -> {
ServerAddress serverAddress = runningMongodProcess.getServerAddress();
executeClientAction(runningMongodProcess,
MongoClientAction.runCommand("admin",
new Document("replSetInitiate",
new Document("_id", replication.get().getReplSetName())
.append("members", Collections.singletonList(
new Document("_id", 0)
.append("host", serverAddress.getHost() + ":" + serverAddress.getPort())
))))
.withCredentials(credentials)
);
};

builder.onStateReached(StateID.of(RunningMongodProcess.class), initReplicaSet.andThen(runningMongodProcess -> {
AtomicBoolean isMaster=new AtomicBoolean();
MongoClientAction checkIfMaster = MongoClientAction.runCommand("admin", new Document("isMaster", 1))
.withOnResult(doc -> isMaster.set(doc.getBoolean("ismaster")))
.withCredentials(credentials);

long started=System.currentTimeMillis();
long diff;
do {
executeClientAction(runningMongodProcess, checkIfMaster);
diff=System.currentTimeMillis()-started;
logger.info("check if server is elected as master: {} (after {} ms)", isMaster.get(), diff);
Try.run(() ->Thread.sleep(100));
} while (!isMaster.get() && diff<1000);

if (!isMaster.get()) {
throw new IllegalArgumentException("initReplicaSet failed to elect "+runningMongodProcess.getServerAddress()+" as master after "+Duration.ofMillis(diff));
}

}));
}

return builder.build();
}


private Consumer<RunningMongodProcess> executeClientActions(List<? extends MongoClientAction> actions) {
return runningMongodProcess -> executeClientActions(runningMongodProcess, actions);
}

private void executeClientActions(RunningMongodProcess runningMongodProcess, List<? extends MongoClientAction> actions) {
for (MongoClientAction action : actions) {
executeClientAction(runningMongodProcess, action);
}
}

private void executeClientAction(RunningMongodProcess runningMongodProcess, MongoClientAction action) {
try (C client = action.credentials()
.map(c -> client(runningMongodProcess.getServerAddress(),
MongoCredential.createCredential(c.username(), c.database(), c.password().toCharArray())))
.orElseGet(() -> client(runningMongodProcess.getServerAddress()))) {

logger.info("credentials: {}, action: {}", action.credentials(), action.action());

action.onResult()
.accept(resultOfAction(client, action.action()));
if (replication.isPresent()) {
return ClientActions.initReplicaSet(adapter, version, replication.get(), username != null ?
Optional.of(UsernamePassword.of(username, password)) : Optional.empty());
} else {
return Listener.builder().build();
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (RuntimeException rx) {
action.onError().accept(rx);
}
}

protected abstract C client(ServerAddress serverAddress);

protected abstract C client(ServerAddress serverAddress, MongoCredential credential);

protected abstract Document resultOfAction(C client, MongoClientAction.Action action);

private static List<? extends MongoClientAction> createAdminUserWithDatabaseAccess(String username, char[] password, String databaseName) {
List<ImmutableMongoClientAction> actions = Arrays.asList(
MongoClientAction.createUser("admin", username, password, "root"),
MongoClientAction.createUser(databaseName, username, password, "readWrite")
.withCredentials(MongoClientAction.credentials("admin", username, password)),
// test list collections
MongoClientAction.runCommand(databaseName, MongoClientAction.listCollections())
.withCredentials(MongoClientAction.credentials(databaseName, username, password))
);
return actions;
}

private static MongoClientAction shutdown(String username, char[] password) {
return MongoClientAction.shutdown("admin")
.withCredentials(MongoClientAction.credentials("admin", username, password))
.withOnError(ex -> logger.debug("expected send shutdown exception", ex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,88 +20,17 @@
*/
package de.flapdoodle.embed.mongo.spring.autoconfigure;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import de.flapdoodle.checks.Preconditions;
import de.flapdoodle.embed.mongo.commands.MongodArguments;
import de.flapdoodle.embed.mongo.commands.ServerAddress;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.packageresolver.Feature;
import de.flapdoodle.embed.mongo.transitions.Mongod;
import de.flapdoodle.embed.mongo.transitions.RunningMongodProcess;
import de.flapdoodle.reverse.Listener;
import de.flapdoodle.reverse.StateID;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import de.flapdoodle.embed.mongo.client.ReactiveClientAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class ReactiveClientServerFactory extends AbstractServerFactory<MongoClient> {
private static Logger logger = LoggerFactory.getLogger(ReactiveClientServerFactory.class);

ReactiveClientServerFactory(MongoProperties properties) {
super(properties);
super(properties, new ReactiveClientAdapter());
logger.info("reactive server factory");
}

protected Document resultOfAction(MongoClient client, MongoClientAction.Action action) {
if (action instanceof MongoClientAction.RunCommand) {
return get(client.getDatabase(action.database()).runCommand(((MongoClientAction.RunCommand) action).command()));
}
throw new IllegalArgumentException("Action not supported: "+action);
}

protected MongoClient client(ServerAddress serverAddress) {
return MongoClients.create("mongodb://"+serverAddress);
}

protected MongoClient client(ServerAddress serverAddress, MongoCredential credential) {
return MongoClients.create(MongoClientSettings.builder()
.applyConnectionString(new ConnectionString("mongodb://"+serverAddress))
.credential(credential)
.build());
}

private static <T> T get(Publisher<T> publisher) {
CompletableFuture<T> result = new CompletableFuture<>();

publisher.subscribe(new Subscriber<T>() {
@Override public void onSubscribe(Subscription s) {
s.request(1);
}
@Override public void onNext(T t) {
result.complete(t);
}
@Override public void onError(Throwable t) {
result.completeExceptionally(t);
}
@Override public void onComplete() {
}
});

try {
return result.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,17 @@
*/
package de.flapdoodle.embed.mongo.spring.autoconfigure;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import de.flapdoodle.embed.mongo.commands.MongodArguments;
import de.flapdoodle.embed.mongo.commands.ServerAddress;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.packageresolver.Feature;
import de.flapdoodle.embed.mongo.transitions.Mongod;
import de.flapdoodle.embed.mongo.transitions.RunningMongodProcess;
import de.flapdoodle.reverse.Listener;
import de.flapdoodle.reverse.StateID;
import org.bson.Document;
import de.flapdoodle.embed.mongo.client.SyncClientAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class SyncClientServerFactory extends AbstractServerFactory<MongoClient> {
private static Logger logger = LoggerFactory.getLogger(SyncClientServerFactory.class);

SyncClientServerFactory(MongoProperties properties) {
super(properties);
super(properties, new SyncClientAdapter());
logger.info("sync server factory");
}

protected MongoClient client(ServerAddress serverAddress) {
return MongoClients.create("mongodb://"+serverAddress);
}

protected MongoClient client(ServerAddress serverAddress, MongoCredential credential) {
return MongoClients.create(MongoClientSettings.builder()
.applyConnectionString(new ConnectionString("mongodb://"+serverAddress))
.credential(credential)
.build());
}

protected Document resultOfAction(MongoClient client, MongoClientAction.Action action) {
if (action instanceof MongoClientAction.RunCommand) {
return client.getDatabase(action.database()).runCommand(((MongoClientAction.RunCommand) action).command());
}
throw new IllegalArgumentException("Action not supported: "+action);
}
}

0 comments on commit d6e62da

Please sign in to comment.