Skip to content

Commit

Permalink
Query stats followup (#3255)
Browse files Browse the repository at this point in the history
Followup for `statistics` endpoint. Major rework of Histogram logic.
  • Loading branch information
awildturtok authored Mar 5, 2024
1 parent 3b9c360 commit 06042ec
Show file tree
Hide file tree
Showing 43 changed files with 1,096 additions and 460 deletions.
108 changes: 15 additions & 93 deletions backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@

import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -57,9 +55,7 @@
import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.auth.permissions.Ability;
import com.bakdata.conquery.models.auth.permissions.ConqueryPermission;
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.Range;
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.config.ColumnConfig;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Dataset;
Expand All @@ -83,11 +79,8 @@
import com.bakdata.conquery.models.query.queryplan.DateAggregationAction;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.resultinfo.UniqueNamer;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.query.statistics.ColumnStatsCollector;
import com.bakdata.conquery.models.query.statistics.ResultStatistics;
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.bakdata.conquery.models.types.ResultType;
import com.bakdata.conquery.models.types.SemanticType;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
Expand All @@ -96,8 +89,6 @@
import com.bakdata.conquery.util.io.IdColumnUtil;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.MutableClassToInstanceMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -114,29 +105,7 @@ public class QueryProcessor {
@Inject
private ConqueryConfig config;

private static CDateSet extractValidityDate(ResultType dateType, Object dateValue) {
if (dateType instanceof ResultType.DateRangeT) {
return CDateSet.create(CDateRange.fromList((List<? extends Number>) dateValue));

}

if (dateType instanceof ResultType.DateT) {
return CDateSet.create(CDateRange.exactly((Integer) dateValue));
}

if (dateType instanceof ResultType.ListT listT) {
final CDateSet out = CDateSet.createEmpty();

for (Object date : ((List<?>) dateValue)) {
out.addAll(extractValidityDate(listT.getElementType(), date));
}

// since they are ordered, we can be sure this is always the correct span
return out;
}

throw new IllegalStateException("Unexpected date Type %s".formatted(dateType));
}

public Stream<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
final Collection<ManagedExecution> allQueries = storage.getAllExecutions();
Expand Down Expand Up @@ -567,7 +536,7 @@ public Stream<Map<String, String>> resolveEntities(Subject subject, List<FilterV
final IdPrinter printer = IdColumnUtil.getIdPrinter(subject, execution, namespace, ids);

// For each included entity emit a Map of { Id-Name -> Id-Value }
return result.streamResults()
return result.streamResults(OptionalLong.empty())
.map(printer::createId)
.map(entityPrintId -> {
final Map<String, String> out = new HashMap<>();
Expand All @@ -590,72 +559,25 @@ public ResultStatistics getResultStatistics(ManagedQuery managedQuery) {
final Query query = managedQuery.getQuery();
final List<ResultInfo> resultInfos = query.getResultInfos();

final RandomGenerator random = new Random();
final int requiredSamples = config.getFrontend().getVisualisationSamples();

final Optional<ResultInfo>
dateInfo =
query.getResultInfos().stream().filter(info -> info.getSemantics().contains(new SemanticType.EventDateT())).findFirst();

final int totalSamples = managedQuery.getLastResultCount().intValue();
final int dateIndex = dateInfo.map(resultInfos::indexOf).orElse(0 /*Discarded if dateInfo is not present*/);

//We collect about $requiredSamples values as samples for visualisation, while streaming the values.
// Note that nextInt produces values > 0 and < totalSamples. This is equivalent to `P(k) = $requiredSamples/$totalSamples` but terser.
final BooleanSupplier samplePicker;
final Locale locale = I18n.LOCALE.get();
final NumberFormat decimalFormat = NumberFormat.getNumberInstance(locale);
decimalFormat.setMaximumFractionDigits(2);

if (totalSamples <= requiredSamples) {
samplePicker = () -> true;
}
else {
samplePicker = () -> random.nextInt(totalSamples) < requiredSamples;
}
final NumberFormat integerFormat = NumberFormat.getNumberInstance(locale);

final boolean hasValidityDates = resultInfos.get(0).getSemantics().contains(new SemanticType.EventDateT());
final ResultType dateType = resultInfos.get(0).getType();

final PrintSettings printSettings = new PrintSettings(false, I18n.LOCALE.get(), managedQuery.getNamespace(), config, null);
final PrintSettings printSettings =
new PrintSettings(true, locale, managedQuery.getNamespace(), config, null, null, decimalFormat, integerFormat);
final UniqueNamer uniqueNamer = new UniqueNamer(printSettings);


final List<ColumnStatsCollector> statsCollectors = resultInfos.stream()
.map(info -> ColumnStatsCollector.getStatsCollector(info, printSettings, samplePicker, info.getType(), uniqueNamer))
.collect(Collectors.toList());

final IntSet entities = new IntOpenHashSet();
final AtomicInteger lines = new AtomicInteger();

final AtomicReference<CDateRange> span = new AtomicReference<>(null);


managedQuery.streamResults()
.peek(result -> entities.add(result.getEntityId()))
.map(EntityResult::listResultLines)
.flatMap(List::stream)
.forEach(line -> {

if (hasValidityDates) {
final CDateSet dateSet = extractValidityDate(dateType, line[0]);
span.getAndAccumulate(dateSet.span(), (old, incoming) -> incoming.spanClosed(old));
}

lines.incrementAndGet();

for (int col = 0; col < line.length; col++) {
final ColumnStatsCollector collector = statsCollectors.get(col);
if (collector == null) {
continue;
}

collector.consume(line[col]);
}
});

return new ResultStatistics(
entities.size(),
lines.get(),
statsCollectors.stream()
.filter(Objects::nonNull) // Not all columns produces stats
.map(ColumnStatsCollector::describe)
.toList(),
span.get().toSimpleRange()
);
return ResultStatistics.collectResultStatistics(managedQuery, resultInfos, dateInfo, dateIndex, printSettings, uniqueNamer, config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@ public interface ResultHeadersC10n {
@De("Nachbeobachtungszeitraum")
String outcomeDateRange();

@En("{0} others")
@De("{0} andere")
String others(long count);


}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.nio.channels.Channels;
import java.util.List;
import java.util.Locale;
import java.util.OptionalLong;
import java.util.function.Function;

import javax.inject.Inject;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class ResultArrowProcessor {
private final ArrowConfig arrowConfig;


public Response createResultFile(Subject subject, ManagedExecution exec, boolean pretty) {
public Response createResultFile(Subject subject, ManagedExecution exec, boolean pretty, OptionalLong limit) {
return getArrowResult(
(output) -> (root) -> new ArrowFileWriter(root, new DictionaryProvider.MapDictionaryProvider(), Channels.newChannel(output)),
subject,
Expand All @@ -63,21 +64,8 @@ public Response createResultFile(Subject subject, ManagedExecution exec, boolean
FILE_EXTENTION_ARROW_FILE,
FILE_MEDIA_TYPE,
conqueryConfig,
arrowConfig
);
}

public Response createResultStream(Subject subject, ManagedExecution exec, boolean pretty) {
return getArrowResult(
(output) -> (root) -> new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), output),
subject,
((ManagedExecution & SingleTableResult) exec),
datasetRegistry,
pretty,
FILE_EXTENTION_ARROW_STREAM,
STREAM_MEDIA_TYPE,
conqueryConfig,
arrowConfig
arrowConfig,
limit
);
}

Expand All @@ -90,7 +78,9 @@ public static <E extends ManagedExecution & SingleTableResult> Response getArrow
String fileExtension,
MediaType mediaType,
ConqueryConfig config,
ArrowConfig arrowConfig) {
ArrowConfig arrowConfig,
OptionalLong limit
) {

ConqueryMDC.setLocation(subject.getName());

Expand All @@ -102,10 +92,10 @@ public static <E extends ManagedExecution & SingleTableResult> Response getArrow

// Get the locale extracted by the LocaleFilter


final Namespace namespace = datasetRegistry.get(dataset.getId());
IdPrinter idPrinter = IdColumnUtil.getIdPrinter(subject, exec, namespace, config.getIdColumns().getIds());
final Locale locale = I18n.LOCALE.get();

PrintSettings settings = new PrintSettings(
pretty,
locale,
Expand All @@ -127,7 +117,7 @@ public static <E extends ManagedExecution & SingleTableResult> Response getArrow
arrowConfig,
resultInfosId,
resultInfosExec,
exec.streamResults()
exec.streamResults(limit)
);
}
finally {
Expand All @@ -138,5 +128,20 @@ public static <E extends ManagedExecution & SingleTableResult> Response getArrow
return makeResponseWithFileName(Response.ok(out), String.join(".", exec.getLabelWithoutAutoLabelSuffix(), fileExtension), mediaType, ResultUtil.ContentDispositionOption.ATTACHMENT);
}

public Response createResultStream(Subject subject, ManagedExecution exec, boolean pretty, OptionalLong limit) {
return getArrowResult(
(output) -> (root) -> new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), output),
subject,
((ManagedExecution & SingleTableResult) exec),
datasetRegistry,
pretty,
FILE_EXTENTION_ARROW_STREAM,
STREAM_MEDIA_TYPE,
conqueryConfig,
arrowConfig,
limit
);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.OptionalLong;

import javax.inject.Inject;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -39,7 +40,7 @@ public class ResultCsvProcessor {
private final ConqueryConfig config;
private final DatasetRegistry datasetRegistry;

public <E extends ManagedExecution & SingleTableResult> Response createResult(Subject subject, E exec, boolean pretty, Charset charset) {
public <E extends ManagedExecution & SingleTableResult> Response createResult(Subject subject, E exec, boolean pretty, Charset charset, OptionalLong limit) {

final Dataset dataset = exec.getDataset();

Expand All @@ -62,7 +63,7 @@ public <E extends ManagedExecution & SingleTableResult> Response createResult(Su
final StreamingOutput out = os -> {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os, charset))) {
final CsvRenderer renderer = new CsvRenderer(config.getCsv().createWriter(writer), settings);
renderer.toCSV(config.getIdColumns().getIdResultInfos(), exec.getResultInfos(), exec.streamResults());
renderer.toCSV(config.getIdColumns().getIdResultInfos(), exec.getResultInfos(), exec.streamResults(limit));
}
catch (EofException e) {
log.trace("User canceled download");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

Expand Down Expand Up @@ -40,6 +41,8 @@

public class ExcelRenderer {

public static final int MAX_LINES = 1_048_576;

private static final Map<Class<? extends ResultType>, TypeWriter> TYPE_WRITER_MAP = Map.of(
ResultType.DateT.class, ExcelRenderer::writeDateCell,
ResultType.IntegerT.class, ExcelRenderer::writeIntegerCell,
Expand Down Expand Up @@ -71,7 +74,7 @@ private interface TypeWriter {
public <E extends ManagedExecution & SingleTableResult> void renderToStream(
List<ResultInfo> idHeaders,
E exec,
OutputStream outputStream) throws IOException {
OutputStream outputStream, OptionalLong limit) throws IOException {
final List<ResultInfo> resultInfosExec = exec.getResultInfos();

setMetaData(exec);
Expand All @@ -85,7 +88,7 @@ public <E extends ManagedExecution & SingleTableResult> void renderToStream(

writeHeader(sheet, idHeaders, resultInfosExec, table);

int writtenLines = writeBody(sheet, resultInfosExec, exec.streamResults());
int writtenLines = writeBody(sheet, resultInfosExec, exec.streamResults(OptionalLong.of(limit.orElse(MAX_LINES))));

postProcessTable(sheet, table, writtenLines, idHeaders.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.bakdata.conquery.io.result.ResultUtil.makeResponseWithFileName;

import java.util.Locale;
import java.util.OptionalLong;

import javax.inject.Inject;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -38,7 +39,7 @@ public class ResultExcelProcessor {

private final ExcelConfig excelConfig;

public <E extends ManagedExecution & SingleTableResult> Response createResult(Subject subject, E exec, boolean pretty) {
public <E extends ManagedExecution & SingleTableResult> Response createResult(Subject subject, E exec, boolean pretty, OptionalLong limit) {

ConqueryMDC.setLocation(subject.getName());

Expand All @@ -57,7 +58,7 @@ public <E extends ManagedExecution & SingleTableResult> Response createResult(Su
final ExcelRenderer excelRenderer = new ExcelRenderer(excelConfig, settings);

final StreamingOutput out = output -> {
excelRenderer.renderToStream(conqueryConfig.getIdColumns().getIdResultInfos(), exec, output);
excelRenderer.renderToStream(conqueryConfig.getIdColumns().getIdResultInfos(), exec, output, limit);
log.trace("FINISHED downloading {}", exec.getId());
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.bakdata.conquery.io.result.ResultUtil.makeResponseWithFileName;

import java.util.Locale;
import java.util.OptionalLong;

import javax.inject.Inject;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -35,7 +36,7 @@ public class ResultParquetProcessor {
private final DatasetRegistry datasetRegistry;
private final ConqueryConfig config;

public Response createResultFile(Subject subject, ManagedExecution exec, boolean pretty) {
public Response createResultFile(Subject subject, ManagedExecution exec, boolean pretty, OptionalLong limit) {

ConqueryMDC.setLocation(subject.getName());

Expand Down Expand Up @@ -68,7 +69,7 @@ public Response createResultFile(Subject subject, ManagedExecution exec, boolean
config.getIdColumns().getIdResultInfos(),
singleTableResult.getResultInfos(),
settings,
singleTableResult.streamResults()
singleTableResult.streamResults(limit)
);

};
Expand Down
Loading

0 comments on commit 06042ec

Please sign in to comment.