Skip to content

[SPARK-51752][SQL] Enable rCTE referencing from within a CTE #50546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None)
}
if (cteDefs.isEmpty) {
substituted
Expand Down Expand Up @@ -162,7 +162,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
messageParameters = Map.empty)
}
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true,
forceInline = false, Seq.empty, cteDefs, allowRecursion)
forceInline = false, Seq.empty, cteDefs, None, allowRecursion)
substituteCTE(child, alwaysInline = true, resolvedCTERelations, None)
}
}
Expand Down Expand Up @@ -202,14 +202,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
* @param forceInline always inline the CTE relations if this is true
* @param outerCTEDefs already resolved outer CTE definitions with names
* @param cteDefs all accumulated CTE definitions
* @param recursiveCTERelationAncestor the name and `CTERelationDef` of the enclosing recursive
* CTE if we are inside one, `None` otherwise.
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
* where CTE definitions will be gathered to
*/
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
cteDefs: ArrayBuffer[CTERelationDef],
recursiveCTERelationAncestor: Option[(String, CTERelationDef)]
): (LogicalPlan, Option[LogicalPlan]) = {
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
Expand All @@ -220,18 +224,31 @@ object CTESubstitution extends Rule[LogicalPlan] {
errorClass = "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED",
messageParameters = Map.empty)
}
val resolvedCTERelations =

val tempCteDefs = ArrayBuffer.empty[CTERelationDef]
val resolvedCTERelations = if (recursiveCTERelationAncestor.isDefined) {
resolveCTERelations(relations, isLegacy = false, forceInline = false, outerCTEDefs,
tempCteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs
} else {
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs,
allowRecursion) ++ outerCTEDefs
recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs
}
val substituted = substituteCTE(
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs,
recursiveCTERelationAncestor)._1,
// If we are resolving CTEs in a recursive CTE, we need to inline it in case the
// CTE contains the self reference.
forceInline,
resolvedCTERelations,
None)
if (firstSubstituted.isEmpty) {
firstSubstituted = Some(substituted)
}
substituted
if (recursiveCTERelationAncestor.isDefined) {
withCTEDefs(substituted, tempCteDefs.toSeq)
} else {
substituted
}

case other =>
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
Expand All @@ -247,6 +264,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef],
recursiveCTERelationAncestor: Option[(String, CTERelationDef)],
allowRecursion: Boolean): Seq[(String, CTERelationDef)] = {
val alwaysInline = isLegacy || forceInline
var resolvedCTERelations = if (alwaysInline) {
Expand All @@ -255,6 +273,21 @@ object CTESubstitution extends Rule[LogicalPlan] {
outerCTEDefs
}
for ((name, relation) <- relations) {
// If recursion is allowed (RECURSIVE keyword specified)
// then it has higher priority than outer or previous relations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question about the semantics here. Let's look at an example:

WITH RECURSIVE t AS ...
  WITH t AS ...
    SELECT ... FROM t

Does `t` reference the non-recursive CTE def `t` or the recursive one? It seems more reasonable to reference the non-recursive one, as it's closer in scope. It would be weird for the recursive CTE def to have higher priority.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It references the inner non-recursive one (just checked). The comment just says that the recursive reference has a higher priority than the outer ones or the ones within the same WITH statement.

// Therefore, we construct a `CTERelationDef` for the current relation.
// Later, when we encounter an unresolved relation and need to find which CTE def it
// references, we first check whether it is a reference to this one. If yes, then we mark
// the reference as recursive.
val recursiveCTERelation = if (allowRecursion) {
Some(name -> CTERelationDef(relation))
} else {
// If this relation isn't recursive but there is an outer recursive CTE relative to it,
// then the self reference that gets checked first is the `CTERelationDef` of that
// recursive ancestor.
recursiveCTERelationAncestor
}

val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
Expand Down Expand Up @@ -305,26 +338,20 @@ object CTESubstitution extends Rule[LogicalPlan] {
} else {
resolvedCTERelations
}
traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, cteDefs)._1
traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations,
cteDefs, recursiveCTERelation)._1
}

// If recursion is allowed (RECURSIVE keyword specified)
// then it has higher priority than outer or previous relations.
// Therefore, we construct a `CTERelationDef` for the current relation.
// Later if we encounter unresolved relation which we need to find which CTE Def it is
// referencing to, we first check if it is a reference to this one. If yes, then we set the
// reference as being recursive.
val recursiveCTERelation = if (allowRecursion) {
Some(name -> CTERelationDef(relation))
} else {
None
}
// CTE definition can reference a previous one or itself if recursion allowed.
val substituted = substituteCTE(innerCTEResolved, alwaysInline,
resolvedCTERelations, recursiveCTERelation)
val cteRelation = recursiveCTERelation
val cteRelation = if (allowRecursion) {
recursiveCTERelation
.map(_._2.copy(child = substituted))
.getOrElse(CTERelationDef(substituted))
} else {
CTERelationDef(substituted)
}
if (!alwaysInline) {
cteDefs += cteRelation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDef.copy(child = alias.copy(child = loop))
}

