Skip to content

Commit

Permalink
Merge branch 'develop' into sql/feature/flag-select-and-filter-sql-co…
Browse files Browse the repository at this point in the history
…nversion
  • Loading branch information
jnsrnhld committed Jan 19, 2024
2 parents 66727c9 + cdf17ef commit e33f48b
Show file tree
Hide file tree
Showing 105 changed files with 1,285 additions and 430 deletions.
2 changes: 1 addition & 1 deletion backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.10.1</version>
<version>1.13.0</version>
<exclusions>
<!-- All these transitive deps are already bundled in shiro core-->
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.bakdata.conquery.sql.conversion.cqelement.concept.ConceptCteStep;
import com.bakdata.conquery.sql.conversion.cqelement.concept.FilterContext;
import com.bakdata.conquery.sql.conversion.model.filter.SqlFilters;
import com.bakdata.conquery.sql.conversion.model.select.SumDistinctSqlAggregator;
import com.bakdata.conquery.sql.conversion.model.select.SumSqlAggregator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
Expand Down Expand Up @@ -109,11 +110,17 @@ public FilterNode createFilterNode(RANGE value) {

@Override
public SqlFilters convertToSqlFilter(FilterContext<RANGE> filterContext) {
if (distinctByColumn != null && !distinctByColumn.isEmpty()) {
return SumDistinctSqlAggregator.create(this, filterContext).getSqlFilters();
}
return SumSqlAggregator.create(this, filterContext).getSqlFilters();
}

@Override
public Set<ConceptCteStep> getRequiredSqlSteps() {
if (distinctByColumn != null && !distinctByColumn.isEmpty()) {
return ConceptCteStep.withOptionalSteps(ConceptCteStep.JOIN_PREDECESSORS, ConceptCteStep.AGGREGATION_FILTER);
}
return ConceptCteStep.withOptionalSteps(ConceptCteStep.AGGREGATION_FILTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

import javax.validation.constraints.NotNull;

Expand All @@ -24,8 +25,10 @@
import com.bakdata.conquery.models.query.queryplan.aggregators.specific.sum.IntegerSumAggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.specific.sum.MoneySumAggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.specific.sum.RealSumAggregator;
import com.bakdata.conquery.sql.conversion.cqelement.concept.ConceptCteStep;
import com.bakdata.conquery.sql.conversion.cqelement.concept.SelectContext;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelects;
import com.bakdata.conquery.sql.conversion.model.select.SumDistinctSqlAggregator;
import com.bakdata.conquery.sql.conversion.model.select.SumSqlAggregator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -127,7 +130,18 @@ public boolean isColumnsOfSameType() {

@Override
public SqlSelects convertToSqlSelects(SelectContext selectContext) {
if (distinctByColumn != null && !distinctByColumn.isEmpty()) {
return SumDistinctSqlAggregator.create(this, selectContext).getSqlSelects();
}
return SumSqlAggregator.create(this, selectContext).getSqlSelects();
}

@Override
public Set<ConceptCteStep> getRequiredSqlSteps() {
if (distinctByColumn != null && !distinctByColumn.isEmpty()) {
return ConceptCteStep.withOptionalSteps(ConceptCteStep.JOIN_PREDECESSORS);
}
return ConceptCteStep.MANDATORY_STEPS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.bakdata.conquery.sql.conversion.cqelement.ConversionContext;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.conversion.model.NameGenerator;
import org.jooq.impl.DSL;

/**
* Entry point for converting {@link QueryDescription} to an SQL query.
Expand All @@ -27,6 +28,7 @@ public ConversionContext convert(QueryDescription queryDescription) {
.nameGenerator(new NameGenerator(config.getDialect().getNameMaxLength()))
.nodeConversions(this)
.sqlDialect(this.dialect)
.primaryColumn(DSL.field(DSL.name(config.getPrimaryColumn())))
.build();
return convert(queryDescription, initialCtx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Singular;
import lombok.Value;
import lombok.With;
import org.jooq.Field;

@Value
@With
Expand All @@ -27,6 +28,7 @@ public class ConversionContext implements Context {
@Singular
List<QueryStep> querySteps;
SqlQuery finalQuery;
Field<Object> primaryColumn;
CDateRange dateRestrictionRange;
boolean negation;
boolean isGroupBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelects;
import org.jooq.Condition;
import org.jooq.impl.DSL;

public class CQConceptConverter implements NodeConverter<CQConcept> {

Expand All @@ -41,6 +40,7 @@ public CQConceptConverter(SqlFunctionProvider functionProvider) {
new PreprocessingCte(),
new EventFilterCte(),
new AggregationSelectCte(),
new JoinPredecessorsCte(),
new AggregationFilterCte(),
new FinalConceptCte()
);
Expand Down Expand Up @@ -103,7 +103,6 @@ private ConceptCteContext createConceptCteContext(CQConcept cqConcept, Conversio
.conversionContext(context)
.filters(allFiltersForTable)
.selects(conceptSelects)
.primaryColumn(DSL.field(DSL.name(context.getConfig().getPrimaryColumn())))
.validityDate(validityDateSelect)
.isExcludedFromDateAggregation(cqConcept.isExcludeFromTimeAggregation())
.conceptTables(conceptTables)
Expand All @@ -128,6 +127,10 @@ private Set<ConceptCteStep> getRequiredSteps(CQTable table, boolean dateRestrict
.flatMap(filterValue -> filterValue.getFilter().getRequiredSqlSteps().stream())
.forEach(requiredSteps::add);

Stream.concat(table.getConcept().getSelects().stream(), table.getSelects().stream())
.flatMap(select -> select.getRequiredSqlSteps().stream())
.forEach(requiredSteps::add);

return requiredSteps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ConceptCteContext implements Context {

ConversionContext conversionContext;
String conceptLabel;
Field<Object> primaryColumn;
Optional<ColumnDateRange> validityDate;
boolean isExcludedFromDateAggregation;
List<SqlSelects> selects;
Expand All @@ -43,6 +42,13 @@ public Stream<SqlSelects> allConceptSelects() {
);
}

public Field<Object> getPrimaryColumn() {
if (previous == null) {
return conversionContext.getPrimaryColumn();
}
return previous.getQualifiedSelects().getPrimaryColumn();
}

@Override
public NameGenerator getNameGenerator() {
return conversionContext.getNameGenerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public enum ConceptCteStep implements CteStep {
PREPROCESSING("preprocessing", null),
EVENT_FILTER("event_filter", PREPROCESSING),
AGGREGATION_SELECT("group_select", EVENT_FILTER),
AGGREGATION_FILTER("group_filter", AGGREGATION_SELECT),
JOIN_PREDECESSORS("join_predecessors", AGGREGATION_SELECT),
AGGREGATION_FILTER("group_filter", JOIN_PREDECESSORS),
FINAL("", AGGREGATION_FILTER);

public static final Set<ConceptCteStep> MANDATORY_STEPS = Set.of(ConceptCteStep.PREPROCESSING, ConceptCteStep.AGGREGATION_SELECT, ConceptCteStep.FINAL);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.bakdata.conquery.sql.conversion.cqelement.concept;

import java.util.ArrayList;
import java.util.List;

import com.bakdata.conquery.sql.conversion.model.LogicalOperation;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.QueryStepJoiner;
import com.bakdata.conquery.sql.conversion.model.Selects;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.TableLike;

class JoinPredecessorsCte extends ConceptCte {

@Override
protected QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext) {

List<QueryStep> queriesToJoin = new ArrayList<>();
queriesToJoin.add(conceptCteContext.getPrevious());
conceptCteContext.allConceptSelects()
.flatMap(sqlSelects -> sqlSelects.getAdditionalPredecessors().stream())
.forEach(queriesToJoin::add);

Field<Object> primaryColumn = QueryStepJoiner.coalescePrimaryColumns(queriesToJoin);
List<SqlSelect> mergedSelects = QueryStepJoiner.mergeSelects(queriesToJoin);
Selects selects = Selects.builder()
.primaryColumn(primaryColumn)
.sqlSelects(mergedSelects)
.build();

TableLike<Record> fromTable = QueryStepJoiner.constructJoinedTable(queriesToJoin, LogicalOperation.AND, conceptCteContext.getConversionContext());

return QueryStep.builder()
.selects(selects)
.fromTable(fromTable)
.predecessors(queriesToJoin);
}

@Override
protected ConceptCteStep cteStep() {
return ConceptCteStep.JOIN_PREDECESSORS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,24 @@ public Field<Date> toDateField(String dateExpression) {
}

@Override
public Field<?> first(Field<?> column, List<Field<?>> orderByColumns) {
public <T> Field<T> first(Field<T> column, List<Field<?>> orderByColumns) {
if (orderByColumns.isEmpty()) {
orderByColumns = List.of(column);
}
return DSL.field(DSL.sql("FIRST_VALUE({0} {1})", column, DSL.orderBy(orderByColumns)));
return DSL.field(DSL.sql("FIRST_VALUE({0} {1})", column, DSL.orderBy(orderByColumns)), column.getType());
}

@Override
public Field<?> last(Field<?> column, List<Field<?>> orderByColumns) {
public <T> Field<T> last(Field<T> column, List<Field<?>> orderByColumns) {
if (orderByColumns.isEmpty()) {
orderByColumns = List.of(column);
}
return DSL.field(DSL.sql("LAST_VALUE({0} {1} DESC)", column, DSL.orderBy(orderByColumns)));
return DSL.field(DSL.sql("LAST_VALUE({0} {1} DESC)", column, DSL.orderBy(orderByColumns)), column.getType());
}

@Override
public Field<?> random(Field<?> column) {
return DSL.field(DSL.sql("FIRST_VALUE({0} {1})", column, DSL.orderBy(DSL.function("RAND", Object.class))));
public <T> Field<T> random(Field<T> column) {
return DSL.field(DSL.sql("FIRST_VALUE({0} {1})", column, DSL.orderBy(DSL.function("RAND", Object.class))), column.getType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,22 @@ public Field<Date> addDays(Field<Date> dateColumn, int amountOfDays) {
}

@Override
public Field<?> first(Field<?> column, List<Field<?>> orderByColumn) {
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(column)));
public <T> Field<T> first(Field<T> column, List<Field<?>> orderByColumn) {
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(column)), column.getType());
}

@Override
public Field<?> last(Field<?> column, List<Field<?>> orderByColumns) {
public <T> Field<T> last(Field<T> column, List<Field<?>> orderByColumns) {
String orderByClause = orderByColumns.stream()
.map(Field::toString)
.collect(Collectors.joining(", ", "ORDER BY ", " DESC"));
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(DSL.field("%s %s".formatted(column, orderByClause)))));
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(DSL.field("%s %s".formatted(column, orderByClause)))), column.getType());
}

@Override
public Field<?> random(Field<?> column) {
public <T> Field<T> random(Field<T> column) {
WindowSpecificationRowsStep orderByRandomClause = DSL.orderBy(DSL.function("random", Object.class));
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(DSL.field("%s %s".formatted(column, orderByRandomClause)))));
return DSL.field(DSL.sql("({0})[1]", DSL.arrayAgg(DSL.field("%s %s".formatted(column, orderByRandomClause)))), column.getType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public interface SqlFunctionProvider {

Field<Date> addDays(Field<Date> dateColumn, int amountOfDays);

Field<?> first(Field<?> field, List<Field<?>> orderByColumn);
<T> Field<T> first(Field<T> field, List<Field<?>> orderByColumn);

Field<?> last(Field<?> column, List<Field<?>> orderByColumns);
<T> Field<T> last(Field<T> column, List<Field<?>> orderByColumns);

Field<?> random(Field<?> column);
<T> Field<T> random(Field<T> column);

Condition likeRegex(Field<String> field, String pattern);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ public static TableLike<Record> constructJoinedTable(List<QueryStep> queriesToJo
return joinedQuery;
}

private static Field<Object> coalescePrimaryColumns(List<QueryStep> querySteps) {
public static Field<Object> coalescePrimaryColumns(List<QueryStep> querySteps) {
List<Field<?>> primaryColumns = querySteps.stream()
.map(queryStep -> queryStep.getQualifiedSelects().getPrimaryColumn())
.collect(Collectors.toList());
return DSL.coalesce(primaryColumns.get(0), primaryColumns.subList(1, primaryColumns.size()).toArray())
.as(PRIMARY_COLUMN_NAME);
}

private static List<SqlSelect> mergeSelects(List<QueryStep> querySteps) {
public static List<SqlSelect> mergeSelects(List<QueryStep> querySteps) {
return querySteps.stream()
.flatMap(queryStep -> queryStep.getQualifiedSelects().getSqlSelects().stream())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;

import com.bakdata.conquery.sql.conversion.model.QueryStep;
import lombok.Builder;
import lombok.Singular;
import lombok.Value;
Expand All @@ -17,4 +18,7 @@ public class SqlSelects {
// Empty if only used in aggregation select
@Singular
List<SqlSelect> finalSelects;
// Additional predecessors these SqlSelects require
@Singular
List<QueryStep> additionalPredecessors;
}
Loading

0 comments on commit e33f48b

Please sign in to comment.