Skip to content

Commit fad3723

Browse files
committed
feat: sharding
1 parent ce98c59 commit fad3723

File tree

15 files changed

+253
-25
lines changed

15 files changed

+253
-25
lines changed

.bin/runFor.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,4 @@ printf "\n ${BLUE}Generating Code for ${NAME} ${NC} \n"
118118
sbt slickGen
119119
clear
120120
printf "\n ${BLUE}Starting Service for ${NAME} ${NC} \n"
121-
sbt run
121+
sbt "run -Dconfig.resource=dev-application.conf -Dlogger.resource=logback-local.xml"

.deployment/deployment.tpl.yaml

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
---
2+
kind: Role
3+
apiVersion: rbac.authorization.k8s.io/v1
4+
metadata:
5+
name: pod-reader
6+
namespace: "$NAMESPACE"
7+
rules:
8+
- apiGroups: [""]
9+
resources: ["pods"]
10+
verbs: ["get", "watch", "list"]
11+
---
12+
kind: RoleBinding
13+
apiVersion: rbac.authorization.k8s.io/v1
14+
metadata:
15+
name: read-pods
16+
namespace: "$NAMESPACE"
17+
subjects:
18+
- kind: User
19+
name: "system:serviceaccount:$NAMESPACE:$SANAME"
20+
roleRef:
21+
kind: Role
22+
name: pod-reader
23+
apiGroup: rbac.authorization.k8s.io
24+
25+
# TODO add `automountServiceAccountToken: true` to pod deployment!
26+
27+
ports:
28+
- containerPort: 8080
29+
name: http
30+
- name: management
31+
containerPort: 8558
32+
protocol: TCP
33+
- name: remote
34+
containerPort: 25520
35+
protocol: TCP
36+
readinessProbe:
37+
httpGet:
38+
path: "/ready"
39+
port: management
40+
periodSeconds: 10
41+
failureThreshold: 10
42+
initialDelaySeconds: 20
43+
livenessProbe:
44+
httpGet:
45+
path: "/alive"
46+
port: management
47+
periodSeconds: 10
48+
failureThreshold: 10
49+
initialDelaySeconds: 60

app/Module.scala

+10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import akka.actor.ActorSystem
2+
import akka.management.cluster.bootstrap.ClusterBootstrap
3+
import akka.management.scaladsl.AkkaManagement
14
import com.google.auth.oauth2.GoogleCredentials
25
import com.google.firebase.auth.internal.Utils.isEmulatorMode
36

@@ -39,6 +42,7 @@ class Module(environment: Environment, configuration: Configuration) extends Abs
3942
// bind(classOf[firebaseCreationService]).asEagerSingleton()
4043
// bind(classOf[firebaseDeletionService]).asEagerSingleton()
4144
bind(classOf[JwtValidator]).to(classOf[JwtValidatorImpl])
45+
bind(classOf[AkkaCluster]).asEagerSingleton()
4246

