Skip to content

Commit

Permalink
Merge pull request #3321 from ingef/fix/dangling-open-xodux-store
Browse files Browse the repository at this point in the history
open and close all tested stores
  • Loading branch information
thoniTUB authored Mar 6, 2024
2 parents 7e75d0d + 3ad99ff commit dfa8049
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.bakdata.conquery.io.storage.xodus.stores;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import com.bakdata.conquery.models.config.XodusConfig;
import com.fasterxml.jackson.annotation.JsonIgnore;
import jetbrains.exodus.env.Environment;
import jetbrains.exodus.env.Environments;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

/**
* Keeps transparently track of open environments using a map.
* If an environment is closed it is automatically unregistered.
*/
@RequiredArgsConstructor
@Slf4j
public class EnvironmentRegistry {

@JsonIgnore
private final Map<String, Environment> activeEnvironments = new HashMap<>();

public Environment register(Environment environment) {

final Environment proxyInstance = createManaged(environment);

synchronized (activeEnvironments) {
activeEnvironments.put(environment.getLocation(), proxyInstance);
}
return proxyInstance;
}

@NotNull
private Environment createManaged(Environment environment) {
return new ManagedEnvironment(environment);
}

private void unregister(Environment environment) {
log.debug("Unregister environment: {}", environment.getLocation());
synchronized (activeEnvironments) {
final Environment remove = activeEnvironments.remove(environment.getLocation());

if (remove == null) {
log.warn("Could not unregister environment, because it was not registered: {}", environment.getLocation());
}
}
}

public Environment findOrCreateEnvironment(@NonNull File path, XodusConfig xodusConfig) {
synchronized (activeEnvironments) {

try {
// Check for old env or register new env
return activeEnvironments.computeIfAbsent(
path.toString(),
newPath -> createManaged(Environments.newInstance(newPath, xodusConfig.createConfig()))
);
}
catch (Exception e) {
throw new IllegalStateException("Unable to open environment: " + path, e);
}
}
}

@RequiredArgsConstructor
public class ManagedEnvironment implements Environment {

@Delegate
private final Environment delegate;

public void close() {
synchronized (activeEnvironments) {
log.debug("Environment was closed: {}", delegate.getLocation());
unregister(delegate);
delegate.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.io.storage.xodus.stores.BigStore;
import com.bakdata.conquery.io.storage.xodus.stores.CachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.EnvironmentRegistry;
import com.bakdata.conquery.io.storage.xodus.stores.SerializingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.io.storage.xodus.stores.StoreInfo;
Expand Down Expand Up @@ -62,15 +63,12 @@
import com.bakdata.conquery.util.io.FileUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import io.dropwizard.util.Duration;
import jetbrains.exodus.env.Environment;
import jetbrains.exodus.env.Environments;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -129,6 +127,9 @@ public class XodusStoreFactory implements StoreFactory {
@Valid
private XodusConfig xodus = new XodusConfig();

@JsonIgnore
private EnvironmentRegistry registry = new EnvironmentRegistry();

/**
* Number of threads reading from XoduStore.
* @implNote it's always only one thread reading from disk, dispatching to multiple reader threads.
Expand Down Expand Up @@ -183,9 +184,6 @@ public ExecutorService getReaderExecutorService() {
@JsonIgnore
private transient Validator validator;

@JsonIgnore
private final BiMap<File, Environment> activeEnvironments = HashBiMap.create();

@JsonIgnore
private final transient Multimap<Environment, XodusStore>
openStoresInEnv =
Expand Down Expand Up @@ -217,9 +215,11 @@ private <T extends NamespacedStorage> List<T> loadNamespacedStores(String prefix

ConqueryMDC.setLocation(directory.toString());

if (!environmentHasStores(directory, storesToTest)) {
log.warn("No valid WorkerStorage found in {}", directory);
continue;
try (Environment environment = registry.findOrCreateEnvironment(directory, xodus)) {
if (!environmentHasStores(environment, storesToTest)) {
log.warn("No valid {}storage found in {}", prefix, directory);
continue;
}
}

final T namespacedStorage = creator.apply(name);
Expand All @@ -230,9 +230,8 @@ private <T extends NamespacedStorage> List<T> loadNamespacedStores(String prefix
return storages;
}

private boolean environmentHasStores(File pathName, Set<String> storesToTest) {
final Environment env = findEnvironment(pathName);
final boolean exists = env.computeInTransaction(t -> {
private boolean environmentHasStores(Environment env, Set<String> storesToTest) {
return env.computeInTransaction(t -> {
final List<String> allStoreNames = env.getAllStoreNames(t);
final boolean complete = new HashSet<>(allStoreNames).containsAll(storesToTest);
if (complete) {
Expand All @@ -250,10 +249,6 @@ private boolean environmentHasStores(File pathName, Set<String> storesToTest) {

return loadEnvironmentWithMissingStores;
});
if (!exists) {
closeEnvironment(env);
}
return exists;
}

@Override
Expand Down Expand Up @@ -385,20 +380,15 @@ private File getStorageDir(String pathName) {
return getDirectory().resolve(pathName).toFile();
}

private Environment findEnvironment(@NonNull File path) {
synchronized (activeEnvironments) {
try {
return activeEnvironments.computeIfAbsent(path, (p) -> Environments.newInstance(path, getXodus().createConfig()));
}
catch (Exception e) {
throw new IllegalStateException("Unable to open environment: " + path, e);
}
}
}


private Environment findEnvironment(String pathName) {
final File path = getStorageDir(pathName);
return findEnvironment(path);
return registry.findOrCreateEnvironment(path, getXodus());
}

private Environment findEnvironment(File path) {
return registry.findOrCreateEnvironment(path, getXodus());
}

private void closeStore(XodusStore store) {
Expand All @@ -414,17 +404,7 @@ private void closeStore(XodusStore store) {
}
log.info("Closed last XodusStore in Environment. Closing Environment as well: {}", env.getLocation());

closeEnvironment(env);
}

private void closeEnvironment(Environment env) {
synchronized (activeEnvironments) {

if (activeEnvironments.remove(activeEnvironments.inverse().get(env)) == null) {
return;
}
env.close();
}
env.close();
}

private void removeStore(XodusStore store) {
Expand All @@ -451,7 +431,7 @@ private void removeEnvironment(Environment env) {
throw new IllegalStateException("Cannot delete environment, because it still contains these stores:" + xodusStore);
}

closeEnvironment(env);
env.close();

try {
FileUtil.deleteRecursive(Path.of(env.getLocation()));
Expand Down

0 comments on commit dfa8049

Please sign in to comment.