Batch Channel Operations #56473
This reverts commit 2ced667.
remove collect def Co-authored-by: Lilith Orion Hafner <lilithhafner@gmail.com>
base/channels.jl
Outdated
function take!(c::Channel{T}, n::Integer) where {T}
    return _take(c, n, Vector{T}(undef, n))
end
function take!(c::Channel{T}, n::Integer, buffer::AbstractVector{T2}) where {T2,T<:T2}
I am limiting the buffer to `AbstractVector` because, if the channel closes, we need to return fewer than `n` elements. In that case, we need to resize the buffer. So that precludes `Memory` or `Matrix` as buffer types. I'm not sure if there is a more general way to accept more buffer types here.
`AbstractVector` is fine, but do note that `Memory` is also an `AbstractVector`.
Should I do a check to make sure the buffer is `resize!`-able, then?
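For illustration, the shrink-on-close behavior discussed in this thread can be sketched with only the existing public API. `take_batch!` is a hypothetical name, not the PR's `_take`, and it restricts the buffer to `Vector` on the assumption that only a `resize!`-able container is acceptable.

```julia
# Hypothetical helper (not the PR's implementation): take up to `n` items,
# shrinking the buffer if the channel closes before `n` items arrive.
function take_batch!(c::Channel{T}, n::Integer, buffer::Vector{T}) where {T}
    resize!(buffer, n)
    filled = 0
    while filled < n
        next = iterate(c)           # `nothing` once the channel is closed and drained
        next === nothing && break
        filled += 1
        buffer[filled] = first(next)
    end
    return resize!(buffer, filled)  # drop the unused tail
end

# Convenience method that allocates its own buffer.
take_batch!(c::Channel{T}, n::Integer) where {T} = take_batch!(c, n, Vector{T}(undef, 0))

ch = Channel{Int}(10)
foreach(i -> put!(ch, i), 1:3)
close(ch)
batch = take_batch!(ch, 5)  # only 3 items were available before the close
```

Unlike the proposed method, this sketch takes the lock once per item, so it shows the semantics only, not the performance benefit.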
Leaving notes to myself about tweaks that need to be made.
base/channels.jl
Outdated
Append all items in `iter` to the channel `c`. If the channel is buffered, this operation requires
fewer `lock` operations than individual `put!`s. Blocks if the channel is full.
Would be nice to mention: If there is room for 3 more elements and you try to append 4, will it block before adding any elements, or after adding the first 3?
I'm not sure I understand the question. The `append!` call will not return until all elements in `iter` are added to the channel. Internally, it releases the lock on the channel when the buffer is full so that `take!` calls can run to make more room.
I expanded this sentence a bit. Can you see if it adds the clarity you're looking for?
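The "room for 3, append 4" scenario can be tried out with a naive stand-in built from plain `put!` calls. `put_all!` is a hypothetical name used only for this sketch. It mirrors the semantics described above (the call returns only after every item is in the channel) but not the batched locking.

```julia
# Naive stand-in for the proposed batch `append!` (hypothetical name): one `put!` per item.
function put_all!(c::Channel, iter)
    for item in iter
        put!(c, item)   # blocks while the buffer is full
    end
    return c
end

ch = Channel{Int}(3)                  # room for 3 items
producer = @async put_all!(ch, 1:4)   # tries to add 4
yield()                               # give the producer a chance to fill the buffer
blocked = !istaskdone(producer)       # the 4th item cannot fit until something is taken
first_item = take!(ch)                # frees one slot
wait(producer)                        # now the producer can finish
rest = [take!(ch) for _ in 1:3]
```

In this stand-in the first three items are added before the producer blocks, and a single `take!` is enough to let it finish.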
    end
end

collect(c::Channel) = take!(c, typemax(UInt))
Currently, `collect(::Channel)` falls back to a generic method which just iterates over the channel. I added a custom method for `collect` that uses `take!(ch, n)` since we can benefit from maintaining the lock.
However, this would change the behavior of parallel `collect` calls on the same channel. I think multiple `collect` calls would be ill-advised anyway, but I did want to highlight that this would technically be a subtle change.
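To make the existing behavior concrete: with the generic fallback, concurrent `collect` calls each take items one at a time, so the items may be split between the callers. A small sketch of that current behavior follows. The exact split depends on task scheduling, so only the combined result is inspected.

```julia
ch = Channel{Int}(2)
t1 = @async collect(ch)   # both tasks iterate over the same channel
t2 = @async collect(ch)
for i in 1:10
    put!(ch, i)
end
close(ch)                 # ends iteration in both collectors
a, b = fetch(t1), fetch(t2)
combined = sort!(vcat(a, b))   # every item ends up in exactly one of the two results
```

A `collect` method that holds the lock for the whole batch could presumably hand more of the items, or all of them, to a single caller, which is the subtle change mentioned above.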
    end
end

function _positive_int(x::T, purpose::String) where {T<:Union{AbstractFloat, Integer}}
Currently, calling `Channel(1.5)` throws an `InexactError` while trying to convert to `Int`. However, the `ArgumentError` text already captures the real issue (size must be 0, a positive int, or `Inf`), so I think the additional error type is unnecessary and a bit confusing in the stacktrace (it doesn't point to the issue being with `Channel`).
So I wrote this check, and I'm using it to make sure `n` is always positive. If people agree with this design choice, it could be extended to the `Channel` constructor in the future.
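The body of `_positive_int` is not shown in this excerpt, so the following is only a guess at the kind of check described: reject anything that is not a positive whole number with an `ArgumentError` that names the argument. The function name `check_positive_int` and the message wording are assumptions.

```julia
# Hypothetical validation helper; the PR's actual `_positive_int` body may differ.
function check_positive_int(x::Union{AbstractFloat,Integer}, purpose::String)
    if !(isinteger(x) && x > 0)
        throw(ArgumentError("$purpose must be a positive integer, got $x"))
    end
    return Int(x)   # safe for integer-valued inputs within the range of Int
end

n_ok = check_positive_int(3.0, "Batch size")

# A fractional value is reported as an ArgumentError rather than an InexactError.
err = try
    check_positive_int(1.5, "Batch size")
catch e
    e
end
```

Values such as `Inf`, `0`, and negative numbers are rejected the same way in this sketch.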
I used the lazy string macro on line 220, and it looks like that's causing an error during the tests because the tests are using Julia v1.6, maybe? I'm happy to remove the macro, but I thought it was best practice (especially in a world where 1.10 is LTS).
Tagging @vtjnash because it looks like you did work on ensuring correctness for channel iteration a while back, so you might have some insights about edge cases I haven't tested for.
Per @jakobnissen's suggestion, I benchmarked my current solution against a much simpler implementation to ensure the code complexity is worth it. I am very surprised to discover that my implementation does not consistently outperform a naive lock, loop over put/take, unlock approach. I'm going to remove the extra complexity until I can diagnose why.
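A sketch of what the "naive" baseline might look like, assuming it means holding the channel lock around an ordinary `put!` or `take!` loop. The names `append_locked!` and `take_locked!` are hypothetical, and the sketch relies on `lock` and `unlock` having methods for `Channel`, which is treated here as an implementation detail of `base/channels.jl`.

```julia
# Hypothetical baseline: acquire the channel lock once, then loop over `put!`.
# The lock is reentrant, so the inner `put!` calls re-acquire it cheaply.
function append_locked!(c::Channel, iter)
    lock(c)
    try
        for item in iter
            put!(c, item)
        end
    finally
        unlock(c)
    end
    return c
end

# Hypothetical counterpart: take exactly `n` items under a single outer lock.
# If the channel closes early, `take!` throws; this sketch does not handle that.
function take_locked!(c::Channel{T}, n::Integer) where {T}
    out = Vector{T}(undef, 0)
    sizehint!(out, n)
    lock(c)
    try
        for _ in 1:n
            push!(out, take!(c))
        end
    finally
        unlock(c)
    end
    return out
end

ch = Channel{Int}(100)
append_locked!(ch, 1:50)
items = take_locked!(ch, 50)
```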
Add `append!(ch, iter)` and `take!(ch, n)` functions that operate on channels in batches. When using a buffered channel, this significantly reduces the overhead of acquiring the lock for each `put!`/`take!`.
Here is some benchmarking code:
Here are results at different buffer sizes: