[SPARK-51757][SQL] Fix LEAD/LAG Function Offset Exceeds Window Group Size #50552
base: master
Conversation
@@ -183,7 +183,8 @@ abstract class OffsetWindowFunctionFrameBase(
  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
    resetStates(rows)
    if (absOffset > rows.length) {
      fillDefaultValue(EmptyRow)
If the code is not needed, just remove it, and add some comments to explain the reason.
Is it the only place? Seems we should never run fillDefaultValue in prepare, as the default value can be an expression that references attributes.
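To make the distinction concrete, here is a small illustration at the Catalyst expression level; the expressions are hypothetical and not taken from this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, Multiply}
import org.apache.spark.sql.types.IntegerType

// Illustration only (hypothetical expressions, not the PR's code):
// the default of lag(value, 2, 0) is Literal(0) and needs no input row,
// while the default of lag(value, 2, value * 2) references the `value`
// attribute, so it can only be evaluated against a concrete input row.
val literalDefault = Literal(0)
val columnDefault  = Multiply(AttributeReference("value", IntegerType)(), Literal(2))

assert(literalDefault.references.isEmpty && literalDefault.foldable)
assert(columnDefault.references.nonEmpty && !columnDefault.foldable)
```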
It seems all the existing tests only cover the case where the default value is a Literal.
And I think it's not related to the partition size. The cause is applying the default expression on an empty row. We can check the default expression and apply fillDefaultValue(currentRow) in write if it is not a Literal, or apply it in prepare if it is a Literal.
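A minimal sketch of that rule, with a made-up helper name (this is not the PR's actual code). Note that EmptyRow is simply a null InternalRow, so only a Literal default (or no default at all) can safely be evaluated without an input row:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Literal}

// Sketch only: decide where a lead/lag default can be evaluated.
// A Literal ignores its input, so eval(EmptyRow) is safe in prepare();
// anything else may read columns and must be evaluated against the
// current row in write().
def evalDefault(default: Expression, current: InternalRow): Any = default match {
  case null         => null                 // no default was supplied
  case lit: Literal => lit.eval(EmptyRow)   // constant: no input row needed
  case other        => other.eval(current)  // may reference columns of `current`
}
```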
> If the code is not needed, just remove it, and add some comments to explain the reason.

Edited the comments to include the reason.
> Is it the only place? Seems we should never run fillDefaultValue in prepare, as the default value can be an expression that references attributes.

I found this is the only place to run fillDefaultValue in prepare. I updated write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue as well.

I noticed that write in UnboundedPrecedingOffsetWindowFunctionFrame also relies on prepare to handle cases where the offset exceeds the window group size. However, I haven't found a query that triggers this method. It's possible that write in UnboundedPrecedingOffsetWindowFunctionFrame also needs to be updated.
> And I think it's not related to the partition size. The cause is applying the default expression on an empty row. We can check the default expression and apply fillDefaultValue(currentRow) in write if it is not a Literal, or apply it in prepare if it is a Literal.

Thank you for the suggestions! We had a similar idea for fixing this issue. I've updated write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue(currentRow).
Could you check the description?

Sorry for the typo. It should be …

The change will cause a perf regression if the default expression is a literal.

I updated the code by introducing a boolean val onlyLiteralNulls.

@cloud-fan @beliefer The PR is updated, could you please take another look?
      // Avoid evaluating non-literal defaults with EmptyRow,
      // which causes NullPointerException.
      // Check whether defaults are Literal or null.
      if (onlyLiteralNulls) {
I think we should apply fillDefaultValue for any foldable expression here, not only the null literal.
@@ -180,10 +180,20 @@ abstract class OffsetWindowFunctionFrameBase(
    }
  }

  /** Indicates whether the default values are Literal values. */
  protected lazy val onlyLiteralNulls = expressions.forall { e =>
    e.default == null || (e.default.foldable && e.default.eval() == null)
I think e.default == null || e.default.foldable should be good enough. As long as the default expression does not reference any columns, we can evaluate it in def prepare.
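For context, a foldable default need not be a Literal node itself; a hypothetical example (not from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StringType

// Cast(Literal(1), StringType) references no columns, so it is foldable and
// could in principle be evaluated in prepare(), yet it is not itself a Literal,
// so a strict isInstanceOf[Literal] check sends it to the per-row path instead.
val foldedDefault = Cast(Literal(1), StringType)
assert(foldedDefault.foldable && !foldedDefault.isInstanceOf[Literal])
```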
@cloud-fan @beliefer Thank you for the suggestions! I changed the condition to e.default == null || e.default.isInstanceOf[Literal], because e.default == null || e.default.foldable caused an error when running the KafkaMicroBatchV2SourceWithConsumerSuite test.
@xin-aurora Do you still remember what the foldable (but not literal) expression was that failed the test?
@cloud-fan I changed the code to e.default == null || e.default.foldable and updated the repository to the latest branch. The KafkaMicroBatchV2SourceWithConsumerSuite test no longer produces the error.
I am not sure which specific foldable expression caused the test to fail. The previous error was related to "Query with Trigger.AvailableNow". Here is part of the error message:
[info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
[info] java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow.
[info] org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-40, 1] metadata not propagated after timeout
...
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:614)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:379)
[info] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:192)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:378)
[info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:351)
[info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:348)
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
[info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:880)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
[info] at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
...
OK, it's probably a flaky test.
LGTM, can we update …
I found both … I tried … We could fix this potential bug when …
What changes were proposed in this pull request?
The current implementations of prepare in OffsetWindowFunctionFrameBase and of write in FrameLessOffsetWindowFunctionFrame caused the LEAD and LAG functions to throw a NullPointerException when the default value is not a Literal and the offset exceeds the window group size.
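A hedged reproduction of the failure mode, assuming a SparkSession named spark with its implicits in scope; the table, column names, and values are illustrative only:

```scala
// Offset 5 exceeds every group's size, and the default (value * 2) is a
// column reference rather than a Literal: the combination that used to hit
// the NullPointerException described above.
val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")
df.createOrReplaceTempView("t")

spark.sql(
  """SELECT key, value,
    |       lag(value, 5, value * 2) OVER (PARTITION BY key ORDER BY value) AS prev
    |FROM t""".stripMargin).show()
```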
This PR introduces a boolean val onlyLiteralNulls and modifies prepare and write. onlyLiteralNulls indicates whether the default values are Literal values. In prepare, first check onlyLiteralNulls; if the default value is a Literal, call fillDefaultValue(EmptyRow). In write, if onlyLiteralNulls is false, the default value must be non-literal, so call fillDefaultValue(current).
Why are the changes needed?
Fix LEAD and LAG causing a NullPointerException in the window function (SPARK-51757).
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a test method, test("lead/lag with column reference as default when offset exceeds window group size"), in org.apache.spark.sql.DataFrameWindowFramesSuite.
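A hedged sketch of what such a test could look like, assuming the usual QueryTest / SharedSparkSession helpers (toDF implicits, checkAnswer, Row); the data and expected values are illustrative, not the PR's actual test body:

```scala
test("lead/lag with column reference as default when offset exceeds window group size") {
  val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")

  // The offset 5 is larger than every window group, so the non-literal
  // default (value * 10) must be evaluated against each current row.
  val result = df.selectExpr(
    "key",
    "lag(value, 5, value * 10) OVER (PARTITION BY key ORDER BY value) AS lag_val")

  checkAnswer(result, Seq(Row("a", 10), Row("a", 20), Row("b", 30)))
}
```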
Was this patch authored or co-authored using generative AI tooling?
No.