diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 5e3d733e45e4..0091700b89ea 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -89,6 +89,7 @@ enum JoinType { enum JoinStrategy { HASH = 0; LOOKUP = 1; + BROADCAST = 2; } message JoinNode { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index d0fd20bb8c12..d1b77800c5ec 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.calcite.rel.hint; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.hint.RelHint; @@ -70,6 +71,8 @@ public static class JoinHintOptions { public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast"; // "lookup" can be used when the right table is a dimension table replicated to all workers public static final String LOOKUP_JOIN_STRATEGY = "lookup"; + // "broadcast" can be used when the right table is small enough to be broadcasted to all workers + public static final String BROADCAST_JOIN_STRATEGY = "broadcast"; /** * Max rows allowed to build the right table hash collection. @@ -93,11 +96,27 @@ public static class JoinHintOptions { */ public static final String APPEND_DISTINCT_TO_SEMI_JOIN_PROJECT = "append_distinct_to_semi_join_project"; + @Nullable + public static String getJoinStrategyHint(Join join) { + return PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS, + PinotHintOptions.JoinHintOptions.JOIN_STRATEGY); + } + + public static boolean useLookupJoinStrategy(@Nullable String joinStrategyHint) { + return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint); + } + // TODO: Consider adding a Join implementation with join strategy. public static boolean useLookupJoinStrategy(Join join) { - return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase( - PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS, - PinotHintOptions.JoinHintOptions.JOIN_STRATEGY)); + return useLookupJoinStrategy(getJoinStrategyHint(join)); + } + + public static boolean useBroadcastJoinStrategy(@Nullable String joinStrategyHint) { + return BROADCAST_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint); + } + + public static boolean useBroadcastJoinStrategy(Join join) { + return useBroadcastJoinStrategy(getJoinStrategyHint(join)); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index 5df0b20c54d4..e331e5ba308b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -54,10 +54,15 @@ public void onMatch(RelOptRuleCall call) { JoinInfo joinInfo = join.analyzeCondition(); RelNode newLeft; RelNode newRight; - if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { + String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join); + if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) { // Lookup join - add local exchange on the left side newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); newRight = right; + } else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) { + // Broadcast join - add local exchange on the left side, broadcast exchange on the right side + newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); + newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED); } else { // Regular join - add exchange on both sides if (joinInfo.leftKeys.isEmpty()) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index 3f5ab2261e0c..694cf5d21a17 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -313,7 +313,8 @@ private JoinNode convertLogicalJoin(LogicalJoin join) { // Check if the join hint specifies the join strategy JoinNode.JoinStrategy joinStrategy; - if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { + String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join); + if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) { joinStrategy = JoinNode.JoinStrategy.LOOKUP; // Run some validations for lookup join @@ -333,6 +334,8 @@ private JoinNode convertLogicalJoin(LogicalJoin join) { Preconditions.checkState(projectInput instanceof TableScan, "Right input for lookup join must be a Project over TableScan, got Project over: %s", projectInput.getClass().getSimpleName()); + } else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) { + joinStrategy = JoinNode.JoinStrategy.BROADCAST; } else { // TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy joinStrategy = JoinNode.JoinStrategy.HASH; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index c07392c29859..5d55e43b4f2f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -102,6 +102,16 @@ public int hashCode() { } public enum JoinStrategy { - HASH, LOOKUP + // HASH is the default equi-join strategy, where both left and right tables are hash partitioned on join keys, then + // shuffled to the same worker to perform the join. + HASH, + + // LOOKUP join strategy can be used for equi-join when the right table is a dimension table replicated to all + // workers. It looks up the in-memory pre-materialized right table to perform the join. + LOOKUP, + + // BROADCAST join strategy can be used when the right table is small enough to be broadcasted to all workers of the + // left table. + BROADCAST } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index 7ea9d0d16b38..d720b667bbb5 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -295,6 +295,8 @@ private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinS return JoinNode.JoinStrategy.HASH; case LOOKUP: return JoinNode.JoinStrategy.LOOKUP; + case BROADCAST: + return JoinNode.JoinStrategy.BROADCAST; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index bea6042d02c3..e110eecb3f54 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -300,6 +300,8 @@ private static Plan.JoinStrategy convertJoinStrategy(JoinNode.JoinStrategy joinS return Plan.JoinStrategy.HASH; case LOOKUP: return Plan.JoinStrategy.LOOKUP; + case BROADCAST: + return Plan.JoinStrategy.BROADCAST; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 6af598bb3da7..340b228fb8ee 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -97,10 +97,13 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP Map metadataMap = context.getDispatchablePlanMetadataMap(); DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId()); boolean leafPlan = isLeafPlan(metadata); - if (isLocalExchange(children)) { - // If it is a local exchange (single child with SINGLETON distribution), use the same worker assignment to avoid + Integer childIdWithLocalExchange = findLocalExchange(children); + if (childIdWithLocalExchange != null) { + // If there is a local exchange (child with SINGLETON distribution), use the same worker assignment to avoid // shuffling data. - // TODO: Support partition parallelism + // TODO: + // 1. Support partition parallelism + // 2. Check if there are conflicts (multiple children with different local exchange) DispatchablePlanMetadata childMetadata = metadataMap.get(children.get(0).getFragmentId()); metadata.setWorkerIdToServerInstanceMap(childMetadata.getWorkerIdToServerInstanceMap()); metadata.setPartitionFunction(childMetadata.getPartitionFunction()); @@ -121,13 +124,21 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP } } - private boolean isLocalExchange(List children) { - if (children.size() != 1) { - return false; + /** + * Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or {@code null} if none + * exists. + */ + @Nullable + private Integer findLocalExchange(List children) { + int numChildren = children.size(); + for (int i = 0; i < numChildren; i++) { + PlanNode childPlanNode = children.get(i).getFragmentRoot(); + if (childPlanNode instanceof MailboxSendNode + && ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON) { + return i; + } } - PlanNode childPlanNode = children.get(0).getFragmentRoot(); - return childPlanNode instanceof MailboxSendNode - && ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON; + return null; } private static boolean isLeafPlan(DispatchablePlanMetadata metadata) { diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index f275eca72f4c..a8bcb4fe1e10 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -758,6 +758,61 @@ } ] }, + "broadcast_join_planning_tests": { + "queries": [ + { + "description": "Simple broadcast join", + "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1", + "output": [ + "Execution Plan", + "\nLogicalProject(col1=[$0], col2=[$2])", + "\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])", + "\n PinotLogicalExchange(distribution=[single])", + "\n LogicalProject(col1=[$0])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[broadcast])", + "\n LogicalProject(col1=[$0], col2=[$1])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] + }, + { + "description": "Broadcast join with filter on both left and right table", + "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 = 'foo' AND b.col2 = 'bar'", + "output": [ + "Execution Plan", + "\nLogicalProject(col1=[$0], col2=[$2])", + "\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])", + "\n PinotLogicalExchange(distribution=[single])", + "\n LogicalProject(col1=[$0])", + "\n LogicalFilter(condition=[=($1, _UTF-8'foo')])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[broadcast])", + "\n LogicalProject(col1=[$0], col2=[$1])", + "\n LogicalFilter(condition=[=($1, _UTF-8'bar')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] + }, + { + "description": "Broadcast join with transformation on both left and right table joined key", + "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON upper(a.col1) = upper(b.col1)", + "output": [ + "Execution Plan", + "\nLogicalProject(col1=[$0], col2=[$2])", + "\n LogicalJoin(condition=[=($1, $3)], joinType=[inner])", + "\n PinotLogicalExchange(distribution=[single])", + "\n LogicalProject(col1=[$0], $f8=[UPPER($0)])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[broadcast])", + "\n LogicalProject(col2=[$1], $f8=[UPPER($0)])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] + } + ] + + }, "exception_throwing_join_planning_tests": { "queries": [ { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java index 048af3925311..98a35b1c89fb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java @@ -138,11 +138,11 @@ public ObjectNode visitFilter(FilterNode node, Void context) { @Override public ObjectNode visitJoin(JoinNode node, Void context) { - if (node.getJoinStrategy() == JoinNode.JoinStrategy.HASH) { - return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN); - } else { - assert node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP; + if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) { return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN); + } else { + // TODO: Consider renaming this operator type. It handles multiple join strategies. + return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java index 9570d77b47fd..37664ebeea66 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java @@ -178,12 +178,11 @@ public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext conte MultiStageOperator leftOperator = visit(left, context); PlanNode right = inputs.get(1); MultiStageOperator rightOperator = visit(right, context); - JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy(); - if (joinStrategy == JoinNode.JoinStrategy.HASH) { - return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); - } else { - assert joinStrategy == JoinNode.JoinStrategy.LOOKUP; + if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) { return new LookupJoinOperator(context, leftOperator, rightOperator, node); + } else { + // TODO: Consider renaming this operator. It handles multiple join strategies. + return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); } } diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json index e8d30ed40905..4dfe3471ee6e 100644 --- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json +++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json @@ -125,6 +125,14 @@ "description": "Colocated JOIN with partition column and group by non-partitioned column with stage parallelism", "sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name" }, + { + "description": "Broadcast JOIN without partition hint", + "sql": "SELECT /*+ joinOptions(join_strategy='broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num" + }, + { + "description": "Broadcast JOIN with partition hint", + "sql": "SELECT /*+ joinOptions(join_strategy='broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} ON {tbl1}.num = {tbl2}.num" + }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column", "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy'))"