continued work on cluster singleton
LeonHartley committed Jan 7, 2024
1 parent fbf6bc2 commit 300e0ca
Showing 7 changed files with 242 additions and 12 deletions.
12 changes: 12 additions & 0 deletions coerce/src/protocol/singleton.proto
@@ -15,4 +15,16 @@ enum SingletonState {
STARTING = 1;
RUNNING = 2;
STOPPING = 3;
}

message RequestLease {
uint64 source_node_id = 1;
}

message LeaseAck {
uint64 source_node_id = 1;
}

message LeaseNack {
uint64 source_node_id = 1;
}
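
Together, these three messages form the lease handshake: the manager that wants to host the singleton broadcasts `RequestLease`, and each peer answers with `LeaseAck` or `LeaseNack`. The commit's `lease.rs` below wires up the ack path; the following is only a sketch of one plausible shape for the receiving side, where `reply_to` is a hypothetical helper that is not part of this commit:

// Sketch of a receiving manager's lease decision - not part of this commit.
// `reply_to` is a hypothetical helper that sends a message to the manager
// actor on `source_node_id`; `self.state.is_running()` appears in the diff.
pub async fn on_request_lease(&mut self, request: RequestLease) {
    if self.state.is_running() {
        // This node already hosts the singleton: refuse the lease.
        self.reply_to(
            request.source_node_id,
            LeaseNack { source_node_id: self.node_id },
        )
        .await;
    } else {
        self.reply_to(
            request.source_node_id,
            LeaseAck { source_node_id: self.node_id },
        )
        .await;
    }
}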
11 changes: 11 additions & 0 deletions coerce/src/remote/cluster/singleton/factory.rs
@@ -5,3 +5,14 @@ pub trait SingletonFactory: 'static + Sync + Send {

fn create(&self) -> Self::Actor;
}

impl<A: Actor, F> SingletonFactory for F
where
F: Fn() -> A + 'static + Sync + Send,
{
type Actor = A;

fn create(&self) -> A {
self()
}
}
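
With this blanket impl, any `Send + Sync` closure returning an actor is itself a `SingletonFactory`, so callers can declare a singleton inline instead of writing a dedicated factory type, exactly as the new test at the bottom of this commit does:

// A closure is now enough - no hand-written factory type required.
let singleton = SingletonBuilder::new(remote)
    .factory(|| SingletonActor {})
    .build()
    .await;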
9 changes: 6 additions & 3 deletions coerce/src/remote/cluster/singleton/manager/lease.rs
@@ -4,7 +4,7 @@ use crate::actor::Actor;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::status::SingletonState;
use crate::remote::cluster::singleton::manager::{Manager, State};
-use crate::remote::system::NodeId;
+use crate::remote::system::{NodeId, RemoteActorSystem};

#[derive(Clone)]
pub struct RequestLease {
@@ -13,6 +13,8 @@ pub struct RequestLease {

impl Message for RequestLease {
type Result = ();


}

pub struct LeaseAck {
@@ -47,7 +49,7 @@ impl<F: SingletonFactory> Handler<RequestLease> for Manager<F> {
#[async_trait]
impl<F: SingletonFactory> Handler<LeaseAck> for Manager<F> {
async fn handle(&mut self, message: LeaseAck, ctx: &mut ActorContext) {
-        self.on_lease_ack(message.source_node_id).await;
+        self.on_lease_ack(message.source_node_id, ctx).await;
}
}

@@ -82,14 +84,15 @@ impl<F: SingletonFactory> Manager<F> {
}
}

-    pub async fn on_lease_ack(&mut self, node_id: NodeId) {
+    pub async fn on_lease_ack(&mut self, node_id: NodeId, ctx: &ActorContext) {
match &mut self.state {
State::Starting { acknowledged_nodes } => {
acknowledged_nodes.insert(node_id);

// TODO: Can we do it with a quorum rather than *all managers*?
if acknowledged_nodes.len() == self.managers.len() {
info!("starting singleton on node={}", self.node_id);
self.start_actor(ctx).await;
}
}

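The TODO above asks whether a majority quorum could replace waiting for every manager. One possible shape, shown purely as a sketch (the quorum arithmetic is an assumption, not part of this commit):

// Sketch: start once a majority of known managers have acked, instead of
// waiting for all of them. Two disjoint majorities cannot exist within one
// membership view, so at most one node wins the lease.
let quorum = (self.managers.len() / 2) + 1;
if acknowledged_nodes.len() >= quorum {
    info!(
        "lease acked by quorum ({}/{}), starting singleton on node={}",
        acknowledged_nodes.len(),
        self.managers.len(),
        self.node_id
    );
    self.start_actor(ctx).await;
}

The trade-off is that acks counted against a stale membership view could race a node joining mid-election, which the stricter all-managers condition sidesteps.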
39 changes: 32 additions & 7 deletions coerce/src/remote/cluster/singleton/manager/mod.rs
@@ -1,9 +1,10 @@
mod lease;
mod start;
mod status;

use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
-use crate::actor::{Actor, ActorFactory, ActorId, ActorRef, ActorRefErr, LocalActorRef};
+use crate::actor::{Actor, ActorFactory, ActorId, ActorRef, ActorRefErr, IntoActor, LocalActorRef};
use crate::remote::cluster::node::NodeSelector;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
@@ -23,6 +24,30 @@ pub struct Manager<F: SingletonFactory> {
managers: HashMap<NodeId, ActorRef<Self>>,
factory: F,
selector: NodeSelector,
sys: RemoteActorSystem,
}

impl<F: SingletonFactory> Manager<F> {
pub fn new(
sys: RemoteActorSystem,
factory: F,
manager_actor_id: ActorId,
singleton_actor_id: ActorId,
selector: NodeSelector,
) -> Self {
Self {
system_event_subscription: None,
state: State::default(),
current_leader_node: None,
node_id: sys.node_id(),
manager_actor_id,
singleton_actor_id,
managers: Default::default(),
factory,
selector,
sys,
}
}
}

impl<A: Actor> Default for State<A> {
@@ -77,7 +102,7 @@ impl<F: SingletonFactory> Manager<F> {
}
}

-    pub async fn begin_starting(&mut self, sys: &RemoteActorSystem) {
+    pub async fn begin_starting(&mut self) {
self.state = State::Starting {
acknowledged_nodes: HashSet::new(),
};
@@ -100,9 +125,9 @@ impl<F: SingletonFactory> Manager<F> {
};
}

-    pub async fn on_leader_changed(&mut self, new_leader_id: NodeId, sys: &RemoteActorSystem) {
-        if new_leader_id == sys.node_id() && !self.state.is_running() {
-            self.begin_starting(sys).await;
+    pub async fn on_leader_changed(&mut self, new_leader_id: NodeId) {
+        if new_leader_id == self.node_id && !self.state.is_running() {
+            self.begin_starting().await;
}
}
}
@@ -171,14 +196,14 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
}

if leader == &self.node_id {
-                self.begin_starting(&sys).await;
+                self.begin_starting().await;
}
}

ClusterEvent::LeaderChanged(leader) => {
let sys = ctx.system().remote();

-                self.on_leader_changed(*leader, &sys).await;
+                self.on_leader_changed(*leader).await;
}

ClusterEvent::NodeAdded(node) => {
68 changes: 68 additions & 0 deletions coerce/src/remote/cluster/singleton/manager/start.rs
@@ -0,0 +1,68 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::{Actor, ActorRefErr, IntoActor, LocalActorRef};
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::{Manager, State};

pub struct ActorStarted<A: Actor> {
actor_ref: LocalActorRef<A>,
}

pub struct ActorFailure {
error: ActorRefErr,
}

impl<A: Actor> Message for ActorStarted<A> {
type Result = ();
}

impl Message for ActorFailure {
type Result = ();
}

impl<F: SingletonFactory> Manager<F> {
pub async fn start_actor(&mut self, ctx: &ActorContext) {
let state = self.factory.create();
let sys = self.sys.actor_system().clone();
let manager_ref = self.actor_ref(ctx);
let actor_id = self.singleton_actor_id.clone();
let _ = tokio::spawn(async move {
let actor = state.into_actor(Some(actor_id), &sys).await;
match actor {
Ok(actor_ref) => {
let _ = manager_ref.notify(ActorStarted { actor_ref });
}
Err(error) => {
let _ = manager_ref.notify(ActorFailure { error });
}
}
});
}
}

#[async_trait]
impl<F: SingletonFactory> Handler<ActorStarted<F::Actor>> for Manager<F> {
async fn handle(&mut self, message: ActorStarted<F::Actor>, ctx: &mut ActorContext) {
match &self.state {
State::Starting { .. } => {
info!("singleton actor started");
}
_ => {
warn!("Invalid state, expected `Starting`");
}
}

let actor_ref = message.actor_ref;
        self.state = State::Running { actor_ref };

// TODO: broadcast to all managers that we're running the actor
}
}

#[async_trait]
impl<F: SingletonFactory> Handler<ActorFailure> for Manager<F> {
async fn handle(&mut self, message: ActorFailure, ctx: &mut ActorContext) {
error!("Actor start failed (error={})", message.error);
// TODO: retry start??
}
}
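
For the retry TODO, one hypothetical approach is a delayed self-notification from the `ActorFailure` handler; `RetryStart` and the one-second backoff below are assumptions, not part of this commit (`self.actor_ref(ctx)` and `notify` are used the same way in `start_actor` above):

// Hypothetical retry message - not part of this commit.
struct RetryStart;

impl Message for RetryStart {
    type Result = ();
}

#[async_trait]
impl<F: SingletonFactory> Handler<RetryStart> for Manager<F> {
    async fn handle(&mut self, _: RetryStart, ctx: &mut ActorContext) {
        // Re-attempt the spawn; start_actor will notify the manager again
        // with either ActorStarted or ActorFailure.
        self.start_actor(ctx).await;
    }
}

// Inside Handler<ActorFailure>::handle, after logging the error:
// let manager_ref = self.actor_ref(ctx);
// tokio::spawn(async move {
//     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//     let _ = manager_ref.notify(RetryStart);
// });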
52 changes: 50 additions & 2 deletions coerce/src/remote/cluster/singleton/mod.rs
@@ -1,13 +1,61 @@
use crate::actor::message::{Handler, Message};
-use crate::actor::{Actor, ActorFactory, ActorRefErr};
+use crate::actor::{Actor, ActorFactory, ActorId, ActorRefErr, IntoActor, IntoActorId, LocalActorRef};
use crate::remote::cluster::node::NodeSelector;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::Manager;
use crate::remote::system::RemoteActorSystem;

pub mod factory;
pub mod manager;
pub mod proto;

pub struct Singleton<A: Actor, F: SingletonFactory<Actor = A>> {
-    manager: F,
+    manager: LocalActorRef<Manager<F>>,
}

pub struct SingletonBuilder<F: SingletonFactory> {
factory: Option<F>,
singleton_id: Option<ActorId>,
manager_id: Option<ActorId>,
node_selector: NodeSelector,
system: RemoteActorSystem,
}

impl<F: SingletonFactory> SingletonBuilder<F> {
pub fn new(system: RemoteActorSystem) -> Self {
Self {
system,
factory: None,
singleton_id: Some(F::Actor::type_name().into_actor_id()),
manager_id: Some(Manager::<F>::type_name().into_actor_id()),
node_selector: NodeSelector::All,
}
}

pub fn factory(mut self, factory: F) -> Self {
self.factory = Some(factory);
self
}

pub async fn build(mut self) -> Singleton<F::Actor, F> {
let factory = self.factory.expect("factory");
let manager_actor_id = self.manager_id.expect("manager actor id");
let singleton_actor_id = self.singleton_id.expect("singleton actor id");
let actor_system = self.system.actor_system().clone();

let manager = Manager::new(
self.system,
factory,
manager_actor_id.clone(),
singleton_actor_id,
self.node_selector,
)
.into_actor(Some(manager_actor_id), &actor_system)
.await
.expect("start manager actor");

Singleton { manager }
}
}

impl<A: Actor, F: SingletonFactory<Actor = A>> Singleton<A, F> {
63 changes: 63 additions & 0 deletions coerce/tests/test_cluster_singleton.rs
@@ -0,0 +1,63 @@
use std::time::Duration;
use tokio::time::sleep;
use tracing::Level;
use coerce::actor::Actor;
use coerce::actor::system::ActorSystem;
use coerce::remote::cluster::singleton::SingletonBuilder;
use coerce::remote::system::RemoteActorSystem;

mod util;

struct SingletonActor {}

impl Actor for SingletonActor {}

#[tokio::test]
pub async fn test_cluster_singleton_start() {
util::create_logger(Some(Level::DEBUG));

let system = ActorSystem::new();
let system2 = ActorSystem::new();
let remote = RemoteActorSystem::builder()
.with_tag("remote-1")
.with_id(1)
.with_actor_system(system)
.build()
.await;

let remote2 = RemoteActorSystem::builder()
.with_tag("remote-2")
.with_id(2)
.with_actor_system(system2)
.build()
.await;

remote
.clone()
.cluster_worker()
.listen_addr("localhost:30101")
.start()
.await;

remote2
.clone()
.cluster_worker()
.listen_addr("localhost:30102")
.with_seed_addr("localhost:30101")
.start()
.await;

let singleton = SingletonBuilder::new(remote)
.factory(|| SingletonActor {})
.build().await;

let singleton2 = SingletonBuilder::new(remote2)
.factory(|| SingletonActor {})
.build().await;

sleep(Duration::from_secs(5)).await;
}
