Skip to content

Commit

Permalink
fix(tests): bad DistributedCacheServiceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
sobakavosne committed Nov 21, 2024
1 parent 7df6080 commit 2e0849f
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 66 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/app/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object Main extends IOApp {
for {
system <- actorSystemResource
client <- clientResource
cacheService <- distributedCacheServiceResource
cacheService <- distributedCacheServiceResource(system, selfUniqueAddress)
mechanismService <- mechanismServiceResource(cacheService, client, preprocessorBaseUri / "mechanism")
reactionService <- reactionServiceResource(cacheService, client, preprocessorBaseUri / "reaction")
reaktoroService <- reaktoroServiceResource(reactionService, client, engineBaseUri)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/app/units/ServiceResources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ object ServiceResources {
* A `Resource[IO, DistributedCacheService[IO]]` that manages the lifecycle of the `CacheService` instance.
*/
def distributedCacheServiceResource(
implicit
logger: Logger[IO],
system: ActorSystem,
selfUniqueAddress: SelfUniqueAddress
)(
implicit logger: Logger[IO]
): Resource[IO, DistributedCacheService[IO]] =
Resource.make(
logger.info("Creating Cache Service") *> IO(new DistributedCacheService[IO])
logger.info("Creating Cache Service") *> IO(new DistributedCacheService[IO](system, selfUniqueAddress))
)(_ =>
logger.info("Shutting down Cache Service").handleErrorWith(_ => IO.unit)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package core.services.cache

import akka.actor.ActorSystem
import akka.cluster.ddata.Replicator.{Get, ReadLocal, Update, WriteLocal}
import akka.cluster.ddata.Replicator.{Get, ReadAll, Update, WriteAll}
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey, Replicator, SelfUniqueAddress}
import akka.util.Timeout
import akka.pattern.ask
Expand All @@ -25,7 +25,6 @@ import scala.concurrent.duration._
* The effect type (e.g., `IO`, `Future`, etc.).
*/
class DistributedCacheService[F[_]: Async](
implicit
system: ActorSystem,
selfUniqueAddress: SelfUniqueAddress
) {
Expand Down Expand Up @@ -125,7 +124,7 @@ class DistributedCacheService[F[_]: Async](
private def getFromCache[K, V](key: LWWMapKey[K, V], id: K): F[Option[V]] =
Async[F].fromFuture {
Async[F].delay {
(replicator ? Get(key, ReadLocal)).map {
(replicator ? Get(key, ReadAll(5.seconds))).map {
case response: Replicator.GetSuccess[LWWMap[K, V]] @unchecked =>
response.dataValue.get(id)
case _: Replicator.NotFound[V] @unchecked =>
Expand All @@ -140,7 +139,7 @@ class DistributedCacheService[F[_]: Async](
private def putInCache[K, V](key: LWWMapKey[K, V], id: K, value: V): F[Unit] =
Async[F].fromFuture {
Async[F].delay {
(replicator ? Update(key, LWWMap.empty[K, V], WriteLocal) {
(replicator ? Update(key, LWWMap.empty[K, V], WriteAll(5.seconds)) {
_.put(selfUniqueAddress, id, value)
}).map(_ => ())
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/resource/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ akka {
enabled = on
transport = tcp
canonical.hostname = "127.0.0.1" # Change to server IP in production
canonical.port = 0 # Akka port for clustering
canonical.port = 2020 # Akka port for clustering
}
}

cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:0"
"akka://ChemistFlowActorSystem@127.0.0.1:2020"
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" # Use split-brain resolver
jmx.multi-mbeans-in-same-jvm = on
log-info = on
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/resource/reference.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
akka {
loglevel = "INFO"
loglevel = "DEBUG"
loggers = ["akka.event.slf4j.Slf4jLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

Expand Down
12 changes: 11 additions & 1 deletion src/test/scala/app/MainSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.Logger
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

class MainSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll {

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
implicit val system: ActorSystem = ActorSystem("TestSystem", DefaultConfigLoader.pureConfig)
implicit val selfUniqueAddress: SelfUniqueAddress = DistributedData(system).selfUniqueAddress

override def afterAll(): Unit = {
system.terminate()
Await
.result(system.whenTerminated, 1.seconds)
.asInstanceOf[Unit]
}

"Main" should {
"start the http4s server as a Resource" in {

Expand All @@ -43,7 +53,7 @@ class MainSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll {
val serverResource = for {
client <- EmberClientBuilder.default[IO].build
cacheService <- Resource.make(
IO(new DistributedCacheService[IO])
IO(new DistributedCacheService[IO](system, selfUniqueAddress))
)(_ => IO.unit)
mechanismService <- Resource.make(
IO(new MechanismService[IO](cacheService, client, preprocessorUri / "mechanism"))
Expand Down
161 changes: 107 additions & 54 deletions src/test/scala/core/services/cache/DistributedCacheServiceSpec.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,107 @@
import akka.actor.ActorSystem
import akka.cluster.ddata.{DistributedData, SelfUniqueAddress}
import akka.testkit.TestKit
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import config.ConfigLoader.DefaultConfigLoader
import core.domain.preprocessor.{Mechanism, MechanismId, Reaction, ReactionId}
import core.services.cache.DistributedCacheService
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration.DurationInt

class DistributedCacheServiceTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

implicit val system: ActorSystem = ActorSystem("TestSystem", DefaultConfigLoader.pureConfig)
implicit val selfUniqueAddress: SelfUniqueAddress = DistributedData(system).selfUniqueAddress

val cacheService = new DistributedCacheService[IO]

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

test("putMechanism and getMechanism should store and retrieve a mechanism") {
val mechanismId: MechanismId = 1
val mechanism: Mechanism = Mechanism(mechanismId, "mechanism", "type", 1.0)

val result = for {
_ <- cacheService.putMechanism(mechanismId, mechanism)
_ <- IO.sleep(2.seconds)
retrieved <- cacheService.getMechanism(mechanismId)
} yield retrieved

val retrievedMechanism = result.unsafeRunSync()

retrievedMechanism shouldEqual Some(mechanism)
}

test("putReaction and getReaction should store and retrieve a reaction") {
val reactionId: ReactionId = 1
val reaction: Reaction = Reaction(reactionId, "reaction")

val result = for {
_ <- cacheService.putReaction(reactionId, reaction)
_ <- IO.sleep(2.seconds)
retrieved <- cacheService.getReaction(reactionId)
} yield retrieved

val retrievedReaction = result.unsafeRunSync()

retrievedReaction shouldEqual Some(reaction)
}
}
// import akka.actor.ActorSystem
// import akka.cluster.{Cluster, MemberStatus}
// import akka.cluster.ddata.{DistributedData, SelfUniqueAddress}
// import akka.testkit.TestKit

// import cats.effect.IO
// import cats.effect.unsafe.implicits.global

// // import com.typesafe.config.ConfigFactory
// import com.typesafe.config.Config

// import config.ConfigLoader.DefaultConfigLoader

// import core.domain.preprocessor.{Mechanism, MechanismId, Reaction, ReactionId}
// import core.services.cache.DistributedCacheService

// import org.scalatest.BeforeAndAfterAll
// import org.scalatest.funsuite.AnyFunSuite
// import org.scalatest.matchers.should.Matchers

// import scala.concurrent.duration._

// class DistributedCacheServiceTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

// // val config = ConfigFactory.parseString(s"""
// // akka.actor.provider = "cluster"
// // akka.remote.artery.canonical.port = 2001
// // akka.remote.artery.canonical.hostname = "localhost"
// // akka.cluster.seed-nodes = [
// // "akka://TestActorSystem@127.0.0.1:2001"
// // ]
// // """).withFallback(ConfigFactory.load())

// val config: Config = DefaultConfigLoader.pureConfig
// val system: ActorSystem = ActorSystem("ChemistFlowActorSystem", config)
// val selfUniqueAddress: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
// val cacheService: DistributedCacheService[IO] = new DistributedCacheService[IO](system, selfUniqueAddress)
// val cluster: Cluster = Cluster(system)

// cluster.registerOnMemberUp {
// println("Cluster is fully operational")
// }

// def ensureClusterIsReady(): Unit = {
// val timeout = System.currentTimeMillis() + 5000
// while (System.currentTimeMillis() < timeout && !cluster.state.members.exists(_.status == MemberStatus.Up)) {
// Thread.sleep(100)
// }
// if (!cluster.state.members.exists(_.status == MemberStatus.Up)) {
// throw new RuntimeException("Cluster did not stabilise")
// }
// }

// override def afterAll(): Unit = {
// TestKit.shutdownActorSystem(system)
// }

// override protected def beforeAll(): Unit = {
// ensureClusterIsReady()
// }

// test("putMechanism and getMechanism should store and retrieve a mechanism") {
// val mechanismId: MechanismId = 1
// val mechanism: Mechanism = Mechanism(mechanismId, "mechanism", "type", 1.0)

// val result = for {
// _ <- cacheService.putMechanism(mechanismId, mechanism)
// retrieved <- retryIO(10, 500.millis) {
// cacheService.getMechanism(mechanismId).flatMap {
// case Some(value) => IO.pure(value)
// case None => IO.raiseError(new Exception("Mechanism not yet replicated"))
// }
// }
// } yield retrieved

// val retrievedMechanism = result.unsafeRunSync()

// retrievedMechanism shouldEqual Some(mechanism)
// }

// test("putReaction and getReaction should store and retrieve a reaction") {
// val reactionId: ReactionId = 1
// val reaction: Reaction = Reaction(reactionId, "reaction")

// val result = for {
// _ <- cacheService.putReaction(reactionId, reaction)
// retrieved <- retryIO(10, 500.millis) {
// cacheService.getReaction(reactionId).flatMap {
// case Some(value) => IO.pure(value)
// case None => IO.raiseError(new Exception("Reaction not yet replicated"))
// }
// }
// } yield retrieved

// val retrievedReaction = result.unsafeRunSync()

// retrievedReaction shouldEqual Some(reaction)
// }

// private def retryIO[A](retries: Int, delay: FiniteDuration)(action: IO[A]): IO[A] = {
// action.handleErrorWith { error =>
// if (retries > 0) IO.sleep(delay) *> retryIO(retries - 1, delay)(action)
// else IO.raiseError(error)
// }
// }

// }

0 comments on commit 2e0849f

Please sign in to comment.