Skip to content

Commit

Permalink
Merge pull request #37 from m8nmueller/vthread-schedule-abool
Browse files Browse the repository at this point in the history
Use interruptGuard in VThreadScheduler
  • Loading branch information
natsukagami authored Jan 24, 2024
2 parents 613e625 + 4e2e437 commit bb078a1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
24 changes: 20 additions & 4 deletions jvm/src/main/scala/async/VThreadSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gears.async
import scala.annotation.unchecked.uncheckedVariance
import java.util.concurrent.locks.ReentrantLock
import scala.concurrent.duration.FiniteDuration
import java.lang.invoke.{VarHandle, MethodHandles}

object VThreadScheduler extends Scheduler:
private val VTFactory = Thread
Expand All @@ -12,12 +13,27 @@ object VThreadScheduler extends Scheduler:

override def execute(body: Runnable): Unit = VTFactory.newThread(body)

override def schedule(delay: FiniteDuration, body: Runnable): Cancellable =
override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduledRunnable(delay, body)

private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable) extends Cancellable {
@volatile var interruptGuard = true // to avoid interrupting the body

val th = VTFactory.newThread: () =>
Thread.sleep(delay.toMillis)
execute(body)
try Thread.sleep(delay.toMillis)
catch case e: InterruptedException => () /* we got cancelled, don't propagate */
if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then body.run()
th.start()
() => th.interrupt()

final override def cancel(): Unit =
if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then th.interrupt()
}

private object ScheduledRunnable:
val interruptGuardVar =
MethodHandles
.lookup()
.in(classOf[ScheduledRunnable])
.findVarHandle(classOf[ScheduledRunnable], "interruptGuard", classOf[Boolean])

object VThreadSupport extends AsyncSupport:

Expand Down
42 changes: 42 additions & 0 deletions shared/src/test/scala/SchedulerBehavior.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import gears.async.{Async, Future, Listener}
import gears.async.AsyncOperations.*
import gears.async.default.given
import concurrent.duration.DurationInt
import gears.async.Future.Promise
import scala.util.Success

class SchedulerBehavior extends munit.FunSuite {
test("schedule cancellation works") {
Async.blocking:
var bodyRan = false
val cancellable = Async.current.scheduler.schedule(1.seconds, () => bodyRan = true)

// cancel immediately
cancellable.cancel()

sleep(1000)
assert(!bodyRan)
}

test("schedule cancellation doesn't abort inner code") {
Async.blocking:
var bodyRan = false
val fut = Promise[Unit]()
val cancellable = Async.current.scheduler.schedule(
50.milliseconds,
() =>
fut.complete(Success(()))
Async.blocking:
sleep(500)
bodyRan = true
)

// cancel after body started running
fut.await
cancellable.cancel()

sleep(1000)

assert(bodyRan)
}
}

0 comments on commit bb078a1

Please sign in to comment.