Shuffle coalesce read supports split-and-retry #11598

Open · wants to merge 3 commits into branch-24.12
Conversation

@firestarman (Collaborator) commented Oct 12, 2024

Closes #11584

This PR adds split-and-retry support when reading buffers from shuffle for GpuShuffleCoalesceExec and the sized hash joins.

When an OOM happens, the retry reduces the target size by half, re-collects buffers from the cache up to the new target batch size, concatenates the collected buffers, and moves the concatenated result to the GPU. Since the concatenated buffer cannot be split, we have to re-collect the buffers and concatenate them again with the new, smaller target size.

The new target size is used by all the following reads in this task, so the more split-and-retries happen, the smaller the target size becomes. This is a deliberate choice: an OOM suggests the GPU is busy and under high memory pressure, and more OOMs would likely occur if we kept using the original target batch size.

The change is big, but the idea is quite simple:

Pull in shuffle buffers according to the target size on CPU
and cache them
     |
     |
    \ /
Take the first N buffers with total size up to the target_size   <---
     |                                                               |
     |                                                               |
    \ /                                                              |
Do the coalesce on CPU                                      (reduce the target_size by half)
     |                                                               |
     |                                                               |
    \ /                                                              |
Move the coalesced big buffer to GPU              ---- (GPU OOM) ---->
     |
     |
    \ /
Done, remove the first N buffers from the cache
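
In code terms, the loop looks roughly like the minimal, self-contained Scala sketch below. Every name in it (HostBuf, takeUpTo, concatAndCopy, GpuOOM, readOne) is a hypothetical stand-in for illustration only, not an actual class or API added by this PR:

```scala
object SplitRetrySketch {
  final case class HostBuf(sizeBytes: Long)        // a serialized shuffle buffer cached on the CPU
  final case class DeviceBatch(sizeBytes: Long)    // the coalesced batch after the copy to the GPU
  final class GpuOOM extends RuntimeException("GPU OOM")

  // Take the first N cached buffers whose total size stays within the target size.
  def takeUpTo(cache: Vector[HostBuf], target: Long): Vector[HostBuf] = {
    var total = 0L
    val picked = cache.takeWhile { b =>
      val fits = total + b.sizeBytes <= target
      if (fits) total += b.sizeBytes
      fits
    }
    if (picked.nonEmpty) picked else cache.take(1)  // always make progress with at least one buffer
  }

  // Stand-in for "concatenate on the CPU, then copy to the GPU"; the real copy may hit a GPU OOM.
  def concatAndCopy(bufs: Vector[HostBuf]): DeviceBatch =
    DeviceBatch(bufs.map(_.sizeBytes).sum)

  // One read: on a GPU OOM, halve the target size and redo the collect + concatenate + copy sequence.
  def readOne(cache: Vector[HostBuf], targetSize: Long): (DeviceBatch, Vector[HostBuf], Long) = {
    val picked = takeUpTo(cache, targetSize)
    try {
      val batch = concatAndCopy(picked)
      (batch, cache.drop(picked.size), targetSize)  // drop consumed buffers, keep the (possibly reduced) target
    } catch {
      case _: GpuOOM => readOne(cache, math.max(targetSize / 2, 1L))
    }
  }
}
```

The important property is that the reduced target size is carried forward, so later reads in the same task also use it.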

This PR also includes some code refactoring in the shuffled hash join for the coming Kudo support (see #11590). It moves all the code handling serialized tables into GpuShuffleCoalesceExec for easier maintenance.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator, Author) commented:

build

@sameerz added the "feature request (New feature or request)" label on Oct 13, 2024
@revans2 (Collaborator) left a comment:
This is a lot of code, and I have not even really started to dig into it. I am concerned about how much is being changed here. From the description it looks like someone has misconfigured the target batch size, so this is going to try and adjust that target size, but just for shuffle coalesce? I am really confused about the goals of this and would like to see examples of when this helps and what the performance impact is to the happy path.

I also am concerned that we are just masking other underlying problems. The spill framework should be set up so that if we hit a point where there is nothing more to spill, then we start to pause other threads. So instead of adjusting the target batch size we adjust the parallelism dynamically. So did the user set the target batch size so high that a single thread would not be able to process a single task? If so, then we need to work with them to set reasonable target batch sizes. If the size looks reasonable, then we need to understand what is using all of the GPU's memory that makes it so that we cannot get enough to copy a single batch to the GPU? We might have some memory leaks somewhere else that this is trying to mask.

    metricsMap: Map[String, GpuMetric],
    prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = {
  if (readOption.useSplitRetryRead) {
    val reader = new GpuShuffleCoalesceReader(iter, targetSize, dataTypes, metricsMap)
@revans2 (Collaborator) commented on this code:
I am confused why we want to put the coalesce on the GPU if we want to try and split and retry the read? We have it on the CPU to speed up the processing, but the new default puts it on the GPU. Do you have some performance numbers to show the impact of this change?

@firestarman (Collaborator, Author) commented Oct 15, 2024:

Thanks for the review. The coalesce is still on the CPU. The new GpuShuffleCoalesceReader is just a combination of HostShuffleCoalesceIterator and GpuShuffleCoalesceIterator, plus the additional split-retry support when moving the coalesced buffer to the GPU. To know whether a GPU OOM happens for split-retry, we have to put all the coalesce steps together in this new reader, so the reader can replace the iterators wherever HostShuffleCoalesceIterator and GpuShuffleCoalesceIterator are used together. So far it is used only by the sized hash joins and the coalesce shuffle exec.

Split-retry here does not mean we split the coalesced buffer. Instead we retry the entire coalesce process with a smaller target size, that is, re-collecting fewer shuffle buffers (total size <= target_size/2) from the cache, concatenating them, and trying to move them to the GPU again. The new coalesced buffer is about half the size of the original one, so it requires less GPU memory when being copied to the GPU.
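
To illustrate why both stages have to live in one component, here is a toy Scala sketch; the traits and class below are hypothetical stand-ins, not the real HostShuffleCoalesceIterator, GpuShuffleCoalesceIterator, or GpuShuffleCoalesceReader signatures:

```scala
// Hypothetical stand-ins for the two coalesce stages.
trait HostCoalesceStage { def concatNext(maxBytes: Long): Array[Byte] } // CPU-side concatenate
trait DeviceCopyStage   { def toDevice(host: Array[Byte]): Long }       // host-to-device copy
final class GpuCopyOOM extends RuntimeException("GPU OOM during copy")

// One reader owning both steps, so a GPU OOM during the copy can restart the
// CPU-side collect/concatenate with a halved target size instead of failing the task.
final class CombinedReader(host: HostCoalesceStage, device: DeviceCopyStage, initialTarget: Long) {
  private var target = initialTarget             // remembered across reads in the task
  def next(): Long = {
    val concatenated = host.concatNext(target)   // coalesce on the CPU, bounded by the current target
    try device.toDevice(concatenated)            // may throw GpuCopyOOM
    catch {
      case _: GpuCopyOOM =>
        target = math.max(target / 2, 1L)        // halve the target and redo both steps
        next()
    }
  }
}
```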

The change is big but the idea is quite simple; the flow is the same as the diagram in the PR description above.


@firestarman (Collaborator, Author) commented Oct 15, 2024:

> This is a lot of code, and I have not even really started to dig into it. I am concerned about how much is being changed here. From the description it looks like someone has misconfigured the target batch size, so this is going to try and adjust that target size, but just for shuffle coalesce? I am really confused about the goals of this and would like to see examples of when this helps and what the performance impact is to the happy path.

The change is big, but the whole feature can be disabled by simply setting spark.rapids.shuffle.splitRetryRead.enabled to false, in which case nothing changes. All of the change in GpuShuffledHashJoinExec.scala is pure code refactoring, since this new reader does not apply to the optimized case in the normal GPU shuffled hash join.
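
For instance, the read path could be switched off per session like this (a minimal sketch using the standard Spark configuration API; only the config key itself comes from this PR):

```scala
import org.apache.spark.sql.SparkSession

// Build a session with the split-and-retry shuffle read disabled.
val spark = SparkSession.builder()
  .appName("rapids-split-retry-disabled")
  .config("spark.rapids.shuffle.splitRetryRead.enabled", "false")
  .getOrCreate()
```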

Yes, this adjusts the target size only for the shuffle coalesce. The whole task is affected only when no other operator in the task concatenates or splits the batches for its output.

This is mainly to improve stability. It can improve performance only when the cost of this split-retry is less than a Spark task retry, and the task retry would really have happened without this change.

I remember the target size was well tuned; changing it will lead to longer operation time for some operators in the query. @winningsix may know more details.

> If the size looks reasonable, then we need to understand what is using all of the GPU's memory that makes it so that we cannot get enough to copy a single batch to the GPU? We might have some memory leaks somewhere else that this is trying to mask.

We will try to collect more details when this OOM happens again.

Linked issue: [FEA] GPU Shuffle coalesce read supports split-and-retry (#11584)