Commit

Implement absolute form conversion
jnsrnhld committed Apr 8, 2024
1 parent be7e405 commit 941c5f4
Showing 73 changed files with 2,393 additions and 381 deletions.
@@ -28,7 +28,6 @@
import com.bakdata.conquery.apiv1.execution.ResultAsset;
import com.bakdata.conquery.apiv1.query.CQElement;
import com.bakdata.conquery.apiv1.query.ConceptQuery;
import com.bakdata.conquery.apiv1.query.EditorQuery;
import com.bakdata.conquery.apiv1.query.ExternalUpload;
import com.bakdata.conquery.apiv1.query.ExternalUploadResult;
import com.bakdata.conquery.apiv1.query.Query;
@@ -137,11 +136,11 @@ public Stream<ExecutionStatus> getQueriesFiltered(Dataset datasetId, UriBuilder
*/
private static boolean canFrontendRender(ManagedExecution q) {
//TODO FK: should this be used to fill into canExpand instead of hiding the Executions?
if (!(q instanceof EditorQuery)) {
if (!(q instanceof ManagedQuery)) {
return false;
}

final Query query = ((EditorQuery) q).getQuery();
final Query query = ((ManagedQuery) q).getQuery();

if (query instanceof ConceptQuery) {
return isFrontendStructure(((ConceptQuery) query).getRoot());
@@ -291,14 +290,15 @@ public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject su
public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, ExternalUpload upload) {

final Namespace namespace = datasetRegistry.get(dataset.getId());
final CQExternal.ResolveStatistic
statistic =
CQExternal.resolveEntities(upload.getValues(), upload.getFormat(), namespace
.getStorage()
.getIdMapping(), config.getIdColumns(), config.getLocale()
.getDateReader(), upload.isOneRowPerEntity()

);
final CQExternal.ResolveStatistic statistic = CQExternal.resolveEntities(
upload.getValues(),
upload.getFormat(),
namespace.getStorage().getIdMapping(),
config.getIdColumns(),
config.getLocale().getDateReader(),
upload.isOneRowPerEntity(),
true
);

// Resolving nothing is a problem thus we fail.
if (statistic.getResolved().isEmpty()) {

This file was deleted.

@@ -7,12 +7,12 @@
import javax.annotation.Nullable;

import com.bakdata.conquery.apiv1.query.CQElement;
import com.bakdata.conquery.apiv1.query.EditorQuery;
import com.bakdata.conquery.apiv1.query.Query;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.models.error.ConqueryError;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.QueryPlanContext;
import com.bakdata.conquery.models.query.QueryResolveContext;
@@ -48,7 +48,7 @@ public CQReusedQuery(ManagedExecutionId executionId){
private ManagedExecutionId queryId;

@JsonIgnore
private EditorQuery query;
private ManagedQuery query;

@JsonView(View.InternalCommunication.class)
private Query resolvedQuery;
@@ -74,7 +74,7 @@ public QPNode createQueryPlan(QueryPlanContext context, ConceptQueryPlan plan) {

@Override
public void resolve(QueryResolveContext context) {
query = (EditorQuery) context.getStorage().getExecution(queryId);
query = (ManagedQuery) context.getStorage().getExecution(queryId);
if (query == null) {
throw new ConqueryError.ExecutionCreationResolveError(queryId);
}
@@ -80,7 +80,7 @@ public class CQExternal extends CQElement {
/**
* Maps from Entity to the computed time-frame.
*/
@Getter(AccessLevel.PRIVATE)
@Getter
@JsonView(View.InternalCommunication.class)
private Map<String, CDateSet> valuesResolved;

@@ -249,7 +249,8 @@ public void resolve(QueryResolveContext context) {
context.getNamespace().getStorage().getIdMapping(),
context.getConfig().getIdColumns(),
context.getConfig().getLocale().getDateReader(),
onlySingles
onlySingles,
context.getConfig().getSqlConnectorConfig().isEnabled()
);

if (resolved.getResolved().isEmpty()) {
@@ -296,7 +297,7 @@ public static class ResolveStatistic {
/**
* Helper method to try and resolve entities in values using the specified format.
*/
public static ResolveStatistic resolveEntities(@NotEmpty String[][] values, @NotEmpty List<String> format, EntityIdMap mapping, IdColumnConfig idColumnConfig, @NotNull DateReader dateReader, boolean onlySingles) {
public static ResolveStatistic resolveEntities(@NotEmpty String[][] values, @NotEmpty List<String> format, EntityIdMap mapping, IdColumnConfig idColumnConfig, @NotNull DateReader dateReader, boolean onlySingles, boolean isInSqlMode) {
final Map<String, CDateSet> resolved = new HashMap<>();

final List<String[]> unresolvedDate = new ArrayList<>();
@@ -329,7 +330,9 @@ public static ResolveStatistic resolveEntities(@NotEmpty String[][] values, @Not
continue;
}

String resolvedId = tryResolveId(row, readers, mapping);
String resolvedId = isInSqlMode
? String.valueOf(row[0])
: tryResolveId(row, readers, mapping);

if (resolvedId == null) {
unresolvedId.add(row);
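To make the new isInSqlMode branch easier to follow, here is a minimal stand-alone sketch of the resolution logic using plain JDK types as stand-ins for EntityIdMap and CDateSet (illustrative only, not the conquery API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-alone sketch, not the conquery API: when the SQL connector is enabled,
// resolveEntities takes the first column of each row verbatim as the entity id
// instead of resolving it through the EntityIdMap.
public class ResolveSketch {

	public static void main(String[] args) {
		String[][] values = {
				{"external-1", "2021-01-01/2021-12-31"},
				{"external-2", "2022-01-01/2022-06-30"}
		};
		// stand-in for EntityIdMap: external id -> internal id
		Map<String, String> idMapping = Map.of("external-1", "42");
		boolean isInSqlMode = true;

		List<String> resolved = new ArrayList<>();
		List<String[]> unresolvedId = new ArrayList<>();

		for (String[] row : values) {
			String resolvedId = isInSqlMode
					? String.valueOf(row[0])   // SQL mode: use the uploaded id as-is
					: idMapping.get(row[0]);   // worker mode: look the id up in the mapping

			if (resolvedId == null) {
				unresolvedId.add(row);
				continue;
			}
			resolved.add(resolvedId);
		}

		System.out.println("resolved:   " + resolved);   // both ids resolve in SQL mode
		System.out.println("unresolved: " + unresolvedId);
	}
}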
@@ -126,7 +126,7 @@ public SqlSelects convertToSqlSelects(SelectContext selectContext) {
}

@JsonIgnore
public boolean requiresIntervalPacking() {
public boolean isEventDateSelect() {
return false;
}

@@ -35,7 +35,8 @@ public SqlSelects convertToSqlSelects(SelectContext selectContext) {
}

@Override
public boolean requiresIntervalPacking() {
public boolean isEventDateSelect() {
return true;
}

}
@@ -36,7 +36,7 @@ public SqlSelects convertToSqlSelects(SelectContext selectContext) {
}

@Override
public boolean requiresIntervalPacking() {
public boolean isEventDateSelect() {
return true;
}
}
@@ -137,7 +137,7 @@ protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject
@Override
public void cancel() {
log.debug("Sending cancel message to all workers.");
getNamespace().getWorkerHandler().sendToAll(new CancelQuery(getId()));
((DistributedNamespace) getNamespace()).getWorkerHandler().sendToAll(new CancelQuery(getId()));
}

@Override
@@ -180,7 +180,4 @@ public boolean allSubQueriesDone() {
}
}

public DistributedNamespace getNamespace() {
return (DistributedNamespace) super.getNamespace();
}
}
@@ -8,9 +8,9 @@

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
import com.bakdata.conquery.apiv1.query.EditorQuery;
import com.bakdata.conquery.apiv1.query.Query;
import com.bakdata.conquery.apiv1.query.QueryDescription;
import com.bakdata.conquery.apiv1.query.SecondaryIdQuery;
import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
import com.bakdata.conquery.apiv1.query.concept.specific.CQReusedQuery;
import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal;
@@ -42,7 +42,7 @@
@ToString(callSuper = true)
@Slf4j
@CPSType(base = ManagedExecution.class, id = "MANAGED_QUERY")
public class ManagedQuery extends ManagedExecution implements EditorQuery, SingleTableResult, InternalExecution<ShardResult> {
public class ManagedQuery extends ManagedExecution implements SingleTableResult, InternalExecution<ShardResult> {

// Needs to be resolved externally before being executed
private Query query;
@@ -100,12 +100,18 @@ public long resultRowCount() {
return lastResultCount;
}



@Override
public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) {

super.setStatusBase(subject, status);
enrichStatusBase(status);
status.setNumberOfResults(getLastResultCount());

Query query1 = getQuery();
status.setQueryType(query1.getClass().getAnnotation(CPSType.class).id());

if (query1 instanceof SecondaryIdQuery) {
status.setSecondaryId(((SecondaryIdQuery) query1).getSecondaryId().getId());
}
}

protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status) {
@@ -9,7 +9,21 @@ public enum SharedAliases {

PRIMARY_COLUMN("primary_id"),
SECONDARY_ID("secondary_id"),
DATES_COLUMN("dates");
DATES_COLUMN("dates"),
DATE_RESTRICTION("date_restriction"),

NOP_TABLE("nop_table"),

// form related
RESOLUTION("resolution"),
INDEX("index"),
STRATIFICATION_RANGE("stratification_range"),
DATE_START("date_start"),
DATE_END("date_end"),
DATE_SERIES("date_series"),
INDEX_DATE("index_date"),
INDEX_START_POSITIVE("index_start_positive"),
INDEX_START_NEGATIVE("index_start_negative");

private final String alias;
}
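The new form-related aliases follow the pattern of the existing ones: each constant carries a plain column name that the SQL conversion turns into a field reference or an .as(...) alias (getAlias() is used that way in CQExternalConverter below). A small stand-alone jOOQ sketch, with the literal name standing in for the enum constant:

import org.jooq.Field;
import org.jooq.impl.DSL;

// Stand-alone sketch: "index_date" mirrors SharedAliases.INDEX_DATE.getAlias().
public class AliasSketch {

	public static void main(String[] args) {
		// referencing a column by its shared alias
		Field<Object> indexDate = DSL.field(DSL.name("index_date"));
		// projecting a static value under that alias, as CQExternalConverter does for primary_id
		Field<String> staticValue = DSL.val("2024-01-01").as("index_date");

		System.out.println(indexDate);
		System.out.println(staticValue);
	}
}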
@@ -0,0 +1,82 @@
package com.bakdata.conquery.sql.conversion.cqelement;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal;
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.sql.conversion.NodeConverter;
import com.bakdata.conquery.sql.conversion.SharedAliases;
import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.Selects;
import com.bakdata.conquery.sql.conversion.model.SqlIdColumns;
import com.google.common.base.Preconditions;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Table;
import org.jooq.impl.DSL;

public class CQExternalConverter implements NodeConverter<CQExternal> {

private static final String CQ_EXTERNAL_CTE_NAME = "external";

@Override
public Class<? extends CQExternal> getConversionClass() {
return CQExternal.class;
}

@Override
public ConversionContext convert(CQExternal external, ConversionContext context) {

SqlFunctionProvider functionProvider = context.getSqlDialect().getFunctionProvider();
List<QueryStep> unions = external.getValuesResolved()
.entrySet().stream()
.flatMap(entry -> createRowSelects(entry, functionProvider).stream())
.toList();

Preconditions.checkArgument(!unions.isEmpty(), "Expecting at least 1 converted resolved row when converting a CQExternal");
QueryStep externalStep = QueryStep.createUnionStep(unions, CQ_EXTERNAL_CTE_NAME, Collections.emptyList());
return context.withQueryStep(externalStep);
}

/**
* For each entry, we need to create a SELECT statement of static values for each pid -> date set. For dialects that support date multiranges, 1 row per ID
* is sufficient. For other dialects there can be multiple rows with the same pid -> date range from the date set.
*/
private static List<QueryStep> createRowSelects(Map.Entry<String, CDateSet> entry, SqlFunctionProvider functionProvider) {

Field<Object> primaryColumn = DSL.field(DSL.val(entry.getKey())).coerce(Object.class).as(SharedAliases.PRIMARY_COLUMN.getAlias());
SqlIdColumns ids = new SqlIdColumns(primaryColumn);

List<ColumnDateRange> validityDateEntries = functionProvider.forCDateSet(entry.getValue(), SharedAliases.DATES_COLUMN);
return validityDateEntries.stream()
.map(validityDateEntry -> createRowSelect(ids, validityDateEntry, functionProvider))
.collect(Collectors.toList());
}

/**
* Creates a SELECT statement of static values for each pid -> date entry, like
* <pre>{@code select 1 as "pid", '[2021-01-01,2022-01-01)'::daterange as "date_range"}</pre>
*/
private static QueryStep createRowSelect(SqlIdColumns ids, ColumnDateRange validityDate, SqlFunctionProvider functionProvider) {

Selects selects = Selects.builder()
.ids(ids)
.validityDate(Optional.ofNullable(validityDate))
.build();

// not all SQL dialects can create a SELECT statement without a FROM clause,
// so we ensure there is some no-op table to select the static values from
Table<? extends Record> table = functionProvider.getNoOpTable();

return QueryStep.builder()
.selects(selects)
.fromTable(table)
.build();
}
}
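To give a feel for what CQExternalConverter produces: roughly one static SELECT per resolved pid/date range, glued together with a union. Below is a stand-alone jOOQ sketch assuming a PostgreSQL-style dialect; the names and the daterange literal are illustrative, and the real converter goes through SqlFunctionProvider and QueryStep.createUnionStep rather than calling unionAll directly.

import java.util.List;

import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.Select;
import org.jooq.impl.DSL;

// Stand-alone sketch of the generated shape, not the converter itself.
public class ExternalUnionSketch {

	public static void main(String[] args) {
		DSLContext create = DSL.using(SQLDialect.POSTGRES);

		// one static SELECT per pid -> date range; Postgres can SELECT without FROM,
		// other dialects would need the no-op table mentioned in the comment above
		Select<Record> rowA = create.select(List.of(
				DSL.val("person_a").as("primary_id"),
				DSL.field("'[2021-01-01,2022-01-01)'::daterange").as("dates")));

		Select<Record> rowB = create.select(List.of(
				DSL.val("person_b").as("primary_id"),
				DSL.field("'[2022-05-01,2022-06-01)'::daterange").as("dates")));

		// print the combined statement (the real code unions the per-row steps as QuerySteps)
		System.out.println(rowA.unionAll(rowB));
	}
}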
@@ -39,6 +39,9 @@ public class ConversionContext implements Context {
@Nullable
SqlQuery finalQuery;

@Nullable
QueryStep stratificationTable;

/**
* An optional date restriction range. Is set when converting a {@link CQDateRestriction}.
*/
@@ -59,6 +62,10 @@ public boolean dateRestrictionActive() {
return this.dateRestrictionRange != null;
}

public boolean isWithStratification() {
return this.stratificationTable != null;
}

/**
* Adds a query step to the list of {@link QueryStep} of this context.
*/
@@ -27,7 +27,7 @@ public QueryStep convert(DateAggregationContext context, QueryStep previous) {
builder = builder.cteName(dateAggregationTables.cteName(cteStep))
.predecessors(List.of(previous));
}
if (cteStep != DateAggregationCteStep.INVERT) {
if (cteStep != DateAggregationCteStep.INVERT && cteStep != DateAggregationCteStep.NODE_NO_OVERLAP) {
builder = builder.fromTable(QueryStep.toTableLike(dateAggregationTables.getPredecessor(cteStep)));
}

@@ -42,7 +42,7 @@ protected QueryStep.QueryStepBuilder convertStep(DateAggregationContext context)
SqlIdColumns ids = context.getIds();
SqlIdColumns leftIds = ids.qualify(ROWS_LEFT_TABLE_NAME);
SqlIdColumns rightIds = ids.qualify(ROWS_RIGHT_TABLE_NAME);
SqlIdColumns coalescedIds = SqlIdColumns.coalesce(List.of(leftIds, rightIds));
SqlIdColumns coalescedIds = leftIds.coalesce(List.of(rightIds));

Selects invertSelects = getInvertSelects(rowNumberStep, coalescedIds, context);
TableOnConditionStep<Record> fromTable = selfJoinWithShiftedRows(leftIds, rightIds, rowNumberStep);
@@ -74,15 +74,15 @@ private Selects getInvertSelects(QueryStep rowNumberStep, SqlIdColumns coalesced
.build();
}

private TableOnConditionStep<Record> selfJoinWithShiftedRows(SqlIdColumns leftPrimaryColumn, SqlIdColumns rightPrimaryColumn, QueryStep rowNumberStep) {
private TableOnConditionStep<Record> selfJoinWithShiftedRows(SqlIdColumns leftIds, SqlIdColumns rightIds, QueryStep rowNumberStep) {

Field<Integer> leftRowNumber = DSL.field(DSL.name(ROWS_LEFT_TABLE_NAME, RowNumberCte.ROW_NUMBER_FIELD_NAME), Integer.class)
.plus(1);
Field<Integer> rightRowNumber = DSL.field(DSL.name(ROWS_RIGHT_TABLE_NAME, RowNumberCte.ROW_NUMBER_FIELD_NAME), Integer.class);

Condition[] joinConditions = Stream.concat(
Stream.of(leftRowNumber.eq(rightRowNumber)),
SqlIdColumns.join(leftPrimaryColumn, rightPrimaryColumn).stream()
leftIds.join(rightIds).stream()
)
.toArray(Condition[]::new);

(remaining changed files not loaded)

0 comments on commit 941c5f4
