Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld committed Nov 6, 2024
1 parent 0988c8c commit d0b02b7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,6 +51,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.jooq.AggregateFunction;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
Expand All @@ -66,13 +67,16 @@ public class UpdateMatchingStatsSqlJob extends Job {
private static final Name EVENTS = name("events");
private static final Name ENTITIES = name("entities");
private static final Name DATES = name("dates");
private static final Name MIN_DATE = name("min_date");
private static final Name MAX_DATE = name("max_date");

private final DatabaseConfig databaseConfig;
private final SqlExecutionService executionService;
private final DSLContext dslContext;
private final SqlFunctionProvider functionProvider;
private final Set<ConceptId> concepts;
private final ListeningExecutorService executors;
private ListenableFuture<?> all;

public UpdateMatchingStatsSqlJob(
DatabaseConfig databaseConfig,
Expand All @@ -87,7 +91,6 @@ public UpdateMatchingStatsSqlJob(
this.functionProvider = functionProvider;
this.concepts = concepts;
this.executors = MoreExecutors.listeningDecorator(executors);
;
}

@Override
Expand All @@ -109,14 +112,29 @@ public void execute() throws Exception {
.map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept)))
.collect(Collectors.toList());

final ListenableFuture<List<Object>> futureList = Futures.allAsList(runningQueries);
while (!futureList.isDone()) {
log.debug("Waiting for executors collect matching stats...");
all = Futures.allAsList(runningQueries);
while (!all.isDone()) {
try {
all.get(1, TimeUnit.MINUTES);
}
catch (TimeoutException exception) {
log.debug("Still waiting for {}", this);
if (log.isTraceEnabled()) {
log.trace("Waiting for {}", executors);
}
}
}

stopWatch.stop();
log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}

/**
 * Cancels this job.
 *
 * If {@code execute()} has already assigned the combined future {@code all},
 * that future is cancelled with {@code mayInterruptIfRunning = true} before
 * the cancellation is delegated to the superclass.
 */
@Override
public void cancel() {
	// 'all' stays null until execute() has submitted the queries, so guard against that.
	final ListenableFuture<?> pending = all;
	if (pending != null) {
		pending.cancel(true);
	}
	super.cancel();
}

