Batch Channel Operations #56473

Open · wants to merge 41 commits into base: master

Conversation

@mrufsvold (Contributor) commented Nov 6, 2024

Add append!(ch, iter) and take!(ch, n) functions that operate on channels in batches. When using a buffered channel, this significantly reduces the overhead of acquiring the lock for each individual put!/take!.
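
For reference, the proposed API can be used roughly like this (a usage sketch based on the description above; the signatures are the ones added in this PR):

ch = Channel{Int}(100)

# Producer side: push a whole collection under a single lock acquisition.
append!(ch, 1:50)

# Consumer side: pull up to 50 elements at once; if the channel closes before
# 50 elements are available, the returned vector is shorter.
batch = take!(ch, 50)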

Here is some benchmarking code:

function bench_basic(item_n, buffer_len)
    items = collect(1:item_n)
    ch = Channel{Int}(buffer_len)
    task_n = Threads.nthreads()
    res = Vector{Int}(undef, item_n * task_n)

    for _ in 1:task_n
        Threads.@spawn begin
            for j in items
                put!(ch, j)
            end
        end
    end

    @sync for i in Base.OneTo(task_n)
        Threads.@spawn let offset = (i - 1) * item_n
            for j in Base.OneTo(item_n)
                x = take!(ch)
                res[offset+j] = x
            end
        end
    end
    res
end


function bench_batch(item_n, buffer_len, batch_size=buffer_len > 0 ? buffer_len : 100)
    items = collect(1:item_n)
    ch = Channel{Int}(buffer_len)
    task_n = Threads.nthreads()
    res = Vector{Int}(undef, item_n * task_n)

    for _ in 1:task_n
        Threads.@spawn begin
            i = 1
            while i <= item_n
                chunk = @view items[i:min(i + batch_size - 1, item_n)]
                append!(ch, chunk)
                i += batch_size
            end
        end
    end

    @sync for i in Base.OneTo(task_n)
        Threads.@spawn let offset = (i - 1) * item_n
            buff = Vector{Int}(undef, batch_size)
            batch = take!(ch, batch_size, buff)
            batch_len = length(batch)
            batch_i = 1
            for j in Base.OneTo(item_n)
                if batch_i > batch_len
                    batch = take!(ch, batch_size, buff)
                    batch_i = 1
                    batch_len = length(batch)
                end
                x = batch[batch_i]
                res[offset+j] = x
                batch_i += 1
            end
        end

    end
    res
end

Here are results at different buffer sizes:

julia> using Chairmarks

julia> @be bench_basic(10000, 0)
Benchmark: 1 sample with 1 evaluation
        129.346 ms (76015 allocs: 1.851 MiB)

julia> @be bench_batch(10000, 0)
Benchmark: 1 sample with 1 evaluation
        128.819 ms (76031 allocs: 1.858 MiB)

julia> GC.gc()

julia> @be bench_basic(10000, 10)
Benchmark: 2 samples with 1 evaluation
        71.047 ms (104 allocs: 709.797 KiB)
        110.748 ms (104 allocs: 709.797 KiB)

julia> @be bench_batch(10000, 10)
Benchmark: 4 samples with 1 evaluation
        23.534 ms (119 allocs: 710.703 KiB)
        24.859 ms (119 allocs: 710.703 KiB)
        25.958 ms (119 allocs: 710.703 KiB)
        29.972 ms (119 allocs: 710.703 KiB)

julia> GC.gc()

julia> @be bench_basic(10000, 100)
Benchmark: 3 samples with 1 evaluation
        34.429 ms (107 allocs: 716.844 KiB)
        37.284 ms (107 allocs: 716.844 KiB)
        37.844 ms (107 allocs: 716.844 KiB)

julia> @be bench_batch(10000, 100)
Benchmark: 21 samples with 1 evaluation
 min    3.267 ms (119 allocs: 717.594 KiB)
 median 3.525 ms (119 allocs: 717.594 KiB)
 mean   4.724 ms (119 allocs: 717.594 KiB)
 max    15.609 ms (119 allocs: 717.594 KiB)

julia> GC.gc()

julia> @be bench_basic(10000, 1000)
Benchmark: 2 samples with 1 evaluation
        49.205 ms (111 allocs: 763.156 KiB)
        113.279 ms (111 allocs: 763.156 KiB)

julia> @be bench_batch(10000, 1000)
Benchmark: 44 samples with 1 evaluation
 min    1.658 ms (128 allocs: 780.312 KiB)
 median 1.889 ms (128 allocs: 780.312 KiB)
 mean   2.168 ms (128 allocs: 780.312 KiB)
 max    7.967 ms (128 allocs: 780.312 KiB)

@oscardssmith added the performance (Must go faster) and multithreading (Base.Threads and related functionality) labels Nov 6, 2024
base/channels.jl Outdated
function take!(c::Channel{T}, n::Integer) where {T}
    return _take(c, n, Vector{T}(undef, n))
end
function take!(c::Channel{T}, n::Integer, buffer::AbstractVector{T2}) where {T2,T<:T2}
@mrufsvold (Contributor, Author) commented Nov 6, 2024

I am limiting the buffer to AbstractVector because, if the channel closes, we may need to return fewer than n elements. In that case, we need to resize! the buffer, which precludes Memory or Matrix as buffer types. I'm not sure if there is a more general way to accept more buffer types here.

A reviewer (Contributor) commented:

AbstractVector is fine, but do note that Memory is also an AbstractVector.

@mrufsvold (Contributor, Author) replied:

Should I do a check to make sure the buffer is resize!able, then?
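
One possibility (a sketch only, not part of the PR; the helper name is hypothetical) would be to reject buffers that cannot be shrunk up front:

# Hypothetical guard: take!(c, n, buffer) must be able to shrink `buffer` when the
# channel closes before n elements arrive. Memory is an AbstractVector but has no
# resize! method, so `applicable` catches it (and similar fixed-size types) early.
function _check_resizable(buffer::AbstractVector)
    applicable(resize!, buffer, 0) ||
        throw(ArgumentError("buffer must support resize!; use a Vector instead"))
    return buffer
end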

@mrufsvold marked this pull request as ready for review November 6, 2024 22:22
@mrufsvold changed the title WIP: Batch Channel Operations → Batch Channel Operations Nov 6, 2024
@mrufsvold (Contributor, Author) left a comment

Leaving notes to myself about tweaks that need to be made.

base/channels.jl Outdated
Comment on lines 461 to 462
Append all items in `iter` to the channel `c`. If the channel is buffered, this operation requires
fewer `lock` operations than individual `put!`s. Blocks if the channel is full.
A reviewer (Contributor) commented:

It would be nice to mention: if there is room for 3 more elements and you try to append 4, will it block before adding any elements, or after adding the first 3?

@mrufsvold (Contributor, Author) replied:

I'm not sure I understand the question. The append! call will not return until all elements in iter have been added to the channel. Internally, it releases the lock on the channel when the buffer is full, so that take! calls can run and make more room.

I expanded this sentence a bit. Can you see if it adds the clarity you're looking for?
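
To illustrate the behavior described above (a sketch relying on the semantics stated in this thread, not on code in the diff):

ch = Channel{Int}(3)            # buffer has room for 3 elements
t = @async append!(ch, 1:4)     # puts 1, 2, 3, then blocks waiting for a free slot
take!(ch)                       # frees a slot, allowing the pending 4 to be put
wait(t)                         # append! returns only once all 4 elements are in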

@mrufsvold requested a review from jakobnissen November 7, 2024 16:11
end
end

collect(c::Channel) = take!(c, typemax(UInt))
@mrufsvold (Contributor, Author) commented:

Currently, collect(::Channel) falls back to a generic method which just iterates over the channel. I added a custom method for collect that uses take!(ch,n) since we can benefit from maintaining the lock.

However, this would change the behavior of parallel collect calls on the same channel. I think multiple collect calls would be ill-advised anyway, but I did want to highlight that this would technically be a subtle change.
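
For example, under the proposed method (a sketch; it relies on take!(c, n) returning whatever is available once the channel closes):

ch = Channel{Int}(100)
@async begin
    append!(ch, 1:10)
    close(ch)
end
collect(ch)    # returns [1, 2, ..., 10], drained via take!(ch, typemax(UInt))
               # rather than one take! per element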

end
end

function _positive_int(x::T, purpose::String) where {T<:Union{AbstractFloat, Integer}}
@mrufsvold (Contributor, Author) commented:

Currently, calling Channel(1.5) throws an InexactError trying to convert to Int. However, the ArgumentError text already captures the real issue (size must be 0, a positive int, or Inf), so I think the additional error type is unnecessary and a bit confusing in the stacktrace (it doesn't point to the issue being with Channel).

So I wrote this check, and I'm using it to make sure n is always positive. If people agree with this design choice, it could be extended to the Channel constructor in the future.
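
For context, a validation helper along these lines might look as follows (a hypothetical sketch, not the exact code in the diff):

# Hypothetical sketch: accept only positive integer values and raise an ArgumentError
# that names the argument's purpose, instead of letting Int(x) surface a bare
# InexactError from deep inside the call.
function _positive_int(x::T, purpose::String) where {T<:Union{AbstractFloat, Integer}}
    (isinteger(x) && x > 0) ||
        throw(ArgumentError(lazy"$purpose must be a positive integer, got $x"))
    return Int(x)
end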

@mrufsvold (Contributor, Author) commented Nov 7, 2024

I used the lazy string macro on line 220, and it looks like that's causing an error during the tests, maybe because the tests are run on Julia v1.6?

I'm happy to remove the macro, but I thought it was best practice (especially in a world where 1.10 is the LTS).
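
For reference, the pattern in question looks like this (the function names are hypothetical; lazy"..." requires Julia 1.8 or newer):

# Plain interpolation builds the error message eagerly when the throw runs:
check_eager(n) = n > 0 || throw(ArgumentError("n must be positive, got $n"))

# lazy"..." wraps it in a LazyString, deferring the interpolation work until the
# message is actually printed:
check_lazy(n) = n > 0 || throw(ArgumentError(lazy"n must be positive, got $n"))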

@mrufsvold (Contributor, Author) commented:

Tagging @vtjnash because it looks like you did work on ensuring correctness for channel iteration a while back, so you might have some insights about edge cases I haven't tested for.

@mrufsvold (Contributor, Author) commented:

Per @jakobnissen's suggestion, I benchmarked my current solution against a much simpler implementation to make sure the extra code complexity is worth it. I was surprised to discover that my implementation does not consistently outperform a naive lock, loop over put!/take!, unlock approach. I'm going to remove the extra complexity until I can diagnose why.
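
For context, the naive alternative referred to here is roughly the following (a sketch, not the PR's actual code; it assumes the channel's internal lock is reentrant, so the nested put! calls can re-acquire it):

# Rough sketch of the simpler approach: grab the channel lock once, then loop over
# ordinary put! calls. Each put! re-acquires the (reentrant) lock cheaply, and its
# internal condition wait still releases the lock when the buffer is full.
function naive_append!(c::Channel, iter)
    lock(c)
    try
        for x in iter
            put!(c, x)
        end
    finally
        unlock(c)
    end
    return c
end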

Labels: multithreading (Base.Threads and related functionality), performance (Must go faster)