// Simple case of duplicating (UNION ALL) clause.
case alias @ SubqueryAlias(_, withCTE @ WithCTE(
Union(Seq(anchor, recursion), false, false), innerCteDefs)) =>
if (!anchor.resolved) {
cteDef
} else {
// We need to check whether any of the inner CTEs has a self reference and replace
// it if needed
val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
innerCteDef.copy(child = rewriteRecursiveCTERefs(
innerCteDef.child, anchor, cteDef.id, None))
}
val loop = UnionLoop(
cteDef.id,
anchor,
rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
cteDef.copy(child = alias.copy(child = withCTE.copy(
plan = loop, cteDefs = newInnerCteDefs)))
}

// The case of CTE name followed by a parenthesized list of column name(s), eg.
// WITH RECURSIVE t(n).
case alias @ SubqueryAlias(_,
Expand All @@ -100,7 +120,31 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
}

// If the recursion is described with an UNION (deduplicating) clause then the
// The case of CTE name followed by a parenthesized list of column name(s), eg.
// WITH RECURSIVE t(n).
case alias @ SubqueryAlias(_,
columnAlias @ UnresolvedSubqueryColumnAliases(
colNames,
withCTE @ WithCTE(Union(Seq(anchor, recursion), false, false), innerCteDefs)
)) =>
if (!anchor.resolved) {
cteDef
} else {
// We need to check whether any of the inner CTEs has a self reference and replace
// it if needed
val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
innerCteDef.copy(child = rewriteRecursiveCTERefs(
innerCteDef.child, anchor, cteDef.id, Some(colNames)))
}
val loop = UnionLoop(
cteDef.id,
anchor,
rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
cteDef.copy(child = alias.copy(child = columnAlias.copy(
child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
}

// If the recursion is described with a UNION (deduplicating) clause then the
// recursive term should not return those rows that have been calculated previously,
// and we exclude those rows from the current iteration result.
case alias @ SubqueryAlias(_,
Expand All @@ -123,6 +167,34 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDef.copy(child = alias.copy(child = loop))
}

// UNION case with CTEs inside.
case alias @ SubqueryAlias(_, withCTE @ WithCTE(
Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)) =>
cteDef.failAnalysis(
errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
messageParameters = Map.empty)
if (!anchor.resolved) {
cteDef
} else {
// We need to check whether any of the inner CTEs has a self reference and replace
// it if needed
val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
innerCteDef.copy(child = rewriteRecursiveCTERefs(
innerCteDef.child, anchor, cteDef.id, None))
}
val loop = UnionLoop(
cteDef.id,
Distinct(anchor),
Except(
rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
UnionLoopRef(cteDef.id, anchor.output, true),
isAll = false
)
)
cteDef.copy(child = alias.copy(child = withCTE.copy(
plan = loop, cteDefs = newInnerCteDefs)))
}

// The case of CTE name followed by a parenthesized list of column name(s).
case alias @ SubqueryAlias(_,
columnAlias@UnresolvedSubqueryColumnAliases(
Expand All @@ -147,6 +219,37 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
}

// The case of CTE name followed by a parenthesized list of column name(s).
case alias @ SubqueryAlias(_,
columnAlias@UnresolvedSubqueryColumnAliases(
colNames,
WithCTE(Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)
)) =>
cteDef.failAnalysis(
errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE",
messageParameters = Map.empty)
if (!anchor.resolved) {
cteDef
} else {
// We need to check whether any of the inner CTEs has a self reference and replace
// it if needed
val newInnerCteDefs = innerCteDefs.map { innerCteDef =>
innerCteDef.copy(child = rewriteRecursiveCTERefs(
innerCteDef.child, anchor, cteDef.id, Some(colNames)))
}
val loop = UnionLoop(
cteDef.id,
Distinct(anchor),
Except(
rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
UnionLoopRef(cteDef.id, anchor.output, true),
isAll = false
)
)
cteDef.copy(child = alias.copy(child = columnAlias.copy(
child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs))))
}

case other =>
// We do not support cases of sole Union (needs a SubqueryAlias above it), nor
// Project (as UnresolvedSubqueryColumnAliases have not been substituted with the
Expand Down
Loading