private static boolean isTreeConcept(final Concept<?> concept) {
Expand All @@ -127,15 +145,6 @@ private static boolean isTreeConcept(final Concept<?> concept) {
return true;
}

/**
 * Waits for the given future to complete and logs a failure instead of propagating it.
 *
 * A failed computation ({@link ExecutionException}) is logged together with its cause.
 * If the waiting thread is interrupted, the interrupt flag is restored so that code
 * further up the call stack can still observe the interruption, and the interruption
 * itself is logged.
 *
 * @param future the future whose outcome should be checked
 */
private static void checkForError(final Future<?> future) {
	try {
		future.get();
	}
	catch (ExecutionException e) {
		log.error("Unknown error while querying SQL matching stats. Cause: \n", e.getCause());
	}
	catch (InterruptedException e) {
		// Catching InterruptedException clears the interrupt status; set it again so it is not lost.
		Thread.currentThread().interrupt();
		// An InterruptedException has no cause, so log the exception itself.
		log.error("Interrupted while waiting for SQL matching stats query to finish.", e);
	}
}

public void calculateMatchingStats(final TreeConcept treeConcept) {

final Map<Connector, Set<Field<?>>> relevantColumns = collectRelevantColumns(treeConcept);
Expand All @@ -151,8 +160,8 @@ public void calculateMatchingStats(final TreeConcept treeConcept) {
final List<ColumnDateRange> validityDates = validityDateMap.values().stream().flatMap(List::stream).map(functionProvider::toDualColumn).toList();
final List<Field<Date>> allStarts = validityDates.stream().map(ColumnDateRange::getStart).toList();
final List<Field<Date>> allEnds = validityDates.stream().map(ColumnDateRange::getEnd).toList();
final ColumnDateRange minAndMax = ColumnDateRange.of(min(functionProvider.least(allStarts)), max(functionProvider.greatest((allEnds))));
final Field<String> validityDateExpression = functionProvider.daterangeStringExpression(minAndMax).as(DATES);
final AggregateFunction<Date> minDate = min(functionProvider.least(allStarts));
final AggregateFunction<Date> maxDate = max(functionProvider.greatest((allEnds)));

// all connectors need the same columns originating from the concept definition - they might have different names in the respective connector tables,
// but as we aliased them already, we can just use the unified aliases in the final query
Expand All @@ -164,7 +173,8 @@ public void calculateMatchingStats(final TreeConcept treeConcept) {
.select(
count(asterisk()).as(EVENTS),
countDistinct(field(ENTITIES)).as(ENTITIES),
validityDateExpression
minDate.as(MIN_DATE),
maxDate.as(MAX_DATE)
)
.from(unioned)
.groupBy(relevantColumnsAliased);
Expand Down Expand Up @@ -303,16 +313,30 @@ private void mapRecordToConceptElements(
});
}

private MatchingStats.Entry toMatchingStatsEntry(Record record) {
private MatchingStats.Entry toMatchingStatsEntry(final Record record) {
final long events = record.get(EVENTS, Integer.class).longValue();
final long entities = record.get(ENTITIES, Integer.class).longValue();
final CDateRange dateSpan = toDateRange(record.get(DATES, String.class));
return new MatchingStats.Entry(events, entities, dateSpan.getMinValue(), dateSpan.getMaxValue());

final int minDate = record.get(MIN_DATE, Date.class) == null
? CDateRange.NEGATIVE_INFINITY
: toValidInt(record.get(MIN_DATE, Date.class));

final int maxDate = record.get(MAX_DATE, Date.class) == null
? CDateRange.POSITIVE_INFINITY
: toValidInt(record.get(MAX_DATE, Date.class));

return new MatchingStats.Entry(events, entities, minDate, maxDate);
}

private CDateRange toDateRange(final String validityDateExpression) {
final List<Integer> dateRange = executionService.getResultSetProcessor().getCDateSetParser().toEpochDayRange(validityDateExpression);
return !dateRange.isEmpty() ? CDateRange.fromList(dateRange) : CDateRange.all();
/**
 * Converts the given SQL date to its epoch-day number, saturated to the {@code int} range.
 *
 * Epoch days below {@link Integer#MIN_VALUE} yield {@link Integer#MIN_VALUE}, epoch days
 * above {@link Integer#MAX_VALUE} yield {@link Integer#MAX_VALUE}.
 *
 * @param date the date to convert; must not be {@code null}
 * @return the epoch day of {@code date}, clamped to the {@code int} range
 */
private static int toValidInt(final Date date) {
	final long epochDay = date.toLocalDate().toEpochDay();
	// Clamp into [Integer.MIN_VALUE, Integer.MAX_VALUE]; the narrowing cast below is then lossless.
	final long clamped = Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, epochDay));
	return (int) clamped;
}

private static void addEntryToConceptElement(final ConceptTreeNode<?> mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -38,6 +39,7 @@ public class LocalNamespace extends Namespace {
private final SqlExecutionService sqlExecutionService;
private final DSLContextWrapper dslContextWrapper;
private final SqlStorageHandler storageHandler;
private final ThreadPoolExecutor executorService;

public LocalNamespace(
ObjectMapper preprocessMapper,
Expand All @@ -63,12 +65,13 @@ public LocalNamespace(
this.sqlExecutionService = sqlExecutionService;
this.dslContextWrapper = dslContextWrapper;
this.storageHandler = storageHandler;
// TODO FK: hoist into Namespace and use at other places && is this the correct way to name them?
this.executorService = this.executionPool.createService("namespace %s worker".formatted(storage.getPathName()));
}

@Override
void updateMatchingStats() {
final Set<ConceptId> concepts = getConceptsWithoutMatchingStats();

Job job = new UpdateMatchingStatsSqlJob(
databaseConfig,
sqlExecutionService,
Expand Down

0 comments on commit d0b02b7

Please sign in to comment.