diff --git a/base/condition.jl b/base/condition.jl index 90c53b7ad310d..fd771c9be346a 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -125,104 +125,20 @@ proceeding. """ function wait end -# wait with timeout -# -# The behavior of wait changes if a timeout is specified. There are -# three concurrent entities that can interact: -# 1. Task W: the task that calls wait w/timeout. -# 2. Task T: the task created to handle a timeout. -# 3. Task N: the task that notifies the Condition being waited on. -# -# Typical flow: -# - W enters the Condition's wait queue. -# - W creates T and stops running (calls wait()). -# - T, when scheduled, waits on a Timer. -# - Two common outcomes: -# - N notifies the Condition. -# - W starts running, closes the Timer, sets waiter_left and returns -# the notify'ed value. -# - The closed Timer throws an EOFError to T which simply ends. -# - The Timer expires. -# - T starts running and locks the Condition. -# - T confirms that waiter_left is unset and that W is still in the -# Condition's wait queue; it then removes W from the wait queue, -# sets dosched to true and unlocks the Condition. -# - If dosched is true, T schedules W with the special :timed_out -# value. -# - T ends. -# - W runs and returns :timed_out. -# -# Some possible interleavings: -# - N notifies the Condition but the Timer expires and T starts running -# before W: -# - W closing the expired Timer is benign. -# - T will find that W is no longer in the Condition's wait queue -# (which is protected by a lock) and will not schedule W. -# - N notifies the Condition; W runs and calls wait on the Condition -# again before the Timer expires: -# - W sets waiter_left before leaving. When T runs, it will find that -# waiter_left is set and will not schedule W. -# -# The lock on the Condition's wait queue and waiter_left together -# ensure proper synchronization and behavior of the tasks involved. - """ - wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + wait(c::GenericCondition; first::Bool=false) Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. If the keyword `first` is set to `true`, the waiter will be put _first_ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. - -If `timeout` is specified, cancel the `wait` when it expires and return -`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1 -millisecond. """ -function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) - timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond")) - +function wait(c::GenericCondition; first::Bool=false) ct = current_task() _wait2(c, ct, first) token = unlockall(c.lock) - - timer::Union{Timer, Nothing} = nothing - waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing - if timeout > 0.0 - timer = Timer(timeout) - waiter_left = Threads.Atomic{Bool}(false) - # start a task to wait on the timer - t = Task() do - try - wait(timer) - catch e - # if the timer was closed, the waiting task has been scheduled; do nothing - e isa EOFError && return - end - dosched = false - lock(c.lock) - # Confirm that the waiting task is still in the wait queue and remove it. If - # the task is not in the wait queue, it must have been notified already so we - # don't do anything here. - if !waiter_left[] && ct.queue == c.waitq - dosched = true - Base.list_deletefirst!(c.waitq, ct) - end - unlock(c.lock) - # send the waiting task a timeout - dosched && schedule(ct, :timed_out) - end - t.sticky = false - Threads._spawn_set_thrpool(t, :interactive) - schedule(t) - end - try - res = wait() - if timer !== nothing - close(timer) - waiter_left[] = true - end - return res + return wait() catch q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() diff --git a/base/experimental.jl b/base/experimental.jl index 17871b4f346d6..e35e920298c3d 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -10,6 +10,7 @@ module Experimental using Base: Threads, sync_varname, is_function_def, @propagate_inbounds +using Base: GenericCondition using Base.Meta """ @@ -577,4 +578,112 @@ function task_wall_time_ns(t::Task=current_task()) return end_at - start_at end +# wait_with_timeout +# +# A version of `wait(c::Condition)` that additionally allows the +# specification of a timeout. This is experimental as it will likely +# be dropped when a cancellation framework is added. +# +# The parallel behavior of wait_with_timeout is specified here. There +# are three concurrent entities that can interact: +# 1. Task W: the task that calls wait_with_timeout. +# 2. Task T: the task created to handle a timeout. +# 3. Task N: the task that notifies the Condition being waited on. +# +# Typical flow: +# - W enters the Condition's wait queue. +# - W creates T and stops running (calls wait()). +# - T, when scheduled, waits on a Timer. +# - Two common outcomes: +# - N notifies the Condition. +# - W starts running, closes the Timer, sets waiter_left and returns +# the notify'ed value. +# - The closed Timer throws an EOFError to T which simply ends. +# - The Timer expires. +# - T starts running and locks the Condition. +# - T confirms that waiter_left is unset and that W is still in the +# Condition's wait queue; it then removes W from the wait queue, +# sets dosched to true and unlocks the Condition. +# - If dosched is true, T schedules W with the special :timed_out +# value. +# - T ends. +# - W runs and returns :timed_out. +# +# Some possible interleavings: +# - N notifies the Condition but the Timer expires and T starts running +# before W: +# - W closing the expired Timer is benign. +# - T will find that W is no longer in the Condition's wait queue +# (which is protected by a lock) and will not schedule W. +# - N notifies the Condition; W runs and calls wait on the Condition +# again before the Timer expires: +# - W sets waiter_left before leaving. When T runs, it will find that +# waiter_left is set and will not schedule W. +# +# The lock on the Condition's wait queue and waiter_left together +# ensure proper synchronization and behavior of the tasks involved. + +""" + wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + +Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. + +If the keyword `first` is set to `true`, the waiter will be put _first_ +in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. + +If `timeout` is specified, cancel the `wait` when it expires and return +`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1 +millisecond. +""" +function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + ct = current_task() + Base._wait2(c, ct, first) + token = Base.unlockall(c.lock) + + timer::Union{Timer, Nothing} = nothing + waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing + if timeout > 0.0 + timer = Timer(timeout) + waiter_left = Threads.Atomic{Bool}(false) + # start a task to wait on the timer + t = Task() do + try + wait(timer) + catch e + # if the timer was closed, the waiting task has been scheduled; do nothing + e isa EOFError && return + end + dosched = false + lock(c.lock) + # Confirm that the waiting task is still in the wait queue and remove it. If + # the task is not in the wait queue, it must have been notified already so we + # don't do anything here. + if !waiter_left[] && ct.queue == c.waitq + dosched = true + Base.list_deletefirst!(c.waitq, ct) + end + unlock(c.lock) + # send the waiting task a timeout + dosched && schedule(ct, :timed_out) + end + t.sticky = false + Threads._spawn_set_thrpool(t, :interactive) + schedule(t) + end + + try + res = wait() + if timer !== nothing + close(timer) + waiter_left[] = true + end + return res + catch + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) + rethrow() + finally + Base.relockall(c.lock, token) + end +end + end # module diff --git a/test/channels.jl b/test/channels.jl index 6e74a2079234c..d654bc63be586 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -40,16 +40,15 @@ end @test fetch(t) == "finished" end -@testset "timed wait on Condition" begin +@testset "wait_with_timeout on Condition" begin a = Threads.Condition() - @test_throws ArgumentError @lock a wait(a; timeout=0.0005) - @test @lock a wait(a; timeout=0.1)==:timed_out + @test @lock a Experimental.wait_with_timeout(a; timeout=0.1)==:timed_out lock(a) @spawn begin @lock a notify(a) end @test try - wait(a; timeout=2) + Experimental.wait_with_timeout(a; timeout=2) true finally unlock(a)