Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: hoping to avoid keeping SecondaryIdQuery subPlans, that are not needed #3260

Merged
merged 14 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.RequiredEntities;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.query.queryplan.ConceptQueryPlan;
import com.bakdata.conquery.models.query.queryplan.SecondaryIdQueryPlan;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.resultinfo.SimpleResultInfo;
Expand Down Expand Up @@ -66,8 +67,9 @@ public class SecondaryIdQuery extends Query {

@Override
public SecondaryIdQueryPlan createQueryPlan(QueryPlanContext context) {
final ConceptQueryPlan queryPlan = query.createQueryPlan(context.withSelectedSecondaryId(secondaryId));

return new SecondaryIdQueryPlan(query, context, secondaryId, withSecondaryId, withoutSecondaryId, query.createQueryPlan(context.withSelectedSecondaryId(secondaryId)));
return new SecondaryIdQueryPlan(query, context, secondaryId, withSecondaryId, withoutSecondaryId, queryPlan, context.getSecondaryIdSubPlanRetention());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
getConfig().getQueries().getExecutionPool(),
() -> createInternalObjectMapper(View.Persistence.Shard.class),
() -> createInternalObjectMapper(View.InternalCommunication.class),
getConfig().getCluster().getEntityBucketSize()
getConfig().getCluster().getEntityBucketSize(),
getConfig().getQueries().getSecondaryIdSubPlanRetention()
);

final Collection<WorkerStorage> workerStorages = config.getStorage().discoverWorkerStorages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ public class QueryConfig {
private ThreadPoolDefinition executionPool = new ThreadPoolDefinition();

private Duration oldQueriesTime = Duration.days(30);

/**
* Limits how many subQuery-Plans should be cached between executions:
* This number limits how many sub-plans are cached per core so that outliers do not cause massive memory overhead.
*
* TODO Implement global limit of active secondaryId sub plans
*/
private int secondaryIdSubPlanRetention = 15;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void react(Worker worker) throws Exception {

// Before we start the query, we create it once to test if it will succeed before creating it multiple times for evaluation per core.
try {
query.createQueryPlan(new QueryPlanContext(worker));
query.createQueryPlan(new QueryPlanContext(worker, queryExecutor.getSecondaryIdSubPlanLimit()));
}
catch (Exception e) {
ConqueryError err = asConqueryError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void react(Worker worker) throws Exception {

// Before we start the query, we create it once to test if it will succeed before creating it multiple times for evaluation per core.
try {
query.createQueryPlan(new QueryPlanContext(worker));
query.createQueryPlan(new QueryPlanContext(worker, queryExecutor.getSecondaryIdSubPlanLimit()));
}
catch (Exception e) {
ConqueryError err = asConqueryError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@
import com.bakdata.conquery.models.query.results.ShardResult;
import com.bakdata.conquery.models.worker.Worker;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
@Data
public class QueryExecutor implements Closeable {

private final Worker worker;

private final ThreadPoolExecutor executor;

private final int secondaryIdSubPlanLimit;

private final Set<ManagedExecutionId> cancelledQueries = new HashSet<>();

public void unsetQueryCancelled(ManagedExecutionId query) {
Expand All @@ -52,7 +54,7 @@ public boolean isCancelled(ManagedExecutionId query) {

public boolean execute(Query query, QueryExecutionContext executionContext, ShardResult result, Set<Entity> entities) {

final ThreadLocal<QueryPlan<?>> plan = ThreadLocal.withInitial(() -> query.createQueryPlan(new QueryPlanContext(worker)));
final ThreadLocal<QueryPlan<?>> plan = ThreadLocal.withInitial(() -> query.createQueryPlan(new QueryPlanContext(worker, secondaryIdSubPlanLimit)));

if (entities.isEmpty()) {
log.warn("Entities for query are empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
import com.bakdata.conquery.models.worker.Worker;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.With;

@RequiredArgsConstructor @AllArgsConstructor @Getter @With
@Data @With
@AllArgsConstructor @RequiredArgsConstructor
public class QueryPlanContext {

@Getter(AccessLevel.NONE)
private final Worker worker;
private final int secondaryIdSubPlanRetention;

private CDateRange dateRestriction = CDateRange.all();


/**
* Set if in {@link com.bakdata.conquery.models.query.queryplan.SecondaryIdQueryPlan}, to the query-active {@link SecondaryIdDescriptionId}.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.bakdata.conquery.models.common.IRange;
import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator;
import com.bakdata.conquery.models.query.queryplan.filter.AggregationResultFilterNode;
import lombok.ToString;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public void init(QueryExecutionContext ctx, Entity entity) {
child.init(entity, ctx);
}

public void nextEvent(Bucket bucket, int event) {
getChild().acceptEvent(bucket, event);
public boolean nextEvent(Bucket bucket, int event) {
return getChild().acceptEvent(bucket, event);
}

protected SinglelineEntityResult createResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void init(Entity entity, QueryExecutionContext context) {
}

@Override
public void acceptEvent(Bucket bucket, int event) {
public void consumeEvent(Bucket bucket, int event) {
throw new UnsupportedOperationException("This Aggregator uses the result of its siblings and does not accept events");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public abstract class EventIterating {
public void collectRequiredTables(Set<Table> requiredTables) {
}

public Set<Table> collectRequiredTables() {
public final Set<Table> collectRequiredTables() {
Set<Table> out = new HashSet<>();
collectRequiredTables(out);
return out;
Expand All @@ -42,7 +42,12 @@ public void nextTable(QueryExecutionContext ctx, Table currentTable) {
public void nextBlock(Bucket bucket) {
}

public abstract void acceptEvent(Bucket bucket, int event);
/**
* Consume the event of the bucket.
*
* @implSpec If the event was not consumed, may return false. This can be used to discard intermediate results. Currently, only relevant for {@link SecondaryIdQueryPlan}, where subplans are discarded if no event was consumed.
*/
public abstract boolean acceptEvent(Bucket bucket, int event);


public boolean isOfInterest(Bucket bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void nextTable(QueryExecutionContext ctx, Table currentTable) {
}

@Override
public abstract void acceptEvent(Bucket bucket, int event);
public abstract boolean acceptEvent(Bucket bucket, int event);

public abstract boolean isContained();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ public boolean isOfInterest(Entity entity) {
}

@Override
public void acceptEvent(Bucket bucket, int event) {
public boolean acceptEvent(Bucket bucket, int event) {
boolean consumed = false;
for (QPNode currentTableChild : currentTableChildren) {
currentTableChild.acceptEvent(bucket, event);
consumed |= currentTableChild.acceptEvent(bucket, event);
}

return consumed;
}

@Override
Expand Down
Loading
Loading