4347
/**
4448
* Inject Modules depended on environment (Test, Prod, Dev)
@@ -58,6 +62,12 @@ class Module(environment: Environment, configuration: Configuration) extends Abs
5862

5963
}
6064

65+
@Singleton
66+
class AkkaCluster @Inject() (system: ActorSystem) {
67+
AkkaManagement(system).start()
68+
ClusterBootstrap(system).start()
69+
}
70+
6171
@Singleton
6272
class LoggingTracingCreator @Inject() (lifecycle: ApplicationLifecycle) {
6373
LoggingTraceExporter.register()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package de.innfactory.bootstrapplay2.actorsharding.domain.common
2+
3+
import akka.actor.ActorSystem
4+
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
5+
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
6+
import akka.util.Timeout
7+
8+
import javax.inject.{Inject, Singleton}
9+
import scala.concurrent.ExecutionContext
10+
import scala.concurrent.duration._
11+
12+
@Singleton
13+
class Sharding @Inject() (system: ActorSystem)(implicit ec: ExecutionContext) {
14+
15+
implicit val timeout: Timeout = 10.seconds
16+
17+
// Convert classic actor system of play to typed
18+
private val actorSystem: akka.actor.typed.ActorSystem[_] = system.toTyped
19+
20+
private val scheduler: akka.actor.typed.Scheduler = actorSystem.scheduler
21+
22+
private val sharding: ClusterSharding = ClusterSharding(actorSystem)
23+
24+
def getSharding: ClusterSharding = sharding
25+
def getActorSystem: akka.actor.typed.ActorSystem[_] = actorSystem
26+
def getScheduler: akka.actor.typed.Scheduler = scheduler
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package de.innfactory.bootstrapplay2.actorsharding.domain.interfaces
2+
3+
import com.google.inject.ImplementedBy
4+
import de.innfactory.bootstrapplay2.actorsharding.domain.services.HelloWorldServiceImpl
5+
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.Response
6+
7+
import scala.concurrent.Future
8+
9+
@ImplementedBy(classOf[HelloWorldServiceImpl])
10+
trait HelloWorldService {
11+
def queryHelloWorld(query: String): Future[Response]
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package de.innfactory.bootstrapplay2.actorsharding.domain.services
2+
3+
import akka.actor._
4+
import akka.actor.typed.Scheduler
5+
import akka.cluster.sharding.typed.ShardingEnvelope
6+
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
7+
import akka.util.Timeout
8+
import de.innfactory.bootstrapplay2.actorsharding.domain.common.Sharding
9+
import de.innfactory.bootstrapplay2.actorsharding.domain.interfaces.HelloWorldService
10+
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.{Command, QueryHelloWorld, Response}
11+
import akka.actor.typed.ActorRef
12+
import akka.actor.typed.scaladsl.AskPattern.Askable
13+
import de.innfactory.bootstrapplay2.actorsystem.domain.actors.HelloWorldActor
14+
15+
import javax.inject._
16+
import scala.concurrent.duration._
17+
import scala.concurrent.{ExecutionContext, Future}
18+
19+
@Singleton
20+
class HelloWorldServiceImpl @Inject() (
21+
)(implicit ec: ExecutionContext, system: ActorSystem, sharding: Sharding)
22+
extends HelloWorldService {
23+
24+
implicit val timeout: Timeout = sharding.timeout
25+
implicit private val scheduler: Scheduler = sharding.getScheduler
26+
private val clusterShard: ClusterSharding = sharding.getSharding
27+
28+
val helloWorldTag = "PLAN_CONVERSION"
29+
val helloWorldTypeKey: EntityTypeKey[Command] =
30+
EntityTypeKey[Command](helloWorldTag)
31+
32+
val helloWorldShardRegion: ActorRef[ShardingEnvelope[Command]] =
33+
clusterShard.init(
34+
Entity(helloWorldTypeKey)(createBehavior = entityContext => HelloWorldActor())
35+
)
36+
37+
def queryHelloWorld(query: String): Future[Response] = {
38+
val result = helloWorldShardRegion.ask((ref: akka.actor.typed.ActorRef[Response]) =>
39+
ShardingEnvelope.apply(
40+
"shardingEnvelopeId",
41+
QueryHelloWorld(query, ref)
42+
)
43+
)
44+
result
45+
}
46+
}

app/de/innfactory/bootstrapplay2/actorsystem/application/ActorHelloWorldController.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package de.innfactory.bootstrapplay2.actorsystem.application
22
import cats.data.EitherT
33
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.{ResponseQueryHelloWorld, ResponseQueryHelloWorldError}
4-
import de.innfactory.bootstrapplay2.actorsystem.infrastructure.HelloWorldService
4+
import de.innfactory.bootstrapplay2.actorsystem.domain.interfaces.HelloWorldService
55
import de.innfactory.bootstrapplay2.apidefinition.{ActorAPIController, HelloworldResponse}
66
import de.innfactory.play.smithy4play.ImplicitLogContext
77
import de.innfactory.smithy4play.{AutoRouting, ContextRoute}

app/de/innfactory/bootstrapplay2/actorsystem/domain/HelloWorldActor.scala app/de/innfactory/bootstrapplay2/actorsystem/domain/actors/HelloWorldActor.scala

+3-10
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
1-
package de.innfactory.bootstrapplay2.actorsystem.domain
1+
package de.innfactory.bootstrapplay2.actorsystem.domain.actors
22

3-
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
43
import akka.actor.typed.Behavior
4+
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
5+
import de.innfactory.bootstrapplay2.actorsystem.domain.commands._
56
import play.api.Logger
6-
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.{
7-
Command,
8-
QueryError,
9-
QueryHelloWorld,
10-
QueryHelloWorldResult,
11-
ResponseQueryHelloWorld,
12-
ResponseQueryHelloWorldError
13-
}
147

158
import scala.concurrent.{ExecutionContext, Future}
169
import scala.util.{Failure, Success}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package de.innfactory.bootstrapplay2.actorsystem.domain.interfaces
2+
3+
import com.google.inject.ImplementedBy
4+
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.Response
5+
import de.innfactory.bootstrapplay2.actorsystem.domain.services.HelloWorldServiceImpl
6+
7+
import scala.concurrent.Future
8+
9+
@ImplementedBy(classOf[HelloWorldServiceImpl])
10+
trait HelloWorldService {
11+
def queryHelloWorld(query: String): Future[Response]
12+
}

app/de/innfactory/bootstrapplay2/actorsystem/infrastructure/HelloWorldService.scala app/de/innfactory/bootstrapplay2/actorsystem/domain/services/HelloWorldServiceImpl.scala

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
1-
package de.innfactory.bootstrapplay2.actorsystem.infrastructure
1+
package de.innfactory.bootstrapplay2.actorsystem.domain.services
22

3-
import de.innfactory.bootstrapplay2.actorsystem.domain.HelloWorldActor
4-
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.{Command, QueryHelloWorld, Response}
53
import akka.actor._
64
import akka.actor.typed.scaladsl.AskPattern._
7-
import akka.util.Timeout
8-
import com.google.inject.ImplementedBy
9-
import javax.inject._
105
import akka.actor.typed.scaladsl.adapter._
6+
import akka.util.Timeout
7+
import de.innfactory.bootstrapplay2.actorsystem.domain.actors.HelloWorldActor
8+
import de.innfactory.bootstrapplay2.actorsystem.domain.commands.{Command, QueryHelloWorld, Response}
9+
import de.innfactory.bootstrapplay2.actorsystem.domain.interfaces.HelloWorldService
1110

11+
import javax.inject._
1212
import scala.concurrent.duration._
1313
import scala.concurrent.{ExecutionContext, Future}
1414

15-
@ImplementedBy(classOf[HelloWorldServiceImpl])
16-
trait HelloWorldService {
17-
def queryHelloWorld(query: String): Future[Response]
18-
}
19-
2015
@Singleton
2116
class HelloWorldServiceImpl @Inject() (
2217
)(implicit ec: ExecutionContext, system: ActorSystem)
@@ -40,5 +35,4 @@ class HelloWorldServiceImpl @Inject() (
4035
helloWorldActor.ask((ref: akka.actor.typed.ActorRef[Response]) => QueryHelloWorld(query, ref))
4136
result
4237
}
43-
4438
}

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ lazy val root = (project in file("."))
119119
Docker / packageName := "bootstrap-play2",
120120
dockerUpdateLatest := latest,
121121
dockerRepository := dockerRegistry,
122-
dockerExposedPorts := Seq(8080, 8080),
122+
dockerExposedPorts := Seq(8080, 25520, 8558),
123123
dockerEntrypoint := Seq(""),
124124
dockerBaseImage := "openjdk:11.0.6-jre-slim",
125125
dockerEntrypoint := Seq("/opt/docker/bin/bootstrap-play2", "-Dplay.server.pidfile.path=/dev/null")

conf/application.conf

+54
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,58 @@ opencensus-scala {
100100
trace {
101101
sampling-probability = 1
102102
}
103+
}
104+
105+
// Akka cluster
106+
107+
akka.serialization.jackson {
108+
jackson-modules += "com.fasterxml.jackson.datatype.joda.JodaModule"
109+
jackson-modules += "de.innfactory.familotel.adcenter.commons.jackson.JsValueDeSerializerModule"
110+
}
111+
112+
akka.cluster.seed-nodes = [ ]
113+
akka.cluster.seed-nodes = ${?AKKA_SEED_NODES}
114+
115+
akka {
116+
loglevel = "INFO"
117+
actor {
118+
provider = "cluster"
119+
debug.receive = false
120+
}
121+
cluster {
122+
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
123+
shutdown-after-unsuccessful-join-seed-nodes = 60s
124+
}
125+
}
126+
127+
128+
akka.cluster.log-info-verbose = off
129+
130+
akka.management {
131+
cluster.bootstrap {
132+
contact-point-discovery {
133+
discovery-method = kubernetes-api
134+
}
135+
}
136+
}
137+
akka.discovery {
138+
kubernetes-api {
139+
pod-namespace = "dev"
140+
pod-namespace = ${?NAMESPACE}
141+
pod-label-selector = "appName=bootstrapplay2"
142+
}
143+
}
144+
145+
akka.actor {
146+
allow-java-serialization = off
147+
serializers {
148+
jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
149+
}
150+
serialization-identifiers {
151+
jackson-json-event = 9001
152+
}
153+
serialization-bindings {
154+
"de.innfactory.familotel.adcenter.infrastructure.actorsystem.PersistenceSerializable" = jackson-json
155+
"play.api.libs.json.JsValue" = jackson-json
156+
}
103157
}

conf/dev-application.conf

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
include "application.conf"
2+
3+
akka.cluster.seed-nodes = [ "akka://application@127.0.0.1:25520" ]

project/Dependencies.scala

+15
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,22 @@ object Dependencies {
55

66
val scalaVersion = "2.13.8"
77
val akkaVersion = "2.6.19"
8+
val akkaManagementVersion = "1.1.3"
89

910
val akkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
1011
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.2.9"
1112
val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
1213

14+
val akkaDiscovery = "com.typesafe.akka" %% "akka-discovery" % akkaVersion
15+
val akkaManagementClusterHttp =
16+
"com.lightbend.akka.management" %% "akka-management-cluster-http" % akkaManagementVersion
17+
val akkaManagementClusterBootstrap =
18+
"com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % akkaManagementVersion
19+
val akkaDiscoveryKubernetes =
20+
"com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % akkaManagementVersion
21+
22+
val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
23+
1324
// https://github.com/akka/akka/issues/29351
1425
val akkaJackson = "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion
1526

@@ -68,6 +79,10 @@ object Dependencies {
6879
akka,
6980
akkaTyped,
7081
akkaJackson,
82+
akkaClusterShardingTyped,
83+
akkaManagementClusterBootstrap,
84+
akkaManagementClusterHttp,
85+
akkaDiscoveryKubernetes,
7186
guice,
7287
slickPg,
7388
slickPgPlayJson,

test/resources/application.conf

+12
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,15 @@ opencensus-scala {
3131
sampling-probability = 1
3232
}
3333
}
34+
35+
akka.cluster.seed-nodes = [ "akka://application@127.0.0.1:25520" ]
36+
37+
akka {
38+
remote {
39+
artery {
40+
transport = tcp # See Selecting a transport below
41+
canonical.hostname = "127.0.0.1"
42+
canonical.port = 25520
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)