diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java index f457c1a04f..aa392f2710 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java @@ -127,6 +127,6 @@ public String getLocalizedTypeLabel() { @Override public ManagedInternalForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ManagedInternalForm(this, user, submittedDataset, storage); + return new ManagedInternalForm<>(this, user, submittedDataset, storage); } } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/EditorQuery.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/EditorQuery.java index 84e2a46bc7..21c2096e1f 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/EditorQuery.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/EditorQuery.java @@ -3,10 +3,9 @@ import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.query.ManagedQuery; -import com.bakdata.conquery.sql.conquery.SqlManagedQuery; /** - * Common abstraction for intersecting parts of {@link ManagedQuery} and {@link SqlManagedQuery}. + * Common abstraction for intersecting parts of {@link ManagedQuery}. */ public interface EditorQuery { diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java index f34aa3223f..2474583b01 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java @@ -15,10 +15,4 @@ public interface InternalExecution { */ WorkerMessage createExecutionMessage(); - /** - * The callback for the results the shard nodes return. - * Is called once per shard node - */ - void addResult(R result); - } diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java index a8c0792d33..90eec7a4af 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java @@ -106,9 +106,12 @@ public abstract class ManagedExecution extends IdentifiableImpl permittedGroups = new ArrayList<>(); for (Group group : storage.getAllGroups()) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java index 58c3fcfb01..cc619b66d9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java @@ -169,6 +169,7 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta @Override public void cancel() { + //TODO this is no longer called as the ExecutionManager used to call this. Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present"); updateStatus(api.cancelTask(externalTaskId)); @@ -189,7 +190,7 @@ public Response fetchExternalResult(String assetId) { } @Override - protected void finish(ExecutionState executionState) { + public void finish(ExecutionState executionState) { if (getState().equals(executionState)) { return; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/FormQueryPlan.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/FormQueryPlan.java index 3b5df25c50..d07db86cbf 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/FormQueryPlan.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/FormQueryPlan.java @@ -41,7 +41,7 @@ public FormQueryPlan(List dateContexts, ArrayConceptQueryPlan featu if (dateContexts.size() <= 0) { // There is nothing to do for this FormQueryPlan, but we will return an empty result when its executed - log.warn("dateContexts are empty. Will not produce a result."); + log.trace("dateContexts are empty. Will not produce a result."); constantCount = 3; withRelativeEventDate = false; return; diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java index 405c8e68de..5e4170944a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java @@ -39,6 +39,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Execution type for simple forms, that are completely executed within Conquery and produce a single table as result. @@ -73,6 +74,11 @@ public ManagedInternalForm(F form, User user, Dataset submittedDataset, MetaStor super(form, user, submittedDataset, storage); } + @Nullable + public ManagedQuery getSubQuery(ManagedExecutionId subQueryId) { + return flatSubQueries.get(subQueryId); + } + @Override public void doInitExecutable() { // Convert sub queries to sub executions @@ -88,7 +94,7 @@ private Map createSubExecutions() { return getSubmitted().createSubQueries() .entrySet() .stream().collect(Collectors.toMap( - e -> e.getKey(), + Map.Entry::getKey, e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getStorage()) )); @@ -167,44 +173,8 @@ public WorkerMessage createExecutionMessage() { .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getQuery()))); } - /** - * Distribute the result to a sub query. - */ - @Override - public void addResult(FormShardResult result) { - if (result.getError().isPresent()) { - fail(result.getError().get()); - return; - } - - ManagedExecutionId subQueryId = result.getSubQueryId(); - - ManagedQuery subQuery = flatSubQueries.get(subQueryId); - subQuery.addResult(result); - - switch (subQuery.getState()) { - case DONE -> { - if (allSubQueriesDone()) { - finish(ExecutionState.DONE); - } - } - // Fail the whole execution if a subquery fails - case FAILED -> { - fail( - result.getError().orElseThrow( - () -> new IllegalStateException(String.format("Query [%s] failed but no error was set.", getId())) - ) - ); - } - - default -> { - } - } - - } - - private boolean allSubQueriesDone() { + public boolean allSubQueriesDone() { synchronized (this) { return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectQueryResult.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectQueryResult.java deleted file mode 100644 index cf5293e966..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectQueryResult.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.bakdata.conquery.models.messages.namespaces.specific; - -import com.bakdata.conquery.io.cps.CPSType; -import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage; -import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; -import com.bakdata.conquery.models.query.results.ShardResult; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; - -/** - * Workers send their part of the query result to ManagerNode for assembly. - */ -@CPSType(id = "COLLECT_QUERY_RESULT", base = NamespacedMessage.class) -@AllArgsConstructor -@NoArgsConstructor -@Getter -@Setter -@ToString(of = "result") -@Slf4j -public class CollectQueryResult extends NamespaceMessage { - - private ShardResult result; - - @Override - public void react(DistributedNamespace context) throws Exception { - log.info("Received {} of size {}", result, result.getResults().size()); - - context.getExecutionManager().handleQueryResult(result); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java index 9e47c3ddd8..7db39b2938 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java @@ -1,148 +1,107 @@ package com.bakdata.conquery.models.query; -import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.UUID; -import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; -import com.bakdata.conquery.apiv1.query.QueryDescription; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.metrics.ExecutionMetrics; import com.bakdata.conquery.mode.cluster.ClusterState; import com.bakdata.conquery.models.auth.AuthorizationHelper; import com.bakdata.conquery.models.auth.entities.Group; -import com.bakdata.conquery.models.auth.entities.User; -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.error.ConqueryError; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.specific.CancelQuery; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.ShardResult; import com.bakdata.conquery.models.worker.Namespace; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalNotification; -import lombok.RequiredArgsConstructor; +import com.bakdata.conquery.models.worker.WorkerHandler; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -@RequiredArgsConstructor @Slf4j -public class DistributedExecutionManager implements ExecutionManager { +public class DistributedExecutionManager extends ExecutionManager { - private final MetaStorage storage; - private final ClusterState clusterState; - - private final Cache>> executionResults = - CacheBuilder.newBuilder() - .softValues() - .removalListener(this::executionRemoved) - .build(); + public record DistributedResult(Map> results) implements Result { - /** - * Manage state of evicted Queries, setting them to NEW. - */ - private void executionRemoved(RemovalNotification> removalNotification) { - // If removal was done manually we assume it was also handled properly - if (!removalNotification.wasEvicted()) { - return; + public DistributedResult() { + this(new ConcurrentHashMap<>()); } - final ManagedExecutionId executionId = removalNotification.getKey(); - - log.warn("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause()); - - final ManagedExecution execution = storage.getExecution(executionId); - - // The query might already be deleted - if(execution != null) { - execution.reset(); + @Override + public Stream streamQueryResults() { + return results.values().stream().flatMap(Collection::stream); } } - @Override - public ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) { - final ManagedExecution execution = createExecution(query, user, submittedDataset, system); - execute(namespace, execution, config); + private final ClusterState clusterState; - return execution; + public DistributedExecutionManager(MetaStorage storage, ClusterState state) { + super(storage); + clusterState = state; } - @Override - public void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { - try { - execution.initExecutable(namespace, config); - } - catch (Exception e) { - // ConqueryErrors are usually user input errors so no need to log them at level=ERROR - if (e instanceof ConqueryError) { - log.warn("Failed to initialize Query[{}]", execution.getId(), e); - } - else { - log.error("Failed to initialize Query[{}]", execution.getId(), e); - } - - storage.removeExecution(execution.getId()); - throw e; - } - - log.info("Starting execution[{}]", execution.getQueryId()); - - execution.start(); - - - final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), storage).map(Group::getName).orElse("none"); - ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).inc(); - - if (execution instanceof InternalExecution internalExecution) { - log.info("Executing Query[{}] in Dataset[{}]", execution.getQueryId(), namespace.getDataset().getId()); - clusterState.getWorkerHandlers().get(execution.getDataset().getId()).sendToAll(internalExecution.createExecutionMessage()); - } - } @Override - public ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) { - return createQuery(query, UUID.randomUUID(), user, submittedDataset, system); - } + protected void doExecute(Namespace namespace, InternalExecution internalExecution) { + ManagedExecution execution = (ManagedExecution & InternalExecution) internalExecution; + log.info("Executing Query[{}] in Dataset[{}]", execution.getQueryId(), namespace.getDataset().getId()); - // Visible for testing - public ManagedExecution createQuery(QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) { - // Transform the submitted query into an initialized execution - ManagedExecution managed = query.toManagedExecution(user, submittedDataset, storage); - managed.setSystem(system); - managed.setQueryId(queryId); + final WorkerHandler workerHandler = getWorkerHandler(execution); - // Store the execution - storage.addExecution(managed); + workerHandler.sendToAll(internalExecution.createExecutionMessage()); + } - return managed; + private WorkerHandler getWorkerHandler(ManagedExecution execution) { + return clusterState.getWorkerHandlers() + .get(execution.getDataset().getId()); } /** * Receive part of query result and store into query. * - * @param result + * @implNote subQueries of Forms are managed by the form itself, so need to be passed from outside. */ - public > void handleQueryResult(R result) { + @SneakyThrows + public > void handleQueryResult(R result, E query) { - final ManagedExecutionId executionId = result.getQueryId(); - final E query = (E) storage.getExecution(executionId); + + log.debug("Received Result[size={}] for Query[{}]", result.getResults().size(), result.getQueryId()); + log.trace("Received Result\n{}", result.getResults()); if (query.getState() != ExecutionState.RUNNING) { + log.warn("Received result for Query[{}] that is not RUNNING but {}", query.getId(), query.getState()); return; } - query.addResult(result); + if (result.getError().isPresent()) { + query.fail(result.getError().get()); + } + else { + + // We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain. + final DistributedResult results = getResult(query, DistributedResult::new); + results.results.put(result.getWorkerId(), result.getResults()); + + final Set finishedWorkers = results.results.keySet(); + + // If all known workers have returned a result, the query is DONE. + if (finishedWorkers.equals(getWorkerHandler(query).getAllWorkerIds())) { + query.finish(ExecutionState.DONE); + } + } // State changed to DONE or FAILED if (query.getState() != ExecutionState.RUNNING) { - final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(query.getOwner(), storage).map(Group::getName).orElse("none"); + final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(query.getOwner(), getStorage()).map(Group::getName).orElse("none"); ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).dec(); ExecutionMetrics.getQueryStateCounter(query.getState(), primaryGroupName).inc(); @@ -150,44 +109,17 @@ public /* This log is here to prevent an NPE which could occur when no strong reference to result.getResults() existed anymore after the query finished and immediately was reset */ - log.trace("Collected metrics for execution {}. Last result received: {}:", executionId, result.getResults()); + log.trace("Collected metrics for execution {}. Last result received: {}:", result.getQueryId(), result.getResults()); } } - - /** - * Register another result for the execution. - */ - - @SneakyThrows(ExecutionException.class) // can only occur if ArrayList::new fails which is unlikely and would have other problems also - public void addQueryResult(ManagedExecution execution, List queryResults) { - // We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain. - executionResults.get(execution.getId(), ArrayList::new) - .add(queryResults); - } - - /** - * Discard the query's results. - */ - @Override - public void clearQueryResults(ManagedExecution execution) { - executionResults.invalidate(execution.getId()); - } - - @Override - public Stream streamQueryResults(ManagedExecution execution) { - final List> resultParts = executionResults.getIfPresent(execution.getId()); - - return resultParts == null - ? Stream.empty() - : resultParts.stream().flatMap(List::stream); - - } - @Override public void cancelQuery(Dataset dataset, ManagedExecution query) { + log.debug("Sending cancel message to all workers."); + query.cancel(); + getWorkerHandler(query).sendToAll(new CancelQuery(query.getId())); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java index 7635da9705..4f560a0cbf 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java @@ -1,33 +1,151 @@ package com.bakdata.conquery.models.query; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import com.bakdata.conquery.apiv1.query.QueryDescription; +import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.metrics.ExecutionMetrics; +import com.bakdata.conquery.models.auth.AuthorizationHelper; +import com.bakdata.conquery.models.auth.entities.Group; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; +import com.bakdata.conquery.models.error.ConqueryError; +import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.worker.Namespace; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalNotification; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; -public interface ExecutionManager { +@Data +@Slf4j +public abstract class ExecutionManager { - ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system); + public interface Result { + Stream streamQueryResults(); + } - void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config); + private final MetaStorage storage; - ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system); - - void cancelQuery(final Dataset dataset, final ManagedExecution query); + private final Cache executionResults = + CacheBuilder.newBuilder() + .softValues() + .removalListener(this::executionRemoved) + .build(); /** - * Discard the query's results. + * Manage state of evicted Queries, setting them to NEW. */ - void clearQueryResults(ManagedExecution execution); + private void executionRemoved(RemovalNotification removalNotification) { + // If removal was done manually we assume it was also handled properly + if (!removalNotification.wasEvicted()) { + return; + } - /** - * Stream the results of the query, if available. - */ - Stream streamQueryResults(ManagedExecution execution); + final ManagedExecutionId executionId = removalNotification.getKey(); + + log.warn("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause()); + + final ManagedExecution execution = getExecution(executionId); + + // The query might already be deleted + if (execution != null) { + execution.reset(); + } + } + + + public ManagedExecution getExecution(ManagedExecutionId execution) { + return storage.getExecution(execution); + } + + protected R getResult(ManagedExecution execution, Callable defaultProvider) throws ExecutionException { + return executionResults.get(execution.getId(), defaultProvider); + } + + protected void addResult(ManagedExecution execution, R result) { + executionResults.put(execution.getId(), result); + } + + public final ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) { + final ManagedExecution execution = createExecution(query, user, submittedDataset, system); + execute(namespace, execution, config); + + return execution; + } + + + public final void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { + + clearQueryResults(execution); + + try { + execution.initExecutable(namespace, config); + } + catch (Exception e) { + // ConqueryErrors are usually user input errors so no need to log them at level=ERROR + if (e instanceof ConqueryError) { + log.warn("Failed to initialize Query[{}]", execution.getId(), e); + } + else { + log.error("Failed to initialize Query[{}]", execution.getId(), e); + } + + storage.removeExecution(execution.getId()); + throw e; + } + + log.info("Starting execution[{}]", execution.getQueryId()); + + execution.start(); + + final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), storage).map(Group::getName).orElse("none"); + ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).inc(); + + if (execution instanceof InternalExecution internalExecution) { + doExecute(namespace, internalExecution); + } + } + + protected abstract void doExecute(Namespace namespace, InternalExecution execution); + + // Visible for testing + public final ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) { + return createQuery(query, UUID.randomUUID(), user, submittedDataset, system); + } + + public final ManagedExecution createQuery(QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) { + // Transform the submitted query into an initialized execution + ManagedExecution managed = query.toManagedExecution(user, submittedDataset, storage); + managed.setSystem(system); + managed.setQueryId(queryId); + + // Store the execution + storage.addExecution(managed); + + return managed; + } + + public abstract void cancelQuery(final Dataset dataset, final ManagedExecution query); + + public void clearQueryResults(ManagedExecution execution) { + executionResults.invalidate(execution.getId()); + } + + public Stream streamQueryResults(ManagedExecution execution) { + final R resultParts = executionResults.getIfPresent(execution.getId()); + + return resultParts == null + ? Stream.empty() + : resultParts.streamQueryResults(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java index 6a1638f9a8..460fefa906 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java @@ -1,12 +1,9 @@ package com.bakdata.conquery.models.query; -import java.util.Collections; import java.util.List; import java.util.OptionalLong; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.Stream; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; @@ -25,15 +22,11 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.CancelQuery; import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteQuery; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.ShardResult; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.WorkerInformation; import com.bakdata.conquery.util.QueryUtils; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -58,8 +51,6 @@ public class ManagedQuery extends ManagedExecution implements EditorQuery, Singl */ private Long lastResultCount; - @JsonIgnore - private transient Set involvedWorkers; @JsonIgnore private transient List columnDescriptions; @@ -78,28 +69,10 @@ protected void doInitExecutable() { query.resolve(new QueryResolveContext(getNamespace(), getConfig(), getStorage(), null)); } - @Override - public void addResult(ShardResult result) { - log.debug("Received Result[size={}] for Query[{}]", result.getResults().size(), result.getQueryId()); - - log.trace("Received Result\n{}", result.getResults()); - - if (result.getError().isPresent()) { - fail(result.getError().get()); - return; - } - - involvedWorkers.remove(result.getWorkerId()); - - getNamespace().getExecutionManager().addQueryResult(this, result.getResults()); - - if (involvedWorkers.isEmpty() && getState() == ExecutionState.RUNNING) { - finish(ExecutionState.DONE); - } - } @Override - protected void finish(ExecutionState executionState) { + public void finish(ExecutionState executionState) { + //TODO this is not optimal with SQLExecutionService as this might fully evaluate the query. lastResultCount = query.countResults(streamResults(OptionalLong.empty())); super.finish(executionState); @@ -127,13 +100,7 @@ public long resultRowCount() { return lastResultCount; } - @Override - public void start() { - super.start(); - involvedWorkers = Collections.synchronizedSet(getNamespace().getWorkerHandler().getWorkers().stream() - .map(WorkerInformation::getId) - .collect(Collectors.toSet())); - } + @Override public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { @@ -161,9 +128,7 @@ public void reset() { @Override public void cancel() { - log.debug("Sending cancel message to all workers."); - getNamespace().getWorkerHandler().sendToAll(new CancelQuery(getId())); } @Override @@ -196,8 +161,4 @@ public void visit(Consumer visitor) { query.visit(visitor); } - public DistributedNamespace getNamespace() { - return (DistributedNamespace) super.getNamespace(); - } - } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java index 1e1e1b9d05..a07989bb0b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java @@ -1,19 +1,54 @@ package com.bakdata.conquery.models.query.results; import com.bakdata.conquery.io.cps.CPSType; +import com.bakdata.conquery.models.execution.ExecutionState; +import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; +import com.bakdata.conquery.models.query.DistributedExecutionManager; +import com.bakdata.conquery.models.query.ManagedQuery; import lombok.EqualsAndHashCode; import lombok.Getter; -@CPSType(id = "FORM_SHARD_RESULT", base = ShardResult.class) +@CPSType(id = "FORM_SHARD_RESULT", base = NamespacedMessage.class) @EqualsAndHashCode(callSuper = true) @Getter public class FormShardResult extends ShardResult { - private final ManagedExecutionId subQueryId; - public FormShardResult(ManagedExecutionId queryId, ManagedExecutionId subQueryId, WorkerId workerId) { - super(queryId, workerId); - this.subQueryId = subQueryId; + private final ManagedExecutionId formId; + + public FormShardResult(ManagedExecutionId formId, ManagedExecutionId subQueryId, WorkerId workerId) { + super(subQueryId, workerId); + this.formId = formId; } + + /** + * Distribute the result to a sub query. + * + * @param executionManager + */ + @Override + public void addResult(DistributedExecutionManager executionManager) { + final ManagedInternalForm managedInternalForm = (ManagedInternalForm) executionManager.getExecution(getFormId()); + final ManagedQuery subQuery = managedInternalForm.getSubQuery(getQueryId()); + + + executionManager.handleQueryResult(this, subQuery); + + // Fail the whole execution if a subquery fails + if (ExecutionState.FAILED.equals(subQuery.getState())) { + managedInternalForm.fail( + getError().orElseThrow( + () -> new IllegalStateException(String.format("Query[%s] failed but no error was set.", subQuery.getId())) + ) + ); + } + + if (managedInternalForm.allSubQueriesDone()) { + managedInternalForm.finish(ExecutionState.DONE); + } + + } + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java index 6dd533ca9d..8c5a192593 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java @@ -10,7 +10,11 @@ import com.bakdata.conquery.models.error.ConqueryError; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.bakdata.conquery.models.messages.namespaces.specific.CollectQueryResult; +import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage; +import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; +import com.bakdata.conquery.models.query.DistributedExecutionManager; +import com.bakdata.conquery.models.query.ManagedQuery; +import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.Worker; import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Getter; @@ -22,13 +26,13 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @CPSBase -@CPSType(id = "SHARD_RESULT", base = ShardResult.class) +@CPSType(id = "SHARD_RESULT", base = NamespacedMessage.class) @Getter @Setter @Slf4j @ToString(onlyExplicitlyIncluded = true) @NoArgsConstructor -public class ShardResult { +public class ShardResult extends NamespaceMessage { @ToString.Include @@ -63,7 +67,7 @@ public synchronized void finish(@NonNull List results, Optional results, Optional pendingReactions = new HashMap<>(); + @NotNull + public Set getAllWorkerIds() { + return getWorkers().stream() + .map(WorkerInformation::getId) + .collect(Collectors.toSet()); + } + + public IdMap getWorkers() { + return this.workers; + } + + public void sendToAll(WorkerMessage msg) { if (workers.isEmpty()) { throw new IllegalStateException("There are no workers yet"); diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java index 680ad81dad..fde5116efb 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java @@ -1,69 +1,54 @@ package com.bakdata.conquery.sql.conquery; -import java.util.stream.Stream; - -import com.bakdata.conquery.apiv1.query.Query; -import com.bakdata.conquery.apiv1.query.QueryDescription; import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.models.auth.entities.User; -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; +import com.bakdata.conquery.models.execution.ExecutionState; +import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; import com.bakdata.conquery.models.query.ExecutionManager; -import com.bakdata.conquery.models.query.QueryResolveContext; -import com.bakdata.conquery.models.query.results.EntityResult; +import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.sql.SqlContext; import com.bakdata.conquery.sql.conversion.SqlConverter; -import com.bakdata.conquery.sql.conversion.dialect.SqlDialect; import com.bakdata.conquery.sql.conversion.model.SqlQuery; import com.bakdata.conquery.sql.execution.SqlExecutionResult; import com.bakdata.conquery.sql.execution.SqlExecutionService; import lombok.extern.slf4j.Slf4j; @Slf4j -public class SqlExecutionManager implements ExecutionManager { - private final MetaStorage metaStorage; +public class SqlExecutionManager extends ExecutionManager { + private final SqlExecutionService executionService; private final SqlConverter converter; - public SqlExecutionManager(final SqlContext context, SqlExecutionService sqlExecutionService, MetaStorage metaStorage) { - SqlDialect sqlDialect = context.getSqlDialect(); - this.metaStorage = metaStorage; - this.executionService = sqlExecutionService; - this.converter = new SqlConverter(sqlDialect, context.getConfig()); + public SqlExecutionManager(final SqlContext context, SqlExecutionService sqlExecutionService, MetaStorage storage) { + super(storage); + executionService = sqlExecutionService; + converter = new SqlConverter(context.getSqlDialect(), context.getConfig()); } @Override - public SqlManagedQuery runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) { - // required for properly setting date aggregation action in all nodes of the query graph - query.resolve(new QueryResolveContext(namespace, config, metaStorage, null)); - SqlManagedQuery execution = createExecution(query, user, submittedDataset, system); - execution.initExecutable(namespace, config); - execution.start(); - // todo(tm): Non-blocking execution - SqlExecutionResult result = this.executionService.execute(execution); - execution.finish(result); - return execution; - } + protected void doExecute(Namespace namespace, InternalExecution execution) { - @Override - public void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { - if (!(execution instanceof SqlManagedQuery)) { - throw new UnsupportedOperationException("The SQL execution manager can only execute SQL queries, but got a %s".formatted(execution.getClass())); + // todo(tm): Non-blocking execution + if (execution instanceof ManagedQuery managedQuery) { + SqlQuery sqlQuery = converter.convert(managedQuery.getQuery()); + SqlExecutionResult result = executionService.execute(sqlQuery); + addResult(managedQuery, result); + managedQuery.setLastResultCount(((long) result.getRowCount())); + managedQuery.finish(ExecutionState.DONE); + return; } - this.executionService.execute(((SqlManagedQuery) execution)); - } + if (execution instanceof ManagedInternalForm managedForm) { + managedForm.getSubQueries().values().forEach(subQuery -> doExecute(namespace, subQuery)); + managedForm.finish(ExecutionState.DONE); + return; + } - @Override - public SqlManagedQuery createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) { - Query castQuery = (Query) query; - SqlQuery converted = this.converter.convert(castQuery); - SqlManagedQuery sqlManagedQuery = new SqlManagedQuery(castQuery, user, submittedDataset, metaStorage, converted); - metaStorage.addExecution(sqlManagedQuery); - return sqlManagedQuery; + throw new IllegalStateException("Unexpected type of execution: %s".formatted(execution.getClass())); } @Override @@ -71,14 +56,4 @@ public void cancelQuery(Dataset dataset, ManagedExecution query) { // unsupported for now } - @Override - public void clearQueryResults(ManagedExecution execution) { - // unsupported for now - } - - @Override - public Stream streamQueryResults(ManagedExecution execution) { - throw new UnsupportedOperationException("Streaming for now not supported"); - } - } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlManagedQuery.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlManagedQuery.java deleted file mode 100644 index 3e6d6e7a4a..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlManagedQuery.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.bakdata.conquery.sql.conquery; - -import java.util.List; -import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.stream.Stream; - -import com.bakdata.conquery.apiv1.execution.ExecutionStatus; -import com.bakdata.conquery.apiv1.query.EditorQuery; -import com.bakdata.conquery.apiv1.query.Query; -import com.bakdata.conquery.apiv1.query.QueryDescription; -import com.bakdata.conquery.io.cps.CPSType; -import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.models.auth.entities.Subject; -import com.bakdata.conquery.models.auth.entities.User; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.execution.ExecutionState; -import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.models.query.PrintSettings; -import com.bakdata.conquery.models.query.QueryResolveContext; -import com.bakdata.conquery.models.query.SingleTableResult; -import com.bakdata.conquery.models.query.Visitable; -import com.bakdata.conquery.models.query.resultinfo.ResultInfo; -import com.bakdata.conquery.models.query.results.EntityResult; -import com.bakdata.conquery.sql.conversion.model.SqlQuery; -import com.bakdata.conquery.sql.execution.SqlExecutionResult; -import com.bakdata.conquery.util.QueryUtils; -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; - -@Setter -@Getter -@CPSType(base = ManagedExecution.class, id = "SQL_QUERY") -public class SqlManagedQuery extends ManagedExecution implements EditorQuery, SingleTableResult { - - private Query query; - private SqlQuery sqlQuery; - private SqlExecutionResult result; - private Long lastResultCount; - - protected SqlManagedQuery(MetaStorage storage) { - super(storage); - } - - public SqlManagedQuery(Query query, User owner, Dataset dataset, MetaStorage storage, SqlQuery sqlQuery) { - super(owner, dataset, storage); - this.query = query; - this.sqlQuery = sqlQuery; - } - - @Override - protected void doInitExecutable() { - query.resolve(new QueryResolveContext(getNamespace(), getConfig(), getStorage(), null)); - } - - @Override - public QueryDescription getSubmitted() { - return query; - } - - @Override - protected String makeDefaultLabel(PrintSettings cfg) { - return QueryUtils.makeQueryLabel(query, cfg, getId()); - } - - @Override - public void cancel() { - //TODO when async is implemented. - } - - @Override - public void visit(Consumer visitor) { - visitor.accept(this); - } - - @Override - public List getResultInfos() { - return query.getResultInfos(); - } - - @Override - public Stream streamResults(OptionalLong maybeLimit) { - final Stream results = result.getTable().stream(); - - if(maybeLimit.isEmpty()){ - return results; - } - - final long limit = maybeLimit.getAsLong(); - final AtomicLong consumed = new AtomicLong(); - - return results.takeWhile(line -> consumed.addAndGet(line.length()) < limit); - } - - @Override - public long resultRowCount() { - return result.getRowCount(); - } - - @Override - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { - super.setStatusBase(subject, status); - enrichStatusBase(status); - } - - public void finish(final SqlExecutionResult result) { - this.result = result; - this.lastResultCount = (long) result.getRowCount(); - super.finish(ExecutionState.DONE); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java index 72393776de..fa9a313918 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java @@ -1,12 +1,14 @@ package com.bakdata.conquery.sql.execution; import java.util.List; +import java.util.stream.Stream; +import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.results.EntityResult; import lombok.Value; @Value -public class SqlExecutionResult { +public class SqlExecutionResult implements ExecutionManager.Result { List columnNames; List table; @@ -15,7 +17,11 @@ public class SqlExecutionResult { public SqlExecutionResult(List columnNames, List table) { this.columnNames = columnNames; this.table = table; - this.rowCount = table.size(); + rowCount = table.size(); } + @Override + public Stream streamQueryResults() { + return table.stream(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java index 492fa4320f..a2a4ff7050 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java @@ -15,10 +15,9 @@ import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.types.ResultType; -import com.bakdata.conquery.sql.conquery.SqlManagedQuery; -import com.google.common.base.Stopwatch; +import com.bakdata.conquery.sql.conversion.model.SqlQuery; +import lombok.Data; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jooq.DSLContext; import org.jooq.Record; @@ -26,8 +25,9 @@ import org.jooq.Select; import org.jooq.exception.DataAccessException; -@RequiredArgsConstructor + @Slf4j +@Data public class SqlExecutionService { private static final int PID_COLUMN_INDEX = 1; @@ -38,55 +38,25 @@ public class SqlExecutionService { private final ResultSetProcessor resultSetProcessor; - public SqlExecutionResult execute(SqlManagedQuery sqlQuery) { - log.info("Starting SQL execution[{}]", sqlQuery.getQueryId()); - Stopwatch stopwatch = Stopwatch.createStarted(); - SqlExecutionResult result = dslContext.connectionResult(connection -> createStatementAndExecute(sqlQuery, connection)); - log.info("Finished SQL execution[{}] with {} results within {}", sqlQuery.getQueryId(), result.getRowCount(), stopwatch.elapsed()); - return result; - } + public SqlExecutionResult execute(SqlQuery sqlQuery) { - public Result fetch(Select query) { - log.debug("Executing query: \n{}", query); - try { - return dslContext.fetch(query); - } - catch (DataAccessException exception) { - throw new ConqueryError.SqlError(exception); - } - } + final SqlExecutionResult result = dslContext.connectionResult(connection -> createStatementAndExecute(sqlQuery, connection)); - /** - * Executes the query and returns the results as a Stream. - *

- * Note: The returned Stream is resourceful. It must be closed by the caller, because it contains a reference to an open {@link ResultSet} - * and {@link PreparedStatement}. - * - * @param query The query to be executed. - * @return A Stream of query results. - */ - public Stream fetchStream(Select query) { - log.debug("Executing query: \n{}", query); - try { - return dslContext.fetchStream(query); - } - catch (DataAccessException exception) { - throw new ConqueryError.SqlError(exception); - } + return result; } - private SqlExecutionResult createStatementAndExecute(SqlManagedQuery sqlQuery, Connection connection) { + private SqlExecutionResult createStatementAndExecute(SqlQuery sqlQuery, Connection connection) { - String sqlString = sqlQuery.getSqlQuery().getSql(); - List> resultTypes = sqlQuery.getSqlQuery().getResultInfos().stream().map(ResultInfo::getType).collect(Collectors.toList()); + final String sqlString = sqlQuery.getSql(); + final List> resultTypes = sqlQuery.getResultInfos().stream().map(ResultInfo::getType).collect(Collectors.toList()); log.info("Executing query: \n{}", sqlString); try (Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sqlString)) { - int columnCount = resultSet.getMetaData().getColumnCount(); - List columnNames = getColumnNames(resultSet, columnCount); - List resultTable = createResultTable(resultSet, resultTypes, columnCount); + final int columnCount = resultSet.getMetaData().getColumnCount(); + final List columnNames = getColumnNames(resultSet, columnCount); + final List resultTable = createResultTable(resultSet, resultTypes, columnCount); return new SqlExecutionResult(columnNames, resultTable); } @@ -96,15 +66,6 @@ private SqlExecutionResult createStatementAndExecute(SqlManagedQuery sqlQuery, C } } - private List createResultTable(ResultSet resultSet, List> resultTypes, int columnCount) throws SQLException { - List resultTable = new ArrayList<>(resultSet.getFetchSize()); - while (resultSet.next()) { - SqlEntityResult resultRow = getResultRow(resultSet, resultTypes, columnCount); - resultTable.add(resultRow); - } - return resultTable; - } - private List getColumnNames(ResultSet resultSet, int columnCount) { // JDBC ResultSet indices start with 1 return IntStream.rangeClosed(1, columnCount) @@ -112,6 +73,15 @@ private List getColumnNames(ResultSet resultSet, int columnCount) { .toList(); } + private List createResultTable(ResultSet resultSet, List> resultTypes, int columnCount) throws SQLException { + final List resultTable = new ArrayList<>(resultSet.getFetchSize()); + while (resultSet.next()) { + final SqlEntityResult resultRow = getResultRow(resultSet, resultTypes, columnCount); + resultTable.add(resultRow); + } + return resultTable; + } + private String getColumnName(ResultSet resultSet, int columnIndex) { try { return resultSet.getMetaData().getColumnName(columnIndex); @@ -123,15 +93,44 @@ private String getColumnName(ResultSet resultSet, int columnIndex) { private SqlEntityResult getResultRow(ResultSet resultSet, List> resultTypes, int columnCount) throws SQLException { - String id = resultSet.getString(PID_COLUMN_INDEX); - Object[] resultRow = new Object[columnCount - 1]; + final String id = resultSet.getString(PID_COLUMN_INDEX); + final Object[] resultRow = new Object[columnCount - 1]; for (int resultSetIndex = VALUES_OFFSET_INDEX; resultSetIndex <= columnCount; resultSetIndex++) { - int resultTypeIndex = resultSetIndex - VALUES_OFFSET_INDEX; - resultRow[resultTypeIndex] = resultTypes.get(resultTypeIndex).getFromResultSet(resultSet, resultSetIndex, this.resultSetProcessor); + final int resultTypeIndex = resultSetIndex - VALUES_OFFSET_INDEX; + resultRow[resultTypeIndex] = resultTypes.get(resultTypeIndex).getFromResultSet(resultSet, resultSetIndex, resultSetProcessor); } return new SqlEntityResult(id, resultRow); } + public Result fetch(Select query) { + log.debug("Executing query: \n{}", query); + try { + return dslContext.fetch(query); + } + catch (DataAccessException exception) { + throw new ConqueryError.SqlError(exception); + } + } + + /** + * Executes the query and returns the results as a Stream. + *

+ * Note: The returned Stream is resourceful. It must be closed by the caller, because it contains a reference to an open {@link ResultSet} + * and {@link PreparedStatement}. + * + * @param query The query to be executed. + * @return A Stream of query results. + */ + public Stream fetchStream(Select query) { + log.debug("Executing query: \n{}", query); + try { + return dslContext.fetchStream(query); + } + catch (DataAccessException exception) { + throw new ConqueryError.SqlError(exception); + } + } + } diff --git a/backend/src/main/java/com/bakdata/conquery/util/io/IdColumnUtil.java b/backend/src/main/java/com/bakdata/conquery/util/io/IdColumnUtil.java index 0e36472140..8c2832477c 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/io/IdColumnUtil.java +++ b/backend/src/main/java/com/bakdata/conquery/util/io/IdColumnUtil.java @@ -12,12 +12,9 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.identifiable.mapping.AutoIncrementingPseudomizer; import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; -import com.bakdata.conquery.models.identifiable.mapping.EntityPrintId; import com.bakdata.conquery.models.identifiable.mapping.FullIdPrinter; import com.bakdata.conquery.models.identifiable.mapping.IdPrinter; import com.bakdata.conquery.models.worker.Namespace; -import com.bakdata.conquery.sql.conquery.SqlManagedQuery; -import com.bakdata.conquery.sql.execution.SqlEntityResult; import lombok.experimental.UtilityClass; @UtilityClass @@ -57,9 +54,6 @@ public static IdPrinter getIdPrinter(Subject owner, ManagedExecution execution, if (owner.isPermitted(execution.getDataset(), Ability.PRESERVE_ID)) { // todo(tm): The integration of ids in the sql connector needs to be properly managed - if (execution instanceof SqlManagedQuery) { - return entityResult -> EntityPrintId.from(((SqlEntityResult) entityResult).getId()); - } return new FullIdPrinter(namespace.getStorage().getIdMapping(), size, pos); } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/sql/dialect/PostgreSqlIntegrationTests.java b/backend/src/test/java/com/bakdata/conquery/integration/sql/dialect/PostgreSqlIntegrationTests.java index 3053d6f9e1..614efe4ff7 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/sql/dialect/PostgreSqlIntegrationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/sql/dialect/PostgreSqlIntegrationTests.java @@ -5,7 +5,6 @@ import java.util.stream.Stream; import com.bakdata.conquery.TestTags; -import com.bakdata.conquery.apiv1.query.ConceptQuery; import com.bakdata.conquery.integration.ConqueryIntegrationTests; import com.bakdata.conquery.integration.IntegrationTests; import com.bakdata.conquery.integration.json.SqlTestDataImporter; @@ -16,7 +15,6 @@ import com.bakdata.conquery.models.i18n.I18n; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.sql.DslContextFactory; -import com.bakdata.conquery.sql.conquery.SqlManagedQuery; import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect; import com.bakdata.conquery.sql.conversion.model.SqlQuery; import com.bakdata.conquery.sql.conversion.supplier.DateNowSupplier; @@ -82,11 +80,11 @@ public void shouldThrowException() { // This can be removed as soon as we switch to a full integration test including the REST API I18n.init(); SqlExecutionService executionService = new SqlExecutionService(dslContext, ResultSetProcessorFactory.create(testSqlDialect)); - SqlManagedQuery validQuery = new SqlManagedQuery(new ConceptQuery(), null, null, null, toSqlQuery("SELECT 1")); + SqlQuery validQuery = toSqlQuery("SELECT 1"); Assertions.assertThatNoException().isThrownBy(() -> executionService.execute(validQuery)); // executing an empty query should throw an SQL error - SqlManagedQuery emptyQuery = new SqlManagedQuery(new ConceptQuery(), null, null, null, toSqlQuery("")); + SqlQuery emptyQuery = toSqlQuery(""); Assertions.assertThatThrownBy(() -> executionService.execute(emptyQuery)) .isInstanceOf(ConqueryError.SqlError.class) .hasMessageContaining("$org.postgresql.util.PSQLException"); diff --git a/frontend/src/js/button/QueryResultHistoryButton.tsx b/frontend/src/js/button/QueryResultHistoryButton.tsx index 27de64b222..ab0cce18a4 100644 --- a/frontend/src/js/button/QueryResultHistoryButton.tsx +++ b/frontend/src/js/button/QueryResultHistoryButton.tsx @@ -6,6 +6,8 @@ import { useDispatch, useSelector } from "react-redux"; import type { StateT } from "../app/reducers"; import { openHistory, useNewHistorySession } from "../entity-history/actions"; +import { ColumnDescription } from "../api/types"; +import { useGetAuthorizedUrl } from "../authorization/useAuthorizedUrl"; import IconButton from "./IconButton"; const SxIconButton = styled(IconButton)` @@ -13,17 +15,22 @@ const SxIconButton = styled(IconButton)` height: 35px; `; -interface PropsT { +export const QueryResultHistoryButton = ({ + url, + label, + columns, +}: { + columns: ColumnDescription[]; label: string; -} - -export const QueryResultHistoryButton = ({ label }: PropsT) => { + url: string; +}) => { const { t } = useTranslation(); const dispatch = useDispatch(); const isLoading = useSelector( (state) => state.entityHistory.isLoading, ); + const getAuthorizedUrl = useGetAuthorizedUrl(); const newHistorySession = useNewHistorySession(); return ( @@ -31,7 +38,7 @@ export const QueryResultHistoryButton = ({ label }: PropsT) => { icon={isLoading ? faSpinner : faListUl} frame onClick={async () => { - await newHistorySession(label); + await newHistorySession(getAuthorizedUrl(url), columns, label); dispatch(openHistory()); }} > diff --git a/frontend/src/js/entity-history/actions.ts b/frontend/src/js/entity-history/actions.ts index 6988b06e48..e3041d96dc 100644 --- a/frontend/src/js/entity-history/actions.ts +++ b/frontend/src/js/entity-history/actions.ts @@ -27,11 +27,9 @@ import { import { exists } from "../common/helpers/exists"; import { useDatasetId } from "../dataset/selectors"; import { loadCSV, parseCSVWithHeaderToObj } from "../file/csv"; -import { useLoadPreviewData } from "../preview/actions"; import { setMessage } from "../snack-message/actions"; import { SnackMessageType } from "../snack-message/reducer"; -import { Table } from "apache-arrow"; import { EntityEvent, EntityId } from "./reducer"; import { isDateColumn, isSourceColumn } from "./timeline/util"; @@ -102,29 +100,44 @@ export const loadHistoryData = createAsyncAction( export const PREFERRED_ID_KINDS = ["EGK", "PID"]; export const DEFAULT_ID_KIND = "EGK"; +function getPreferredIdColumns(columns: ColumnDescription[]) { + const findColumnIdxWithIdKind = (kind: string) => + columns.findIndex((col) => + col.semantics.some((s) => s.type === "ID" && s.kind === kind), + ); + + return PREFERRED_ID_KINDS.map((kind) => ({ + columnIdx: findColumnIdxWithIdKind(kind), + idKind: kind, + })); +} + +async function onLoadHistoryData(url: string, columns: ColumnDescription[]) { + try { + const result = await loadCSV(url); + return { + csv: result.data, + columns, + resultUrl: url, + }; + } catch (e) { + console.error(e); + return null; + } +} + // TODO: This starts a session with the current query results, // but there will be other ways of starting a history session // - from a dropped file with a list of entities // - from a previous query export function useNewHistorySession() { const dispatch = useDispatch(); - const loadPreviewData = useLoadPreviewData(); - const queryId = useSelector( - (state) => state.preview.lastQuery, - ); const { updateHistorySession } = useUpdateHistorySession(); - return async (label: string) => { - if (!queryId) { - dispatch(loadHistoryData.failure(new Error("Could not load query data"))); - return; - } - + return async (url: string, columns: ColumnDescription[], label: string) => { dispatch(loadHistoryData.request()); - const result = await loadPreviewData(queryId, { - noLoading: true, - }); + const result = await onLoadHistoryData(url, columns); if (!result) { dispatch( @@ -133,16 +146,24 @@ export function useNewHistorySession() { return; } - const entityIds = new Table(result.initialTableData.value) - .toArray() + const preferredIdColumns = getPreferredIdColumns(columns); + if (preferredIdColumns.length === 0) { + dispatch(loadHistoryData.failure(new Error("No valid ID columns found"))); + return; + } + + const entityIds = result.csv + .slice(1) .map((row) => { - for (const [k, v] of Object.entries(row)) { - if (PREFERRED_ID_KINDS.includes(v as string)) { + for (const col of preferredIdColumns) { + // some values might be empty, search for defined values + if (row[col.columnIdx]) { return { - id: v as string, - kind: k, + id: row[col.columnIdx], + kind: col.idKind, }; } + return null; } }) .filter(exists); diff --git a/frontend/src/js/query-runner/QueryResults.tsx b/frontend/src/js/query-runner/QueryResults.tsx index 0edc48da9b..2d027d7fc5 100644 --- a/frontend/src/js/query-runner/QueryResults.tsx +++ b/frontend/src/js/query-runner/QueryResults.tsx @@ -12,6 +12,7 @@ import { isEmpty } from "../common/helpers/commonHelper"; import FaIcon from "../icon/FaIcon"; import { canViewEntityPreview, canViewQueryPreview } from "../user/selectors"; +import { exists } from "../common/helpers/exists"; import DownloadResultsDropdownButton from "./DownloadResultsDropdownButton"; const Root = styled("div")` @@ -48,9 +49,11 @@ const QueryResults: FC = ({ resultLabel, resultUrls, resultCount, + resultColumns, queryType, }) => { const { t } = useTranslation(); + const csvUrl = resultUrls.find(({ url }) => url.endsWith("csv")); const canViewHistory = useSelector(canViewEntityPreview); const canViewPreview = useSelector(canViewQueryPreview); @@ -70,7 +73,13 @@ const QueryResults: FC = ({ )} {canViewPreview && } - {canViewHistory && } + {!!csvUrl && canViewHistory && exists(resultColumns) && ( + + )} {resultUrls.length > 0 && ( )}