Skip to content

Commit

Permalink
force synchronizing of finish and ManagedQuery.resultRowCount().
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Sep 26, 2024
1 parent dd4874c commit d1347f2
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
Expand Down Expand Up @@ -52,6 +50,8 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.OptBoolean;
import com.google.common.base.Preconditions;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -144,6 +144,21 @@ public ManagedExecution(@NonNull User owner, @NonNull Dataset dataset, MetaStora
this.datasetRegistry = datasetRegistry;
}

private static boolean canSubjectExpand(Subject subject, QueryDescription query) {
NamespacedIdentifiableCollector namespacesIdCollector = new NamespacedIdentifiableCollector();
query.visit(namespacesIdCollector);

final Set<Concept<?>> concepts = namespacesIdCollector.getIdentifiables()
.stream()
.filter(ConceptElement.class::isInstance)
.map(ConceptElement.class::cast)
.<Concept<?>>map(ConceptElement::getConcept)
.collect(Collectors.toSet());

boolean canExpand = subject.isPermittedAll(concepts, Ability.READ);
return canExpand;
}

/**
* Executed right before execution submission.
*/
Expand All @@ -169,8 +184,41 @@ public final void initExecutable(ConqueryConfig config) {
}
}

protected String makeAutoLabel(PrintSettings cfg) {
return makeDefaultLabel(cfg) + AUTO_LABEL_SUFFIX;
}

@JsonIgnore
public Namespace getNamespace() {
return datasetRegistry.get(getDataset().getId());
}

protected abstract void doInitExecutable();

private static boolean containsDates(QueryDescription query) {
return Visitable.stream(query)
.anyMatch(visitable -> {

if (visitable instanceof CQConcept cqConcept) {
return !cqConcept.isExcludeFromTimeAggregation();
}

if (visitable instanceof CQExternal external) {
return external.containsDates();
}

return false;
});
}

/**
* Returns the {@link QueryDescription} that caused this {@link ManagedExecution}.
*/
@JsonIgnore
public abstract QueryDescription getSubmitted();

@JsonIgnore
protected abstract String makeDefaultLabel(PrintSettings cfg);

