Skip to content

Commit

Permalink
[Spool] Actual implementation (#14507)
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz authored Jan 7, 2025
1 parent 4588f8c commit 3adcedb
Show file tree
Hide file tree
Showing 32 changed files with 596 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS,
CommonConstants.Broker.DEFAULT_OF_SPOOLS);
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.defaultInferPartitionHint(inferPartitionHint)
.defaultUseSpools(defaultUseSpool)
.build());
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
Expand Down
4 changes: 3 additions & 1 deletion pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ message MailboxReceiveNode {
}

message MailboxSendNode {
int32 receiverStageId = 1;
// Kept for backward compatibility: brokers still populate it, but servers should prefer receiverStageIds.
int32 receiverStageId = 1 [deprecated = true];
ExchangeType exchangeType = 2;
DistributionType distributionType = 3;
repeated int32 keys = 4;
bool prePartitioned = 5;
repeated Collation collations = 6;
bool sort = 7;
repeated int32 receiverStageIds = 8;
}

message ProjectNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker
private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
HepProgram traitProgram = getTraitProgram(workerManager);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram,
sqlNodeAndOptions.getOptions());
}

@Nullable
Expand All @@ -163,14 +164,6 @@ private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) {
}
}

/**
 * Builds a {@link PlannerContext} intended only for parsing queries.
 *
 * The trait program is obtained with {@code null} passed in place of a worker manager.
 */
private PlannerContext getParsingPlannerContext() {
  return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, getTraitProgram(null));
}

/**
* Plan a SQL query.
*
Expand All @@ -185,7 +178,6 @@ private PlannerContext getParsingPlannerContext() {
*/
public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
// TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
// Each SubPlan should be able to run independently from Broker then set the results into the dependent
Expand All @@ -209,8 +201,7 @@ public DispatchableSubPlan planQuery(String sqlQuery) {
*
* Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query
* compilation. But it doesn't run the distributed {@link DispatchableSubPlan} generation, instead it only
* returns the
* explained logical plan.
* returns the explained logical plan.
*
* @param sqlQuery SQL query string.
* @param sqlNodeAndOptions parsed SQL query.
Expand All @@ -221,7 +212,6 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod
@Nullable AskingServerStageExplainer.OnServerExplainer onServerExplainer) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
if (explain instanceof SqlPhysicalExplain) {
// get the physical plan for query.
Expand Down Expand Up @@ -271,8 +261,9 @@ public String explainQuery(String sqlQuery, long requestId) {
}

public List<String> getTableNamesForQuery(String sqlQuery) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
}
Expand All @@ -288,8 +279,9 @@ public List<String> getTableNamesForQuery(String sqlQuery) {
* Returns whether the query can be successfully compiled in this query environment
*/
public boolean canCompileQuery(String query) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
}
Expand Down Expand Up @@ -400,7 +392,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex

private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker);
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions()));
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlan(plan);
Expand Down Expand Up @@ -465,6 +457,14 @@ public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
return ImmutableQueryEnvironment.Config.builder();
}

/**
 * Decides whether spools should be used for a query.
 *
 * The query option {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS} takes precedence when it is
 * present in {@code options}; its value is interpreted with {@link Boolean#parseBoolean(String)}. When the option is
 * absent, the environment-level default ({@code _envConfig.defaultUseSpools()}) is returned.
 *
 * @param options query options to inspect
 * @return {@code true} if spools should be used
 */
public boolean useSpools(Map<String, String> options) {
  String requested = options.get(CommonConstants.Broker.Request.QueryOptionKey.USE_SPOOLS);
  return requested != null ? Boolean.parseBoolean(requested) : _envConfig.defaultUseSpools();
}

