[SPARK-51757][SQL] Fix LEAD/LAG Function Offset Exceeds Window Group Size #50552

Open · wants to merge 23 commits into master
Conversation

@xin-aurora xin-aurora commented Apr 10, 2025

What changes were proposed in this pull request?

The current implementation of the prepare in OffsetWindowFunctionFrameBase:

  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
    if (offset > rows.length) {
      fillDefaultValue(EmptyRow)
    } else {
      ...
    }
  }

The current implementation of the write in FrameLessOffsetWindowFunctionFrame:

  override def write(index: Int, current: InternalRow): Unit = {
    if (offset > rows.length) {
      // Already use default values in prepare.
    } else {
      ...
    }
  }

These implementations cause the LEAD and LAG functions to throw a NullPointerException when the default value is not a Literal and the offset exceeds the window group size.
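To make the failure mode concrete, here is a minimal Python sketch (not Spark code; `prepare_fill`, `literal_default`, and `column_default` are illustrative stand-ins). A literal default ignores the row it is evaluated against, so EmptyRow is safe; a column-reference default dereferences the row, so evaluating it against an empty row fails, analogous to the NullPointerException:

```python
EMPTY_ROW = None  # stands in for Spark's EmptyRow

def literal_default(_row):
    return -1  # a Literal default never looks at the row

def column_default(row):
    return row["id"]  # a column-reference default dereferences the row

def prepare_fill(default, rows, offset):
    # Mimics the old prepare: when the offset exceeds the group size,
    # the default is evaluated against EmptyRow unconditionally.
    if offset > len(rows):
        return default(EMPTY_ROW)

rows = [{"id": 1}, {"id": 2}]

assert prepare_fill(literal_default, rows, offset=5) == -1  # safe

try:
    prepare_fill(column_default, rows, offset=5)  # dereferences None
    raised = False
except TypeError:
    raised = True  # Python's analogue of the NullPointerException
assert raised
```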

This PR introduces a boolean val onlyLiteralNulls and modifies prepare and write.

onlyLiteralNulls indicates whether the default values are Literal values.

In prepare, first check onlyLiteralNulls. If the default value is a Literal, call fillDefaultValue(EmptyRow).

In write, if onlyLiteralNulls is false, the default value must be non-literal, so call fillDefaultValue(current).
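The control flow above can be sketched as a toy Python model (a sketch only, not the actual Scala; `OffsetFrame` and the `(is_literal, eval_fn)` pairs are hypothetical stand-ins for the frame and its default expressions):

```python
EMPTY_ROW = None  # stands in for Spark's EmptyRow

class OffsetFrame:
    """Toy model of the fixed frame logic; not Spark's implementation."""

    def __init__(self, defaults, offset):
        # defaults: list of (is_literal, eval_fn) pairs, where
        # eval_fn(row) evaluates the default expression against a row.
        self.defaults = defaults
        self.offset = offset
        self.only_literal = all(is_lit for is_lit, _ in defaults)
        self.rows = []
        self.out = []

    def fill_default_value(self, row):
        return [fn(row) for _, fn in self.defaults]

    def prepare(self, rows):
        self.rows = rows
        # Literal defaults never reference the row, so EmptyRow is safe
        # and the value can be computed once per group.
        if self.offset > len(rows) and self.only_literal:
            self.out = self.fill_default_value(EMPTY_ROW)

    def write(self, index, current):
        # Non-literal defaults are deferred to write, where the current
        # row is available to evaluate against.
        if self.offset > len(self.rows) and not self.only_literal:
            self.out = self.fill_default_value(current)
        return self.out

# A column-reference default (non-literal) no longer touches EmptyRow:
frame = OffsetFrame([(False, lambda r: r["id"] * 10)], offset=5)
frame.prepare([{"id": 1}, {"id": 2}])     # no EmptyRow evaluation here
assert frame.write(0, {"id": 1}) == [10]  # evaluated against current row
```

With literal-only defaults, prepare fills the value once per group and write leaves it untouched, which preserves the fast path the perf-regression comment below is concerned with.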

Why are the changes needed?

Fixes a NullPointerException caused by the LEAD and LAG window functions (SPARK-51757).

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a test, test("lead/lag with column reference as default when offset exceeds window group size"), in org.apache.spark.sql.DataFrameWindowFramesSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Apr 10, 2025
@@ -183,7 +183,8 @@ abstract class OffsetWindowFunctionFrameBase(
override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
resetStates(rows)
if (absOffset > rows.length) {
fillDefaultValue(EmptyRow)
Contributor
if the code is not needed, just remove them. and add some comments to explain the reason

Contributor

Is it the only place? Seems we should never run fillDefaultValue in prepare as the default value can be an expression that references attributes.

Contributor

It seems all the existing tests just cover the default value as Literal.

Contributor

And I think it's not related to the partition size.
The cause is applying the expression on an empty row.
We can check the default expression and apply fillDefaultValue(currentRow) in write if it is not a Literal, or apply it in prepare if it is a literal.

Author

if the code is not needed, just remove them. and add some comments to explain the reason

Edited the comments to include the reason.

Author

Is it the only place? Seems we should never run fillDefaultValue in prepare as the default value can be an expression that references attributes.

I found this is the only place to run fillDefaultValue in prepare. I updated the write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue as well.
I noticed that the write in UnboundedPrecedingOffsetWindowFunctionFrame also relies on prepare to handle cases where offset exceeds the window group size. However, I haven't found a query that triggers this method. It's possible that write in UnboundedPrecedingOffsetWindowFunctionFrame also needs to be updated.

Author

And I think it's not related to the partition size. The cause is applying the expression on an empty row. We can check the default expression and apply fillDefaultValue(currentRow) in write if it is not a Literal, or apply it in prepare if it is a literal.

Thank you for the suggestions! We had a similar idea for fixing this issue. I've updated write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue(currentRow).

@beliefer
Contributor

Could you check the description
The current implementation of the write in OffsetWindowFunctionFrameBase:
Where is it?

@xin-aurora
Author

Could you check the description The current implementation of the write in OffsetWindowFunctionFrameBase: Where is it?

Sorry for the typo. It should be FrameLessOffsetWindowFunctionFrame.

@beliefer
Contributor

The change will cause perf regression if the default expression is a literal.

@xin-aurora xin-aurora changed the title [SPARK-51757] Fix LEAD/LAG Function Offset Exceeds Partition Size [SPARK-51757][SQL] Fix LEAD/LAG Function Offset Exceeds Window Group Size Apr 16, 2025
@xin-aurora
Author

xin-aurora commented Apr 17, 2025

The change will cause perf regression if the default expression is a literal.

I updated the code by introducing a boolean val onlyLiteralNulls. In prepare, if the default expression is a literal, call fillDefaultValue(EmptyRow). In write, if onlyLiteralNulls is false, call fillDefaultValue(current).
Would you say this change causes less perf regression?

@linhongliu-db
Contributor

@cloud-fan @beliefer the PR is updated, could you please take another look?

// Avoid evaluating non-literal defaults with EmptyRow,
// which causes NullPointerException.
// Check whether defaults are Literal or null.
if (onlyLiteralNulls) {
Contributor

I think we should apply fillDefaultValue for any foldable expression here, not only the null literal.

@@ -180,10 +180,20 @@ abstract class OffsetWindowFunctionFrameBase(
}
}

/** Indicates whether the default values are Literal values. */
protected lazy val onlyLiteralNulls = expressions.forall { e =>
e.default == null || (e.default.foldable && e.default.eval() == null)
Contributor

I think e.default == null || e.default.foldable should be good enough. As long as the default expression does not reference any columns, we can evaluate it in def prepare.

Author

@cloud-fan @beliefer Thank you for the suggestions! I changed the condition to e.default == null || e.default.isInstanceOf[Literal], because e.default == null || e.default.foldable caused an error in the KafkaMicroBatchV2SourceWithConsumerSuite test.

Contributor

@xin-aurora do you still remember what the foldable expression was (but not literal) that failed the test?

Author

@cloud-fan I changed the code to e.default == null || e.default.foldable and updated the repository to the latest branch. The KafkaMicroBatchV2SourceWithConsumerSuite test no longer produces the error.

I am not sure which specific foldable expression caused the test to fail. The previous error was related to Query with Trigger.AvailableNow. Here is part of the error message:

[info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
[info]   java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow.
[info] org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-40, 1] metadata not propagated after timeout
...
[info] 	at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:614)
[info] 	at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:379)
[info] 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:192)
[info] 	at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:378)
[info] 	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:351)
[info] 	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:348)
[info] 	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
[info] 	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
[info] 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:880)
[info] 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
[info] 	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
[info] 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
...

Contributor

ok it's probably a flaky test

@cloud-fan
Contributor

LGTM, can we update UnboundedPrecedingOffsetWindowFunctionFrame#write to call fillDefaultValue as well? It's a potential bug that we'd better fix now.

@xin-aurora
Author

xin-aurora commented Apr 25, 2025

LGTM, can we update UnboundedPrecedingOffsetWindowFunctionFrame#write to call fillDefaultValue as well? It's a potential bug that we'd better fix now.

I found that both UnboundedPrecedingOffsetWindowFunctionFrame#write and UnboundedOffsetWindowFunctionFrame#write do not handle non-literal default values. However, they don't trigger the same error that lead/lag does (where the offset exceeds the window group size).

I tried nth_value() with an unbounded window. Since nth_value() doesn't allow assigning default values directly, it doesn't trigger the error. We can run something like nth_value($"id", 3, true) instead of nth_value($"id", 3, $"id") or nth_value($"id", 3, -1).

We could fix this potential bug when nth_value() adds support for assigning default values. I can leave a comment about this bug in the two write functions if you think that would be helpful. Please also let me know if you have other window queries to suggest. @cloud-fan
