Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky "race successful with wait" test #42

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions shared/src/test/scala/ListenerBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ListenerBehavior extends munit.FunSuite:
test("race successful without wait"):
val source1 = TSource()
val source2 = TSource()
assert(source1 != source2)
val listener = TestListener(1)
Async.race(source1, source2).onComplete(listener)

Expand Down Expand Up @@ -123,11 +124,15 @@ class ListenerBehavior extends munit.FunSuite:

Async.blocking:
val f1 = Future(source1.completeNowWith(1))
listener.waitWaiter()
listener.sleeping.await
listener.continue()
val f2 = Future(l2.completeNow(1, source2))
val f2 = Future:
val completed = l2.completeNow(1, source2)
if completed then source2.dropListener(l2) // usually the source will do it by default
completed
assert(f1.await || f2.await)
assert(!f1.await || !f2.await)
println(s"${f1.await} ${f2.await}")

assert(source1.listener.isEmpty)
assert(source2.listener.isEmpty)
Expand Down Expand Up @@ -163,7 +168,7 @@ class ListenerBehavior extends munit.FunSuite:

Async.blocking:
val f1 = Future(lockBoth(s1listener, other))
other.waitWaiter()
other.sleeping.await
assert(source2.listener.get.completeNow(1, source2))
other.continue()
assertEquals(f1.await, s1listener)
Expand Down Expand Up @@ -247,7 +252,13 @@ private class TestListener(expected: Int)(using asst: munit.Assertions) extends

private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, expected: Int)(using munit.Assertions)
extends TestListener(expected):
private var waiter: Option[Promise[Unit]] = None
// A promise that is waited for inside `lock` until `continue` is called.
private val waiter = if sleep.get() then Some(Promise[Unit]()) else None
// A promise that is resolved right before the lock starts waiting for `waiter`.
private val sleepPromise = Promise[Unit]()

/** A [[Future]] that resolves when the listener goes to sleep. */
val sleeping = sleepPromise.asFuture

def this(sleep: Boolean, fail: Boolean, expected: Int)(using munit.Assertions) =
this(AtomicBoolean(sleep), fail, expected)
Expand All @@ -257,18 +268,12 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean,
def acquire() =
if sleep.getAndSet(false) then
Async.blocking:
waiter = Some(Promise())
sleepPromise.complete(Success(()))
waiter.get.await
waiter.foreach: promise =>
promise.complete(Success(()))
waiter = None
if fail then false
else true
def release() = ()

def waitWaiter() =
while waiter.isEmpty do Thread.`yield`()

def continue() = waiter.get.complete(Success(()))

/** Dummy source that never completes */
Expand Down
Loading