forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreads_overloads.jl
66 lines (56 loc) · 2.61 KB
/
threads_overloads.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# This file is a part of Julia. License is MIT: https://julialang.org/license
"""
    Threads.foreach(f, channel::Channel;
                    schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                    ntasks=Threads.threadpoolsize())

Similar to `foreach(f, channel)`, but iteration over `channel` and calls to
`f` are split across `ntasks` tasks spawned by `Threads.@spawn`. This function
will wait for all internally spawned tasks to complete before returning.

If `schedule isa FairSchedule`, `Threads.foreach` will attempt to spawn tasks in a
manner that enables Julia's scheduler to more freely load-balance work items across
threads. This approach generally has higher per-item overhead, but may perform
better than `StaticSchedule` in concurrence with other multithreaded workloads.

If `schedule isa StaticSchedule`, `Threads.foreach` will spawn tasks in a manner
that incurs lower per-item overhead than `FairSchedule`, but is less amenable
to load-balancing. This approach thus may be more suitable for fine-grained,
uniform workloads, but may perform worse than `FairSchedule` in concurrence
with other multithreaded workloads.

# Examples
```julia-repl
julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
```

!!! compat "Julia 1.6"
    This function requires Julia 1.6 or later.
"""
function Threads.foreach(f, channel::Channel;
                         schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                         ntasks=Threads.threadpoolsize())
    # Resolve the per-item call strategy once, from the requested schedule.
    call_item = _apply_for_schedule(schedule)
    # Shared flag: set by the first worker that fails so the others wind down.
    failed = Threads.Atomic{Bool}(false)
    @sync begin
        for _ in 1:ntasks
            Threads.@spawn try
                for item in channel
                    $call_item(f, item)
                    # The flag is checked only after `f(item)` has run, so that an
                    # item already taken from the channel is not lost. This is not
                    # fully comprehensive: a task can still be blocked in `take!`
                    # at `for item in channel`. A more robust mechanism to avoid
                    # dropping items is still an open question. See also
                    # https://github.com/JuliaLang/julia/pull/34543#discussion_r422695217
                    if failed[]
                        break
                    end
                end
            catch
                # Tell the sibling workers to stop, then let the error propagate.
                failed[] = true
                rethrow()
            end
        end
    end
    return nothing
end
# Static schedule: the returned callable runs `f(x)` directly in the calling task
# and returns whatever `f` returns.
function _apply_for_schedule(::Threads.StaticSchedule)
    call_inline(f, x) = f(x)
    return call_inline
end
# Fair schedule: the returned callable runs `f(x)` in a freshly spawned task and
# blocks until that task has finished; its result is whatever `wait` returns.
function _apply_for_schedule(::Threads.FairSchedule)
    function call_in_new_task(f, x)
        task = Threads.@spawn f(x)
        return wait(task)
    end
    return call_in_new_task
end