Skip to content

Commit

Permalink
Draft
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld committed Nov 5, 2024
1 parent 5be35d6 commit 0988c8c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand Down Expand Up @@ -47,6 +48,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.jooq.Condition;
Expand Down Expand Up @@ -77,14 +79,15 @@ public UpdateMatchingStatsSqlJob(
SqlExecutionService executionService,
SqlFunctionProvider functionProvider,
Set<ConceptId> concepts,
ListeningExecutorService executors
ExecutorService executors
) {
this.databaseConfig = databaseConfig;
this.executionService = executionService;
this.dslContext = executionService.getDslContext();
this.functionProvider = functionProvider;
this.concepts = concepts;
this.executors = executors;
this.executors = MoreExecutors.listeningDecorator(executors);
;
}

@Override
Expand All @@ -106,17 +109,14 @@ public void execute() throws Exception {
.map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept)))
.collect(Collectors.toList());

Futures.whenAllComplete(runningQueries).run(() -> {
stopWatch.stop();
log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}, executors);
}
final ListenableFuture<List<Object>> futureList = Futures.allAsList(runningQueries);
while (!futureList.isDone()) {
log.debug("Waiting for executors collect matching stats...");
}

@Override
public void cancel() {
super.cancel();
executors.shutdownNow();
stopWatch.stop();
log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}

private static boolean isTreeConcept(final Concept<?> concept) {
Expand Down Expand Up @@ -196,31 +196,22 @@ private Map<Connector, Set<Field<?>>> collectRelevantColumns(final TreeConcept t
}

private Set<String> collectRelevantColumns(final Connector connector, final List<ConceptTreeChild> children) {
final Set<String> relevantColumns = new HashSet<>();

for (ConceptTreeChild child : children) {
if (child.getCondition() == null && child.getChildren().isEmpty()) {
continue;
}

final Set<String> childColumns = new HashSet<>();

// Recursively collect columns from the current child's children, if they exist
if (!child.getChildren().isEmpty()) {
final Set<String> childrenColumns = collectRelevantColumns(connector, child.getChildren());
childColumns.addAll(childrenColumns);
}

// Add columns from the child's condition, if it exists
if (child.getCondition() != null) {
final Set<String> conditionColumns = child.getCondition().getColumns(connector);
childColumns.addAll(conditionColumns);
}
return children.stream().flatMap(child -> collectRelevantColumns(connector, child).stream()).collect(Collectors.toSet());
}

relevantColumns.addAll(childColumns);
private Set<String> collectRelevantColumns(final Connector connector, final ConceptTreeChild child) {
final Set<String> childColumns = new HashSet<>();
// Recursively collect columns from the current child's children, if they exist
if (!child.getChildren().isEmpty()) {
final Set<String> childrenColumns = collectRelevantColumns(connector, child.getChildren());
childColumns.addAll(childrenColumns);
}

return relevantColumns;
// Add columns from the child's condition, if it exists
if (child.getCondition() != null) {
final Set<String> conditionColumns = child.getCondition().getColumns(connector);
childColumns.addAll(conditionColumns);
}
return childColumns;
}

private Map<Connector, List<ColumnDateRange>> createColumnDateRanges(final TreeConcept treeConcept) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -70,7 +68,7 @@ public LocalNamespace(
@Override
void updateMatchingStats() {
final Set<ConceptId> concepts = getConceptsWithoutMatchingStats();
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(executionPool.createService("sql-matching-stats"));

Job job = new UpdateMatchingStatsSqlJob(
databaseConfig,
sqlExecutionService,
Expand Down

0 comments on commit 0988c8c

Please sign in to comment.