@Override
public ManagedExecutionId createId() {
Expand All @@ -197,49 +245,57 @@ public void fail(ConqueryErrorInfo error) {
finish(ExecutionState.FAILED);
}

public void start() {
synchronized (this) {
Preconditions.checkArgument(isInitialized(), "The execution must have been initialized first");
public synchronized void finish(ExecutionState executionState) {

if (getExecutionManager().isResultPresent(getId())) {
Preconditions.checkArgument(getExecutionManager().getResult(getId()).getState() != ExecutionState.RUNNING);
}
finishTime = LocalDateTime.now();
progress = null;

startTime = LocalDateTime.now();
// Set execution state before acting on the latch to prevent a race condition
// Not sure if also the storage needs an update first
getMetaStorage().updateExecution(this);

getMetaStorage().updateExecution(this);
}
getExecutionManager().updateState(getId(), executionState);

// Signal to waiting threads that the execution finished
getExecutionManager().clearBarrier(getId());

log.info("{} {} {} within {}", executionState, getId(), getClass().getSimpleName(), getExecutionTime());
}

public void finish(ExecutionState executionState) {
@JsonIgnore
protected ExecutionManager getExecutionManager() {
return getNamespace().getExecutionManager();
}

@JsonIgnore
public Duration getExecutionTime() {
return (startTime != null && finishTime != null) ? Duration.between(startTime, finishTime) : null;
}

public void start() {
synchronized (this) {
finishTime = LocalDateTime.now();
progress = null;

// Set execution state before acting on the latch to prevent a race condition
// Not sure if also the storage needs an update first
getMetaStorage().updateExecution(this);
Preconditions.checkArgument(isInitialized(), "The execution must have been initialized first");

getExecutionManager().updateState(getId(), executionState);
if (getExecutionManager().isResultPresent(getId())) {
Preconditions.checkArgument(getExecutionManager().getResult(getId()).getState() != ExecutionState.RUNNING);
}

// Signal to waiting threads that the execution finished
getExecutionManager().clearBarrier(getId());
startTime = LocalDateTime.now();

getMetaStorage().updateExecution(this);
}

log.info("{} {} {} within {}", executionState, getId(), getClass().getSimpleName(), getExecutionTime());
}

/**
* Renders a lightweight status with meta information about this query. Computation an size should be small for this.
*/
public OverviewExecutionStatus buildStatusOverview(UriBuilder url, Subject subject) {
OverviewExecutionStatus status = new OverviewExecutionStatus();
setStatusBase(subject, status);


@JsonIgnore
public Duration getExecutionTime() {
return (startTime != null && finishTime != null) ? Duration.between(startTime, finishTime) : null;
return status;
}


public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) {
status.setLabel(label == null ? queryId.toString() : getLabelWithoutAutoLabelSuffix());
status.setPristineLabel(label == null || queryId.toString().equals(label) || isAutoLabeled());
Expand All @@ -261,14 +317,26 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta
}
}

/**
* Renders a lightweight status with meta information about this query. Computation an size should be small for this.
*/
public OverviewExecutionStatus buildStatusOverview(UriBuilder url, Subject subject) {
OverviewExecutionStatus status = new OverviewExecutionStatus();
setStatusBase(subject, status);
@JsonIgnore
public String getLabelWithoutAutoLabelSuffix() {
final int idx;
if (label != null && (idx = label.lastIndexOf(AUTO_LABEL_SUFFIX)) != -1) {
return label.substring(0, idx);
}
return label;
}

return status;
@JsonIgnore
public boolean isAutoLabeled() {
return label != null && label.endsWith(AUTO_LABEL_SUFFIX);
}

public ExecutionState getState() {
if (!getExecutionManager().isResultPresent(getId())) {
return ExecutionState.NEW;
}

return getExecutionManager().getResult(getId()).getState();
}

/**
Expand Down Expand Up @@ -339,82 +407,18 @@ protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecu
status.setQuery(canSubjectExpand(subject, query) ? getSubmitted() : null);
}

private static boolean containsDates(QueryDescription query) {
return Visitable.stream(query)
.anyMatch(visitable -> {

if (visitable instanceof CQConcept cqConcept) {
return !cqConcept.isExcludeFromTimeAggregation();
}

if (visitable instanceof CQExternal external) {
return external.containsDates();
}

return false;
});
}

private static boolean canSubjectExpand(Subject subject, QueryDescription query) {
NamespacedIdentifiableCollector namespacesIdCollector = new NamespacedIdentifiableCollector();
query.visit(namespacesIdCollector);

final Set<Concept<?>> concepts = namespacesIdCollector.getIdentifiables()
.stream()
.filter(ConceptElement.class::isInstance)
.map(ConceptElement.class::cast)
.<Concept<?>>map(ConceptElement::getConcept)
.collect(Collectors.toSet());

boolean canExpand = subject.isPermittedAll(concepts, Ability.READ);
return canExpand;
}

public ExecutionState getState() {
if (!getExecutionManager().isResultPresent(getId())) {
return ExecutionState.NEW;
}

return getExecutionManager().getResult(getId()).getState();
}

@JsonIgnore
public boolean isReadyToDownload() {
return getState() == ExecutionState.DONE;
}

/**
* Returns the {@link QueryDescription} that caused this {@link ManagedExecution}.
*/
@JsonIgnore
public abstract QueryDescription getSubmitted();

@JsonIgnore
public String getLabelWithoutAutoLabelSuffix() {
final int idx;
if (label != null && (idx = label.lastIndexOf(AUTO_LABEL_SUFFIX)) != -1) {
return label.substring(0, idx);
}
return label;
}

@JsonIgnore
public boolean isAutoLabeled() {
return label != null && label.endsWith(AUTO_LABEL_SUFFIX);
}

@JsonIgnore
protected abstract String makeDefaultLabel(PrintSettings cfg);

protected String makeAutoLabel(PrintSettings cfg) {
return makeDefaultLabel(cfg) + AUTO_LABEL_SUFFIX;
}

@Override
public ConqueryPermission createPermission(Set<Ability> abilities) {
return ExecutionPermission.onInstance(abilities, getId());
}

//// Shortcut helper methods

public void reset() {
// This avoids endless loops with already reset queries
if (getState().equals(ExecutionState.NEW)) {
Expand All @@ -425,16 +429,4 @@ public void reset() {
}

public abstract void cancel();

//// Shortcut helper methods

@JsonIgnore
public Namespace getNamespace() {
return datasetRegistry.get(getDataset().getId());
}

@JsonIgnore
protected ExecutionManager getExecutionManager() {
return getNamespace().getExecutionManager();
}
}
Loading

0 comments on commit d1347f2

Please sign in to comment.