diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java b/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java index 9ff5cee85a..06d1ba261a 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java @@ -16,9 +16,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -51,6 +51,7 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; +import org.jooq.AggregateFunction; import org.jooq.Condition; import org.jooq.DSLContext; import org.jooq.Field; @@ -66,6 +67,8 @@ public class UpdateMatchingStatsSqlJob extends Job { private static final Name EVENTS = name("events"); private static final Name ENTITIES = name("entities"); private static final Name DATES = name("dates"); + private static final Name MIN_DATE = name("min_date"); + private static final Name MAX_DATE = name("max_date"); private final DatabaseConfig databaseConfig; private final SqlExecutionService executionService; @@ -73,6 +76,7 @@ public class UpdateMatchingStatsSqlJob extends Job { private final SqlFunctionProvider functionProvider; private final Set concepts; private final ListeningExecutorService executors; + private ListenableFuture all; public UpdateMatchingStatsSqlJob( DatabaseConfig databaseConfig, @@ -87,7 +91,6 @@ public UpdateMatchingStatsSqlJob( this.functionProvider = functionProvider; this.concepts = concepts; this.executors = 
MoreExecutors.listeningDecorator(executors); - ; } @Override @@ -109,14 +112,29 @@ public void execute() throws Exception { .map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept))) .collect(Collectors.toList()); - final ListenableFuture> futureList = Futures.allAsList(runningQueries); - while (!futureList.isDone()) { - log.debug("Waiting for executors collect matching stats..."); + all = Futures.allAsList(runningQueries); + while (!all.isDone()) { + try { + all.get(1, TimeUnit.MINUTES); + } + catch (TimeoutException exception) { + log.debug("Still waiting for {}", this); + if (log.isTraceEnabled()) { + log.trace("Waiting for {}", executors); + } + } } stopWatch.stop(); log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime()); - runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError); + } + + @Override + public void cancel() { + if (all != null) { + all.cancel(true); + } + super.cancel(); } private static boolean isTreeConcept(final Concept concept) { @@ -127,15 +145,6 @@ private static boolean isTreeConcept(final Concept concept) { return true; } - private static void checkForError(final Future future) { - try { - future.get(); - } - catch (ExecutionException | InterruptedException e) { - log.error("Unknown error while querying SQL matching stats. 
Cause: \n", e.getCause()); - } - } - public void calculateMatchingStats(final TreeConcept treeConcept) { final Map>> relevantColumns = collectRelevantColumns(treeConcept); @@ -151,8 +160,8 @@ public void calculateMatchingStats(final TreeConcept treeConcept) { final List validityDates = validityDateMap.values().stream().flatMap(List::stream).map(functionProvider::toDualColumn).toList(); final List> allStarts = validityDates.stream().map(ColumnDateRange::getStart).toList(); final List> allEnds = validityDates.stream().map(ColumnDateRange::getEnd).toList(); - final ColumnDateRange minAndMax = ColumnDateRange.of(min(functionProvider.least(allStarts)), max(functionProvider.greatest((allEnds)))); - final Field validityDateExpression = functionProvider.daterangeStringExpression(minAndMax).as(DATES); + final AggregateFunction minDate = min(functionProvider.least(allStarts)); + final AggregateFunction maxDate = max(functionProvider.greatest((allEnds))); // all connectors need the same columns originating from the concept definition - they might have different names in the respective connector tables, // but as we aliased them already, we can just use the unified aliases in the final query @@ -164,7 +173,8 @@ public void calculateMatchingStats(final TreeConcept treeConcept) { .select( count(asterisk()).as(EVENTS), countDistinct(field(ENTITIES)).as(ENTITIES), - validityDateExpression + minDate.as(MIN_DATE), + maxDate.as(MAX_DATE) ) .from(unioned) .groupBy(relevantColumnsAliased); @@ -303,16 +313,30 @@ private void mapRecordToConceptElements( }); } - private MatchingStats.Entry toMatchingStatsEntry(Record record) { + private MatchingStats.Entry toMatchingStatsEntry(final Record record) { final long events = record.get(EVENTS, Integer.class).longValue(); final long entities = record.get(ENTITIES, Integer.class).longValue(); - final CDateRange dateSpan = toDateRange(record.get(DATES, String.class)); - return new MatchingStats.Entry(events, entities, dateSpan.getMinValue(), 
dateSpan.getMaxValue()); + + final int minDate = record.get(MIN_DATE, Date.class) == null + ? CDateRange.NEGATIVE_INFINITY + : toValidInt(record.get(MIN_DATE, Date.class)); + + final int maxDate = record.get(MAX_DATE, Date.class) == null + ? CDateRange.POSITIVE_INFINITY + : toValidInt(record.get(MAX_DATE, Date.class)); + + return new MatchingStats.Entry(events, entities, minDate, maxDate); } - private CDateRange toDateRange(final String validityDateExpression) { - final List dateRange = executionService.getResultSetProcessor().getCDateSetParser().toEpochDayRange(validityDateExpression); - return !dateRange.isEmpty() ? CDateRange.fromList(dateRange) : CDateRange.all(); + private static int toValidInt(final Date date) { + final long epochDay = date.toLocalDate().toEpochDay(); + if (epochDay < Integer.MIN_VALUE) { + return Integer.MIN_VALUE; + } + else if (epochDay > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return Math.toIntExact(epochDay); } private static void addEntryToConceptElement(final ConceptTreeNode mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java index b1523936e7..c8e62616f6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,6 +39,7 @@ public class LocalNamespace extends Namespace { private final SqlExecutionService sqlExecutionService; private final DSLContextWrapper dslContextWrapper; private final SqlStorageHandler storageHandler; + private final ThreadPoolExecutor executorService; public LocalNamespace( ObjectMapper 
preprocessMapper, @@ -63,12 +65,13 @@ public LocalNamespace( this.sqlExecutionService = sqlExecutionService; this.dslContextWrapper = dslContextWrapper; this.storageHandler = storageHandler; + // TODO FK: hoist into Namespace and reuse it in other places; also, is this the correct way to name them? + this.executorService = this.executionPool.createService("namespace %s worker".formatted(storage.getPathName())); } @Override void updateMatchingStats() { final Set concepts = getConceptsWithoutMatchingStats(); - Job job = new UpdateMatchingStatsSqlJob( databaseConfig, sqlExecutionService,