Skip to content

Commit

Permalink
Fixed runtime race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg von Nessi authored and gvonness committed Feb 15, 2022
1 parent 5a34f9e commit d6b21e2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 36 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / baseVersion := "0.1.2"
ThisBuild / baseVersion := "0.1.3"
ThisBuild / organization := "ai.entrolution"
ThisBuild / organizationName := "Entrolution"
ThisBuild / publishGithubUser := "gvonness"
Expand All @@ -21,7 +21,7 @@ ThisBuild / sonatypeRepository := "https://s01.oss.sonatype.org/service/local"

name := "bengal-stm"

version := "0.1.2"
version := "0.1.3"

scalaVersion := "2.13.8"
crossScalaVersions := Seq("2.12.15", "2.13.8")
Expand Down
84 changes: 50 additions & 34 deletions src/main/scala/bengal/stm/TxnRuntimeContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,71 +67,87 @@ private[stm] trait TxnRuntimeContext[F[_]] {
private def withRunningLock[A](fa: F[A])(implicit F: Concurrent[F]): F[A] =
runningSemaphore.permit.use(_ => fa)

private def withWaitingLock[A](fa: F[A])(implicit F: Concurrent[F]): F[A] =
waitingSemaphore.permit.use(_ => fa)

private[stm] def triggerReprocessing(implicit F: Concurrent[F]): F[Unit] =
schedulerTrigger.get.flatMap(_.complete(())).void

private[stm] def registerCompletion(
txnId: TxnId
)(implicit F: Concurrent[F]): F[Unit] =
for {
_ <- withRunningLock(F.pure(runningMap -= txnId))
_ <- triggerReprocessing
_ <- waitingSemaphore.acquire
_ <- withRunningLock(F.pure(runningMap -= txnId))
waitingPopulated <- F.pure(waitingBuffer.nonEmpty)
_ <- if (waitingPopulated)
triggerReprocessing
else {
F.unit
}
_ <- waitingSemaphore.release
} yield ()

private[stm] def submitTxn[V](
analysedTxn: AnalysedTxn[V]
)(implicit F: Concurrent[F]): F[Unit] =
for {
_ <- withWaitingLock(F.pure(waitingBuffer.append(analysedTxn)))
_ <- waitingSemaphore.acquire
_ <- F.pure(waitingBuffer.append(analysedTxn))
_ <- triggerReprocessing
_ <- waitingSemaphore.release
} yield ()

private def attemptExecution(
analysedTxn: AnalysedTxn[_]
)(implicit F: Concurrent[F], TF: Temporal[F]): F[Unit] =
for {
_ <- withRunningLock(
_ <- withRunningLock {
F.pure(runningMap += (analysedTxn.id -> analysedTxn))
)
}
_ <- analysedTxn.execute(this).start
} yield ()

private def getRunningClosure(implicit F: Concurrent[F]): F[IdClosure] =
withRunningLock {
if (runningMap.nonEmpty) {
F.pure(runningMap.values.map(_.idClosure).reduce(_ mergeWith _))
} else {
F.pure(IdClosure.empty)
}
}
for {
_ <- runningSemaphore.acquire
result <- if (runningMap.nonEmpty) {
for {
innerResult <- F.pure {
runningMap.values
.map(_.idClosure)
.reduce(_ mergeWith _)
}
_ <- runningSemaphore.release
} yield innerResult
} else {
for {
_ <- runningSemaphore.release
} yield IdClosure.empty
}
} yield result

private[stm] def reprocessWaiting(implicit
F: Concurrent[F],
TF: Temporal[F]
): F[Unit] =
for {
_ <- schedulerTrigger.get.flatMap(_.get)
_ <- withWaitingLock {
for {
newTrigger <- Deferred[F, Unit]
_ <- schedulerTrigger.set(newTrigger)
bufferLocked <- F.pure(waitingBuffer.toList)
_ <- F.pure(waitingBuffer.clear())
runningClosure <- getRunningClosure
_ <- bufferLocked.foldLeftM(runningClosure) { (i, j) =>
for {
_ <- if (j.idClosure.isCompatibleWith(i)) {
attemptExecution(j)
} else {
F.pure(waitingBuffer.append(j))
}
} yield i.mergeWith(j.idClosure)
}
} yield ()
}
newTrigger <- Deferred[F, Unit]
_ <- schedulerTrigger.get.flatMap(_.get)
_ <- waitingSemaphore.acquire
_ <- schedulerTrigger.set(newTrigger)
_ <- for {
bufferLocked <- F.pure(waitingBuffer.toList)
_ <- F.pure(waitingBuffer.clear())
runningClosure <- getRunningClosure
_ <- bufferLocked.foldLeftM(runningClosure) { (i, j) =>
for {
_ <- if (j.idClosure.isCompatibleWith(i)) {
attemptExecution(j)
} else {
F.pure(waitingBuffer.append(j))
}
} yield i.mergeWith(j.idClosure)
}
} yield ()
_ <- waitingSemaphore.release
} yield ()

@nowarn
Expand Down

0 comments on commit d6b21e2

Please sign in to comment.