Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Select filter value auto-completion for SQL mode #3277

Merged
merged 8 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.Manager;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.AuthorizationController;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner;
import com.bakdata.conquery.models.i18n.I18n;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.resources.ResourcesProvider;
import com.bakdata.conquery.resources.admin.AdminServlet;
Expand Down Expand Up @@ -244,19 +246,21 @@ public void loadNamespaces() {


ExecutorService loaders = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
DatasetRegistry<? extends Namespace> registry = getDatasetRegistry();

// Namespaces load their storage themselves, so they can inject Namespace relevant objects into stored objects
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages();
StorageHandler storageHandler = registry.getStorageHandler();
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages(storageHandler);
for (NamespaceStorage namespaceStorage : namespaceStorages) {
loaders.submit(() -> {
getDatasetRegistry().createNamespace(namespaceStorage);
registry.createNamespace(namespaceStorage);
});
}


loaders.shutdown();
while (!loaders.awaitTermination(1, TimeUnit.MINUTES)) {
final int coundLoaded = getDatasetRegistry().getDatasets().size();
final int coundLoaded = registry.getDatasets().size();
log.debug("Waiting for Worker namespaces to load. {} are already finished. {} pending.", coundLoaded, namespaceStorages.size()
- coundLoaded);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.bakdata.conquery.ConqueryConstants;
import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.PreviewConfig;
import com.bakdata.conquery.models.datasets.concepts.StructureNode;
Expand Down Expand Up @@ -40,8 +41,8 @@ public class NamespaceStorage extends NamespacedStorage {

protected SingletonStore<Dictionary> primaryDictionary;

public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) {
super(storageFactory, pathName, validator);
/**
 * Creates a namespace storage backed by the given store factory.
 *
 * @param storageFactory factory that creates the underlying stores
 * @param pathName       name of the storage location/path for this namespace
 * @param validator      validator applied to stored objects
 * @param storageHandler mode-specific strategy for looking up column values
 *                       (cluster imports vs. SQL database); forwarded to the superclass
 */
public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator, StorageHandler storageHandler) {
super(storageFactory, pathName, validator, storageHandler);
}

public EncodedDictionary getPrimaryDictionary() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
Expand Down Expand Up @@ -52,6 +53,9 @@ public abstract class NamespacedStorage extends ConqueryStorage {
@Getter
private final StoreFactory storageFactory;

@Getter
private final StorageHandler storageHandler;

@Getter
private final Validator validator;

Expand All @@ -62,10 +66,11 @@ public abstract class NamespacedStorage extends ConqueryStorage {
protected IdentifiableStore<Import> imports;
protected IdentifiableStore<Concept<?>> concepts;

public NamespacedStorage(StoreFactory storageFactory, String pathName, Validator validator) {
/**
 * Creates the common namespaced storage state.
 *
 * @param storageFactory factory that creates the underlying stores
 * @param pathName       name of the storage location/path
 * @param validator      validator applied to stored objects
 * @param storageHandler mode-specific strategy for looking up column values,
 *                       exposed via the {@code @Getter} for consumers such as filter-value search
 */
public NamespacedStorage(StoreFactory storageFactory, String pathName, Validator validator, StorageHandler storageHandler) {
this.pathName = pathName;
this.storageFactory = storageFactory;
this.validator = validator;
this.storageHandler = storageHandler;
}

public void openStores(ObjectMapper objectMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.cluster.ClusterStorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.events.Bucket;
Expand All @@ -28,7 +29,7 @@ public class WorkerStorage extends NamespacedStorage {
private IdentifiableStore<CBlock> cBlocks;

/**
 * Creates a worker storage.
 * Workers only exist in cluster mode, so the cluster storage handler is hard-wired here.
 *
 * @param storageFactory factory that creates the underlying stores
 * @param validator      validator applied to stored objects
 * @param pathName       name of the storage location/path
 */
public WorkerStorage(StoreFactory storageFactory, Validator validator, String pathName) {
super(storageFactory, pathName, validator, new ClusterStorageHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import javax.validation.Validator;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.ClusterStorageHandler;
import com.bakdata.conquery.mode.local.SqlStorageHandler;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.setup.Environment;

/**
Expand All @@ -27,16 +32,39 @@ static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig
return new InternalObjectMapperCreator(config, validator);
}

static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(NamespaceHandler<N> namespaceHandler, ConqueryConfig config,
InternalObjectMapperCreator creator) {
/**
 * Builds the dataset registry for cluster (distributed) mode.
 * Column values are looked up directly from the stored imports, so a
 * {@link ClusterStorageHandler} is wired in.
 */
static DatasetRegistry<DistributedNamespace> createDistributedDatasetRegistry(
		NamespaceHandler<DistributedNamespace> namespaceHandler,
		ConqueryConfig config,
		InternalObjectMapperCreator creator
) {
	return createDatasetRegistry(namespaceHandler, creator, new ClusterStorageHandler(), config);
}

/**
 * Builds the dataset registry for local (SQL) mode.
 * Column values are looked up via the database, so a {@link SqlStorageHandler}
 * backed by the shared {@link SqlExecutionService} is wired in.
 */
static DatasetRegistry<LocalNamespace> createLocalDatasetRegistry(
		NamespaceHandler<LocalNamespace> namespaceHandler,
		ConqueryConfig config,
		InternalObjectMapperCreator creator,
		SqlExecutionService sqlExecutionService
) {
	return createDatasetRegistry(namespaceHandler, creator, new SqlStorageHandler(sqlExecutionService), config);
}

private static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
InternalObjectMapperCreator creator,
StorageHandler storageHandler,
ConqueryConfig config
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
DatasetRegistry<N> datasetRegistry = new DatasetRegistry<>(
config.getCluster().getEntityBucketSize(),
config,
creator,
namespaceHandler,
indexService
indexService,
storageHandler
);
MetaStorage storage = new MetaStorage(config.getStorage(), datasetRegistry);
datasetRegistry.setMetaStorage(storage);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.bakdata.conquery.mode;

import java.util.List;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.datasets.Column;

/**
 * Mode-specific strategy for retrieving the raw values of a column.
 * <p>
 * Implementations differ by execution mode: in cluster mode the values are read from
 * the imports held in the {@link NamespaceStorage}, while in SQL mode they are queried
 * from the backing database. Used e.g. to feed filter-value auto-completion.
 */
public interface StorageHandler {

/**
 * Returns the values of the given column.
 * NOTE(review): result sets can be very large — callers should be prepared for big lists;
 * a streaming return type may be preferable in the future (see PR discussion).
 *
 * @param namespaceStorage storage of the namespace the column belongs to
 * @param column           the column whose values are requested
 * @return the column's values (may contain duplicates or not, depending on the implementation — TODO confirm)
 */
List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
final ClusterState clusterState = new ClusterState();
final NamespaceHandler<DistributedNamespace> namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDistributedDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

final ClusterConnectionManager connectionManager =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.bakdata.conquery.mode.cluster;

import java.util.List;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.ImportColumn;
import com.bakdata.conquery.models.events.stores.root.StringStore;

public class ClusterStorageHandler implements StorageHandler {

@Override
public List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, das darf glaube ich keine Liste sein, die sind riesig. Kriegst du das mit Streams implementiert?

return namespaceStorage.getAllImports().stream()
.filter(imp -> imp.getTable().equals(column.getTable()))
.flatMap(imp -> {
final ImportColumn importColumn = imp.getColumns()[column.getPosition()];
return ((StringStore) importColumn.getTypeDescription()).iterateValues();
})
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.setup.Environment;
import org.jooq.DSLContext;

Expand All @@ -34,8 +36,14 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig);
SqlDialect sqlDialect = createSqlDialect(sqlConnectorConfig, dslContext);
SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect);
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);

SqlExecutionService sqlExecutionService = new SqlExecutionService(
sqlDialect.getDSLContext(),
ResultSetProcessorFactory.create(sqlDialect)
);

NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator, sqlExecutionService);
creator.init(datasetRegistry);

return new DelegateManager<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
Expand All @@ -20,16 +21,18 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final ConqueryConfig config;
private final InternalObjectMapperCreator mapperCreator;
private final SqlContext sqlContext;
private final SqlExecutionService sqlExecutionService;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) {
NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService);
ExecutionManager executionManager = new SqlExecutionManager(sqlContext, metaStorage);
ExecutionManager executionManager = new SqlExecutionManager(sqlContext, sqlExecutionService, metaStorage);
return new LocalNamespace(
namespaceData.getPreprocessMapper(),
namespaceData.getCommunicationMapper(),
namespaceStorage,
executionManager,
sqlExecutionService,
namespaceData.getJobManager(),
namespaceData.getFilterSearch(),
namespaceData.getIndexService(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.bakdata.conquery.mode.local;

import java.util.Collections;
import java.util.List;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.Select;
import org.jooq.impl.DSL;

@Slf4j
@RequiredArgsConstructor
public class SqlStorageHandler implements StorageHandler {

private final SqlExecutionService sqlExecutionService;

@Override
public List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
Select<Record1<Object>> columValuesQuery = DSL.selectDistinct(DSL.field(DSL.name(column.getName())))
.from(DSL.table(DSL.name(column.getTable().getName())));
return queryForDistinctValues(columValuesQuery);
}

private List<String> queryForDistinctValues(Select<Record1<Object>> columValuesQuery) {
Result<?> result = sqlExecutionService.execute(columValuesQuery);
try {
return result.getValues(0, String.class).stream()
// the database might return null or an empty string as a distinct value
.filter(value -> value != null && !value.isEmpty())
.toList();
}
catch (Exception e) {
log.error("Expecting exactly 1 column in Result when querying for distinct values of a column. Query: {}. Error: {}", columValuesQuery, e);
}
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.entities.Group;
import com.bakdata.conquery.models.auth.entities.Role;
import com.bakdata.conquery.models.auth.entities.User;
Expand Down Expand Up @@ -37,7 +38,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type")
public interface StoreFactory {

Collection<NamespaceStorage> discoverNamespaceStorages();
Collection<NamespaceStorage> discoverNamespaceStorages(StorageHandler storageHandler);

Collection<WorkerStorage> discoverWorkerStorages();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.bakdata.conquery.io.storage.xodus.stores.StoreInfo;
import com.bakdata.conquery.io.storage.xodus.stores.WeakCachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.XodusStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.entities.Group;
import com.bakdata.conquery.models.auth.entities.Role;
import com.bakdata.conquery.models.auth.entities.User;
Expand Down Expand Up @@ -93,7 +94,6 @@
@CPSType(id = "XODUS", base = StoreFactory.class)
public class XodusStoreFactory implements StoreFactory {


/**
* The store names are created by hand here because the abstraction of {@link BigStore}
* creates two stores. Defining the expected stores like this, does not require a lot or complicated logic.
Expand Down Expand Up @@ -198,8 +198,8 @@ public ExecutorService getReaderExecutorService() {
Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());

@Override
public Collection<NamespaceStorage> discoverNamespaceStorages() {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath, getValidator()), NAMESPACE_STORES);
public Collection<NamespaceStorage> discoverNamespaceStorages(StorageHandler storageHandler) {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath, getValidator(), storageHandler), NAMESPACE_STORES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.bakdata.conquery.models.dictionary.Dictionary;
import com.bakdata.conquery.models.dictionary.MapDictionary;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.identifiable.IdMutex;
import com.bakdata.conquery.models.identifiable.Labeled;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
Expand Down Expand Up @@ -156,13 +155,8 @@ public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceS

final TrieSearch<FrontendValue> search = new TrieSearch<>(suffixLength, config.getSearchSplitChars());

storage.getAllImports().stream()
.filter(imp -> imp.getTable().equals(getTable()))
.flatMap(imp -> {
final ImportColumn importColumn = imp.getColumns()[getPosition()];

return ((StringStore) importColumn.getTypeDescription()).iterateValues();
})
storage.getStorageHandler()
.lookupColumnValues(storage, this).stream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfekt, du handlest das hier nur als stream weiter. Dann bitte auch als Stream ausgeben. Die haben potential riesig zu sein

.map(value -> new FrontendValue(value, value))
.onClose(() -> log.debug("DONE processing values for {}", getId()))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.bakdata.conquery.models.error;

import java.sql.SQLException;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -256,7 +255,7 @@ public String getMessageTemplate(ErrorMessages errorMessages) {
@CPSType(base = ConqueryError.class, id = "CQ_SQL_ERROR")
@RequiredArgsConstructor(onConstructor_ = {@JsonCreator})
public static class SqlError extends ConqueryError {
private final SQLException error;
private final Exception error;

@Override
public String getMessageTemplate(ErrorMessages errorMessages) {
Expand Down
Loading
Loading