Skip to content

Commit

Permalink
Allow passivation from inside the actor (#67)
Browse files Browse the repository at this point in the history
* Allow passivation from inside the actor

* Add support for receive timeout

* fmt

* Make sure timeout is set in the actor thread + add a test
  • Loading branch information
ghostdogpr authored May 3, 2020
1 parent b5301ad commit ed3e3df
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/main/scala/zio/akka/cluster/sharding/Entity.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package zio.akka.cluster.sharding

import scala.concurrent.duration.Duration
import zio.{ Ref, Task, UIO }

trait Entity[State] {
def replyToSender[R](msg: R): Task[Unit]
def id: String
def state: Ref[Option[State]]
def stop: UIO[Unit]
def passivate: UIO[Unit]
def passivateAfter(duration: Duration): UIO[Unit]
}
5 changes: 5 additions & 0 deletions src/main/scala/zio/akka/cluster/sharding/SetTimeout.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zio.akka.cluster.sharding

import scala.concurrent.duration.Duration

case class SetTimeout(duration: Duration)
16 changes: 11 additions & 5 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.akka.cluster.sharding

import scala.concurrent.duration._
import scala.reflect.ClassTag
import akka.actor.{ Actor, ActorContext, ActorRef, ActorSystem, PoisonPill, Props }
import akka.actor.{ Actor, ActorContext, ActorRef, ActorSystem, PoisonPill, Props, ReceiveTimeout }
import akka.cluster.sharding.ShardRegion.Passivate
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import akka.pattern.{ ask => askPattern }
Expand Down Expand Up @@ -136,13 +136,19 @@ object Sharding {
val ref: Ref[Option[State]] = rts.unsafeRun(Ref.make[Option[State]](None))
val actorContext: ActorContext = context
val entity: Entity[State] = new Entity[State] {
override def id: String = context.self.path.name
override def state: Ref[Option[State]] = ref
override def stop: UIO[Unit] = UIO(actorContext.stop(self))
override def replyToSender[R](msg: R): Task[Unit] = Task(context.sender() ! msg)
override def id: String = actorContext.self.path.name
override def state: Ref[Option[State]] = ref
override def stop: UIO[Unit] = UIO(actorContext.stop(self))
override def passivate: UIO[Unit] = UIO(actorContext.parent ! Passivate(PoisonPill))
override def passivateAfter(duration: Duration): UIO[Unit] = UIO(actorContext.self ! SetTimeout(duration))
override def replyToSender[R](msg: R): Task[Unit] = Task(actorContext.sender() ! msg)
}

def receive: Receive = {
case SetTimeout(duration) =>
actorContext.setReceiveTimeout(duration)
case ReceiveTimeout =>
actorContext.parent ! Passivate(PoisonPill)
case p: Passivate =>
actorContext.parent ! p
case MessagePayload(msg) =>
Expand Down
23 changes: 23 additions & 0 deletions src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,29 @@ object ShardingSpec extends DefaultRunnableSpec {
} yield res
)(isNone).provideLayer(actorSystem)
},
testM("passivateAfter") {
assertM(
for {
p <- Promise.make[Nothing, Option[Unit]]
onMessage = (msg: String) =>
msg match {
case "set" => ZIO.accessM[Entity[Unit]](_.state.set(Some(())))
case "get" => ZIO.accessM[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit))
case "timeout" => ZIO.accessM[Entity[Unit]](_.passivateAfter((1 millisecond).asScala))
}
sharding <- Sharding.start(shardName, onMessage)
_ <- sharding.send(shardId, "set")
_ <- sharding.send(shardId, "timeout")
_ <- ZIO
.sleep(3 seconds)
.provideLayer(
Clock.live
) // give time to the ShardCoordinator to notice the death of the actor and recreate one
_ <- sharding.send(shardId, "get")
res <- p.await
} yield res
)(isNone).provideLayer(actorSystem)
},
testM("work with 2 actor systems") {
assertM(
actorSystem.build.use(a1 =>
Expand Down

0 comments on commit ed3e3df

Please sign in to comment.