From b5e5b1d1fce53a802f71c73bdb42522f67eba520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20M=C3=BCller?= Date: Mon, 1 Jan 2024 16:59:26 +0100 Subject: [PATCH 1/6] Use interruptGuard in VThreadScheduler --- jvm/src/main/scala/async/VThreadSupport.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index a547332e..c646f4c9 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -3,6 +3,7 @@ package gears.async import scala.annotation.unchecked.uncheckedVariance import java.util.concurrent.locks.ReentrantLock import scala.concurrent.duration.FiniteDuration +import java.util.concurrent.atomic.AtomicBoolean object VThreadScheduler extends Scheduler: private val VTFactory = Thread @@ -13,11 +14,14 @@ object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = VTFactory.newThread(body) override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = + val interruptGuard = AtomicBoolean(true) // to avoid interrupting the body + val th = VTFactory.newThread: () => Thread.sleep(delay.toMillis) - execute(body) + if interruptGuard.getAndSet(false) then body.run() th.start() - () => th.interrupt() + + () => if interruptGuard.getAndSet(false) then th.interrupt() object VThreadSupport extends AsyncSupport: From bdd4f70c957aad27cb3aac90a1b7be34f0515136 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 22 Jan 2024 15:27:38 +0100 Subject: [PATCH 2/6] Add scheduler's Schedule tests re: cancellation --- shared/src/test/scala/SchedulerBehavior.scala | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 shared/src/test/scala/SchedulerBehavior.scala diff --git a/shared/src/test/scala/SchedulerBehavior.scala b/shared/src/test/scala/SchedulerBehavior.scala new file mode 100644 index 00000000..9f9a98b8 --- /dev/null +++ b/shared/src/test/scala/SchedulerBehavior.scala @@ -0,0 +1,42 @@ +import gears.async.{Async, Future, Listener} +import gears.async.AsyncOperations.* +import gears.async.default.given +import concurrent.duration.DurationInt +import gears.async.Future.Promise +import scala.util.Success + +class SchedulerBehavior extends munit.FunSuite { + test("schedule cancellation works") { + Async.blocking: + var bodyRan = false + val cancellable = Async.current.scheduler.schedule(1.seconds, () => bodyRan = true) + + // cancel immediately + cancellable.cancel() + + sleep(1000) + assert(!bodyRan) + } + + test("schedule cancellation doesn't abort inner code") { + Async.blocking: + var bodyRan = false + val fut = Promise[Unit]() + val cancellable = Async.current.scheduler.schedule( + 50.milliseconds, + () => + fut.complete(Success(())) + Async.blocking: + sleep(500) + bodyRan = true + ) + + // cancel after body started running + fut.await + cancellable.cancel() + + sleep(1000) + + assert(bodyRan) + } +} From 77d05faac2abfdd37bb8e13675f3b63af7b53c8a Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 22 Jan 2024 15:27:51 +0100 Subject: [PATCH 3/6] Attempt idea with VarHandle --- jvm/src/main/scala/async/VThreadSupport.scala | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index c646f4c9..0859b298 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -4,6 +4,7 @@ import scala.annotation.unchecked.uncheckedVariance import java.util.concurrent.locks.ReentrantLock import scala.concurrent.duration.FiniteDuration import java.util.concurrent.atomic.AtomicBoolean +import java.lang.invoke.{VarHandle, MethodHandles} object VThreadScheduler extends Scheduler: private val VTFactory = Thread @@ -13,15 +14,27 @@ object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = VTFactory.newThread(body) - override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = - val interruptGuard = AtomicBoolean(true) // to avoid interrupting the body + override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduleRunner(delay, body) + + private class ScheduleRunner(val delay: FiniteDuration, val body: Runnable) extends Cancellable { + var interruptGuard = true // to avoid interrupting the body val th = VTFactory.newThread: () => - Thread.sleep(delay.toMillis) - if interruptGuard.getAndSet(false) then body.run() + try Thread.sleep(delay.toMillis) + catch case e: InterruptedException => () /* we got cancelled, don't propagate */ + if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then body.run() th.start() - () => if interruptGuard.getAndSet(false) then th.interrupt() + override def cancel(): Unit = + if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then th.interrupt() + } + + private object ScheduleRunner: + val interruptGuardVar = + MethodHandles + .lookup() + .in(classOf[ScheduleRunner]) + .findVarHandle(classOf[ScheduleRunner], "interruptGuard", classOf[Boolean]) object VThreadSupport extends AsyncSupport: From 3f54921cdf14af661d24d5427db4e0974e0a63ba Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 22 Jan 2024 15:28:41 +0100 Subject: [PATCH 4/6] Remove AtomicBoolean import --- jvm/src/main/scala/async/VThreadSupport.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 0859b298..5c9f8238 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -3,7 +3,6 @@ package gears.async import scala.annotation.unchecked.uncheckedVariance import java.util.concurrent.locks.ReentrantLock import scala.concurrent.duration.FiniteDuration -import java.util.concurrent.atomic.AtomicBoolean import java.lang.invoke.{VarHandle, MethodHandles} object VThreadScheduler extends Scheduler: From 7cf0a670271221634a90f114fd0953538df158d6 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 22 Jan 2024 22:33:12 +0100 Subject: [PATCH 5/6] Address review comments --- jvm/src/main/scala/async/VThreadSupport.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 5c9f8238..e513494e 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -16,7 +16,7 @@ object VThreadScheduler extends Scheduler: override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduleRunner(delay, body) private class ScheduleRunner(val delay: FiniteDuration, val body: Runnable) extends Cancellable { - var interruptGuard = true // to avoid interrupting the body + @volatile var interruptGuard = true // to avoid interrupting the body val th = VTFactory.newThread: () => try Thread.sleep(delay.toMillis) @@ -24,7 +24,7 @@ object VThreadScheduler extends Scheduler: if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then body.run() th.start() - override def cancel(): Unit = + final override def cancel(): Unit = if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then th.interrupt() } From 4e2e4371de4c290882b0ad9abf52573a979f19e7 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 22 Jan 2024 22:34:29 +0100 Subject: [PATCH 6/6] Rename ScheduleRunner -> ScheduledRunnable --- jvm/src/main/scala/async/VThreadSupport.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index e513494e..83153146 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -13,27 +13,27 @@ object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = VTFactory.newThread(body) - override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduleRunner(delay, body) + override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduledRunnable(delay, body) - private class ScheduleRunner(val delay: FiniteDuration, val body: Runnable) extends Cancellable { + private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable) extends Cancellable { @volatile var interruptGuard = true // to avoid interrupting the body val th = VTFactory.newThread: () => try Thread.sleep(delay.toMillis) catch case e: InterruptedException => () /* we got cancelled, don't propagate */ - if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then body.run() + if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then body.run() th.start() final override def cancel(): Unit = - if ScheduleRunner.interruptGuardVar.getAndSet(this, false) then th.interrupt() + if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then th.interrupt() } - private object ScheduleRunner: + private object ScheduledRunnable: val interruptGuardVar = MethodHandles .lookup() - .in(classOf[ScheduleRunner]) - .findVarHandle(classOf[ScheduleRunner], "interruptGuard", classOf[Boolean]) + .in(classOf[ScheduledRunnable]) + .findVarHandle(classOf[ScheduledRunnable], "interruptGuard", classOf[Boolean]) object VThreadSupport extends AsyncSupport: