Skip to content

Commit

Permalink
Merge branch 'develop' into fix/action-reaction-multiple-messages-han…
Browse files Browse the repository at this point in the history
…dling
  • Loading branch information
awildturtok authored Mar 21, 2024
2 parents 989f9e7 + 82e6829 commit 6f053f2
Show file tree
Hide file tree
Showing 23 changed files with 448 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ public String getLocalizedTypeLabel() {

@Override
public ManagedInternalForm<FullExportForm> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedInternalForm<FullExportForm>(this, user, submittedDataset, storage);
return new ManagedInternalForm<>(this, user, submittedDataset, storage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.sql.conquery.SqlManagedQuery;

/**
* Common abstraction for intersecting parts of {@link ManagedQuery} and {@link SqlManagedQuery}.
* Common abstraction for intersecting parts of {@link ManagedQuery}.
*/
public interface EditorQuery {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,4 @@ public interface InternalExecution<R extends ShardResult> {
*/
WorkerMessage createExecutionMessage();

/**
* The callback for the results the shard nodes return.
* Is called once per shard node
*/
void addResult(R result);

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,12 @@ public abstract class ManagedExecution extends IdentifiableImpl<ManagedExecution
@JsonIgnore
@EqualsAndHashCode.Exclude
private transient ExecutionState state = ExecutionState.NEW;

//TODO FK: This is only locked/unlocked, there should be better primitives for that.
@JsonIgnore
@EqualsAndHashCode.Exclude
private transient CountDownLatch execution;
private transient CountDownLatch executingLock;

@JsonIgnore
@EqualsAndHashCode.Exclude
private transient LocalDateTime startTime;
Expand Down Expand Up @@ -194,7 +197,7 @@ public ManagedExecutionId createId() {
/**
* Fails the execution and log the occurred error.
*/
protected void fail(ConqueryErrorInfo error) {
public void fail(ConqueryErrorInfo error) {
if (this.error != null && !this.error.equalsRegardingCodeAndMessage(error)) {
// Warn only again if the error is different (failed might by called per collected result)
log.warn("The execution [{}] failed again with:\n\t{}\n\tThe previous error was: {}", getId(), this.error, error);
Expand All @@ -216,41 +219,44 @@ public void start() {
startTime = LocalDateTime.now();

setState(ExecutionState.RUNNING);
namespace.getExecutionManager().clearQueryResults(this);

execution = new CountDownLatch(1);
resetLock();
}
}

protected void finish(ExecutionState executionState) {
private void resetLock() {
executingLock = new CountDownLatch(1);
}

private void clearLock() {
executingLock.countDown();
}

public void finish(ExecutionState executionState) {
if (getState() == ExecutionState.NEW) {
log.error("Query[{}] was never run.", getId(), new Exception());
}

synchronized (this) {
finishTime = LocalDateTime.now();
progress = null;

// Set execution state before acting on the latch to prevent a race condition
// Not sure if also the storage needs an update first
setState(executionState);
execution.countDown();
clearLock();

// No need to persist failed queries. (As they are most likely invalid)
if (getState() == ExecutionState.DONE) {
getStorage().updateExecution(this);
}
}


log.info(
"{} {} {} within {}",
getState(),
queryId,
this.getClass().getSimpleName(),
getExecutionTime()
);
log.info("{} {} {} within {}", getState(), queryId, getClass().getSimpleName(), getExecutionTime());
}



@JsonIgnore
public Duration getExecutionTime() {
return (startTime != null && finishTime != null) ? Duration.between(startTime, finishTime) : null;
Expand All @@ -263,7 +269,7 @@ public ExecutionState awaitDone(int time, TimeUnit unit) {
if (getState() != ExecutionState.RUNNING) {
return getState();
}
Uninterruptibles.awaitUninterruptibly(execution, time, unit);
Uninterruptibles.awaitUninterruptibly(executingLock, time, unit);

return getState();
}
Expand Down Expand Up @@ -338,7 +344,7 @@ private void setAvailableSecondaryIds(FullExecutionStatus status) {

private void setAdditionalFieldsForStatusWithGroups(FullExecutionStatus status) {
/* Calculate which groups can see this query.
* This usually is usually not done very often and should be reasonable fast, so don't cache this.
* This is usually not done very often and should be reasonable fast, so don't cache this.
*/
List<GroupId> permittedGroups = new ArrayList<>();
for (Group group : storage.getAllGroups()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta

@Override
public void cancel() {
//TODO this is no longer called as the ExecutionManager used to call this.
Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present");

updateStatus(api.cancelTask(externalTaskId));
Expand All @@ -189,7 +190,7 @@ public Response fetchExternalResult(String assetId) {
}

@Override
protected void finish(ExecutionState executionState) {
public void finish(ExecutionState executionState) {
if (getState().equals(executionState)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public FormQueryPlan(List<DateContext> dateContexts, ArrayConceptQueryPlan featu

if (dateContexts.size() <= 0) {
// There is nothing to do for this FormQueryPlan, but we will return an empty result when its executed
log.warn("dateContexts are empty. Will not produce a result.");
log.trace("dateContexts are empty. Will not produce a result.");
constantCount = 3;
withRelativeEventDate = false;
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Execution type for simple forms, that are completely executed within Conquery and produce a single table as result.
Expand Down Expand Up @@ -73,6 +74,11 @@ public ManagedInternalForm(F form, User user, Dataset submittedDataset, MetaStor
super(form, user, submittedDataset, storage);
}

@Nullable
public ManagedQuery getSubQuery(ManagedExecutionId subQueryId) {
return flatSubQueries.get(subQueryId);
}

@Override
public void doInitExecutable() {
// Convert sub queries to sub executions
Expand All @@ -88,7 +94,7 @@ private Map<String, ManagedQuery> createSubExecutions() {
return getSubmitted().createSubQueries()
.entrySet()
.stream().collect(Collectors.toMap(
e -> e.getKey(),
Map.Entry::getKey,
e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getStorage())

));
Expand Down Expand Up @@ -167,44 +173,8 @@ public WorkerMessage createExecutionMessage() {
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getQuery())));
}

/**
* Distribute the result to a sub query.
*/
@Override
public void addResult(FormShardResult result) {
if (result.getError().isPresent()) {
fail(result.getError().get());
return;
}

ManagedExecutionId subQueryId = result.getSubQueryId();

ManagedQuery subQuery = flatSubQueries.get(subQueryId);
subQuery.addResult(result);

switch (subQuery.getState()) {
case DONE -> {
if (allSubQueriesDone()) {
finish(ExecutionState.DONE);
}
}
// Fail the whole execution if a subquery fails
case FAILED -> {
fail(
result.getError().orElseThrow(
() -> new IllegalStateException(String.format("Query [%s] failed but no error was set.", getId()))
)
);
}

default -> {
}
}

}


private boolean allSubQueriesDone() {
public boolean allSubQueriesDone() {
synchronized (this) {
return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE));
}
Expand Down

This file was deleted.

Loading

0 comments on commit 6f053f2

Please sign in to comment.