@Value.Immutable
public interface Config {
String getDatabase();
Expand All @@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() {
return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT;
}

/**
 * Returns whether spools are used by default.
 *
 * <p>This value acts as the broker-wide default and is expected to come from the Pinot configuration. It can always
 * be overridden for an individual query with the query option
 * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}.
 *
 * <p>When not set explicitly on the builder, it falls back to {@code CommonConstants.Broker.DEFAULT_OF_SPOOLS}.
 */
@Value.Default
default boolean defaultUseSpools() {
return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
}

/**
* Returns the worker manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@ public class PlannerContext implements AutoCloseable {
private final RelOptPlanner _relOptPlanner;
private final LogicalPlanner _relTraitPlanner;

private Map<String, String> _options;
private final Map<String, String> _options;

public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory,
HepProgram optProgram, HepProgram traitProgram) {
HepProgram optProgram, HepProgram traitProgram, Map<String, String> options) {
_planner = new PlannerImpl(config);
_validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory);
_relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs());
_relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT,
Collections.singletonList(RelDistributionTraitDef.INSTANCE));
_options = options;
}

public PlannerImpl getPlanner() {
Expand All @@ -74,10 +75,6 @@ public LogicalPlanner getRelTraitPlanner() {
return _relTraitPlanner;
}

/**
 * Replaces the options map held by this context with the given one.
 *
 * @param options the map to store; the reference is kept as-is (no copy is made)
 */
public void setOptions(Map<String, String> options) {
_options = options;
}

/**
 * Returns the options map held by this context.
 *
 * @return the stored map reference itself, not a copy
 */
public Map<String, String> getOptions() {
return _options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pinot.query.planner.explain;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
Expand Down Expand Up @@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
appendInfo(node, context);

int receiverStageId = node.getReceiverStageId();
List<MailboxInfo> receiverMailboxInfos =
_dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
List<Stream<String>> perStageDescriptions = new ArrayList<>();
// This iterator is guaranteed to be sorted by stageId
for (Integer receiverStageId : node.getReceiverStageIds()) {
List<MailboxInfo> receiverMailboxInfos =
_dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
// Sort to ensure print order
Stream<String> stageDescriptions = receiverMailboxInfos.stream()
.sorted(Comparator.comparingInt(MailboxInfo::getPort))
.map(v -> "[" + receiverStageId + "]@" + v);
perStageDescriptions.add(stageDescriptions);
}
context._builder.append("->");
// Sort to ensure print order
String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
.map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
String receivers = perStageDescriptions.stream()
.flatMap(Function.identity())
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EquivalentStagesFinder {
private EquivalentStagesFinder() {
}

public static GroupedStages findEquivalentStages(MailboxSendNode root) {
public static GroupedStages findEquivalentStages(PlanNode root) {
Visitor visitor = new Visitor();
root.visit(visitor, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,31 @@ public class EquivalentStagesReplacer {
private EquivalentStagesReplacer() {
}

/**
 * Replaces the equivalent stages in the query plan without reporting the substitutions to anyone.
 *
 * Delegates to the three-argument overload, passing {@link OnSubstitution#NO_OP} as the listener.
 *
 * @param root Root plan node
 * @param equivalentStages Equivalent stages
 */
public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
replaceEquivalentStages(root, equivalentStages, OnSubstitution.NO_OP);
}

/**
* Replaces the equivalent stages in the query plan.
*
* @param root Root plan node
* @param equivalentStages Equivalent stages
*/
public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
root.visit(Replacer.INSTANCE, equivalentStages);
public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages, OnSubstitution listener) {
root.visit(new Replacer(listener), equivalentStages);
}

/**
 * Callback notified each time a mailbox receive node is re-pointed from its original sender stage to the
 * leader of that sender's equivalence group.
 *
 * The interface has a single abstract method, so it can be implemented with a lambda.
 */
@FunctionalInterface
public interface OnSubstitution {
  /** Listener that ignores every substitution. */
  OnSubstitution NO_OP = (receiver, oldSender, newSender) -> {
  };

  /**
   * Called after a substitution has been applied.
   *
   * @param receiver stage id of the receive node whose sender was replaced
   * @param oldSender stage id of the sender that was replaced
   * @param newSender stage id of the sender now feeding the receiver
   */
  void onSubstitution(int receiver, int oldSender, int newSender);
}

private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> {
private static final Replacer INSTANCE = new Replacer();
private final OnSubstitution _listener;

private Replacer() {
public Replacer(OnSubstitution listener) {
_listener = listener;
}

@Override
Expand All @@ -62,6 +73,7 @@ public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalen
// we don't want to visit the children of the node given it is going to be pruned
node.setSender(leader);
leader.addReceiver(node);
_listener.onSubstitution(node.getStageId(), sender.getStageId(), leader.getStageId());
} else {
visitMailboxSend(leader, equivalenceGroups);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ private PinotLogicalQueryPlanner() {
* Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
*/
public static SubPlan makePlan(RelRoot relRoot,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools) {
PlanNode rootNode = new RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel);

PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker);
PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools);
return new SubPlan(rootFragment,
new SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), relRoot.fields), List.of());

Expand Down Expand Up @@ -89,10 +89,16 @@ public static SubPlan makePlan(RelRoot relRoot,
}

private static PlanFragment planNodeToPlanFragment(
PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools) {
PlanFragmenter fragmenter = new PlanFragmenter();
PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
node = node.visit(fragmenter, fragmenterContext);

if (useSpools) {
GroupedStages equivalentStages = EquivalentStagesFinder.findEquivalentStages(node);
EquivalentStagesReplacer.replaceEquivalentStages(node, equivalentStages, fragmenter);
}

Int2ObjectOpenHashMap<PlanFragment> planFragmentMap = fragmenter.getPlanFragmentMap();
Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
* 3. Assign current PlanFragment ID to {@link MailboxReceiveNode};
* 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}.
*/
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context> {
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context>,
EquivalentStagesReplacer.OnSubstitution {
private final Int2ObjectOpenHashMap<PlanFragment> _planFragmentMap = new Int2ObjectOpenHashMap<>();
private final Int2ObjectOpenHashMap<IntList> _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>();

Expand Down Expand Up @@ -86,6 +87,16 @@ private PlanNode process(PlanNode node, Context context) {
return node;
}

/**
 * Keeps the fragment bookkeeping in sync after a receiver has been re-pointed to a different sender.
 *
 * The old sender id is removed from the receiver's child-fragment id list, the new sender id is appended unless it
 * is already listed, and the old sender's entry is dropped from the plan fragment map.
 *
 * NOTE(review): assumes {@code receiver} already has an entry in the child-fragment map; a missing entry would
 * fail with a NullPointerException here - confirm against callers.
 *
 * @param receiver id of the receiving stage, used as key into the child-fragment map
 * @param oldSender id of the sender that was replaced
 * @param newSender id of the sender that now feeds the receiver
 */
@Override
public void onSubstitution(int receiver, int oldSender, int newSender) {
  IntList childFragmentIds = _childPlanFragmentIdsMap.get(receiver);
  // rem(int) removes by value; remove(int) on a list would remove by index.
  childFragmentIds.rem(oldSender);
  boolean alreadyLinked = childFragmentIds.contains(newSender);
  if (!alreadyLinked) {
    childFragmentIds.add(newSender);
  }
  _planFragmentMap.remove(oldSender);
}

@Override
public PlanNode visitAggregate(AggregateNode node, Context context) {
return process(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pinot.query.planner.physical;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
Expand All @@ -37,10 +40,7 @@


public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();

private DispatchablePlanVisitor() {
}
private final Set<MailboxSendNode> _visited = Collections.newSetFromMap(new IdentityHashMap<>());

private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(PlanNode node,
DispatchablePlanContext context) {
Expand Down Expand Up @@ -104,10 +104,12 @@ public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext

@Override
public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) {
node.getInputs().get(0).visit(this, context);
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned());
context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
if (_visited.add(node)) {
node.getInputs().get(0).visit(this, context);
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned());
context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
}
return null;
}

Expand Down
Loading

0 comments on commit 3adcedb

Please sign in to comment.