Skip to content

Commit

Permalink
Initial work on singleton proxy, simple PipeTo/pipe_to to run fut…
Browse files Browse the repository at this point in the history
…ures and pipe the result back to actor refs as a message + further improvements
  • Loading branch information
LeonHartley committed Jan 20, 2024
1 parent 945c8aa commit b225db4
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 47 deletions.
27 changes: 27 additions & 0 deletions coerce/src/actor/refs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::actor::supervised::Terminated;
use crate::actor::{Actor, ActorId, ActorPath};
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -597,3 +598,29 @@ impl<A: Actor> From<RemoteActorRef<A>> for ActorRef<A> {
}
}
}

pub trait PipeTo<A: Actor>
where
A: Handler<Self::Message>,
{
type Message: Message;

fn pipe_to(self, actor_ref: ActorRef<A>);
}

impl<A, F: Future> PipeTo<A> for F
where
F::Output: Message,
A: Handler<F::Output>,
F: 'static + Send,
{
type Message = F::Output;

fn pipe_to(self, actor_ref: ActorRef<A>) {
let fut = self;
tokio::spawn(async move {
let result = fut.await;
let _ = actor_ref.notify(result).await;
});
}
}
4 changes: 4 additions & 0 deletions coerce/src/protocol/singleton.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ message LeaseNack {

message SingletonStarted {
uint64 source_node_id = 1;
}

message SingletonStopped {
uint64 source_node_id = 1;
}
6 changes: 5 additions & 1 deletion coerce/src/remote/cluster/singleton/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use crate::actor::{
};
use crate::remote::cluster::node::NodeSelector;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::proxy::Proxy;
use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};
use crate::remote::system::{NodeId, RemoteActorSystem};
use crate::remote::RemoteActorRef;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Formatter};
use std::time::Duration;

Expand All @@ -29,6 +30,7 @@ pub struct Manager<F: SingletonFactory> {
factory: F,
selector: NodeSelector,
sys: RemoteActorSystem,
proxy: LocalActorRef<Proxy<F::Actor>>,
}

impl<F: SingletonFactory> Manager<F> {
Expand All @@ -38,6 +40,7 @@ impl<F: SingletonFactory> Manager<F> {
manager_actor_id: ActorId,
singleton_actor_id: ActorId,
selector: NodeSelector,
proxy: LocalActorRef<Proxy<F::Actor>>,
) -> Self {
Self {
system_event_subscription: None,
Expand All @@ -50,6 +53,7 @@ impl<F: SingletonFactory> Manager<F> {
factory,
selector,
sys,
proxy,
}
}
}
Expand Down
72 changes: 29 additions & 43 deletions coerce/src/remote/cluster/singleton/manager/start.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::{Actor, ActorRefErr, IntoActor, LocalActorRef};
use crate::actor::{Actor, ActorRefErr, IntoActor, LocalActorRef, PipeTo};
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::{Manager, State};

pub struct ActorStarted<A: Actor> {
actor_ref: LocalActorRef<A>,
pub enum ActorStartResult<A: Actor> {
Started(LocalActorRef<A>),
Failed(ActorRefErr),
}

pub struct ActorFailure {
error: ActorRefErr,
}

impl<A: Actor> Message for ActorStarted<A> {
type Result = ();
}

impl Message for ActorFailure {
impl<A: Actor> Message for ActorStartResult<A> {
type Result = ();
}

Expand All @@ -26,43 +19,36 @@ impl<F: SingletonFactory> Manager<F> {
let sys = self.sys.actor_system().clone();
let manager_ref = self.actor_ref(ctx);
let actor_id = self.singleton_actor_id.clone();
let _ = tokio::spawn(async move {
let actor = state.into_actor(Some(actor_id), &sys).await;
match actor {
Ok(actor_ref) => {
let _ = manager_ref.notify(ActorStarted { actor_ref });
}
Err(error) => {
let _ = manager_ref.notify(ActorFailure { error });
}
}
});
}
}

#[async_trait]
impl<F: SingletonFactory> Handler<ActorStarted<F::Actor>> for Manager<F> {
async fn handle(&mut self, message: ActorStarted<F::Actor>, ctx: &mut ActorContext) {
match &self.state {
State::Starting { .. } => {
info!("singleton actor started");
}
_ => {
warn!("Invalid state, expected `Starting`");
async move {
match state.into_actor(Some(actor_id), &sys).await {
Ok(actor_ref) => ActorStartResult::Started(actor_ref),
Err(e) => ActorStartResult::Failed(e),
}
}

let actor_ref = message.actor_ref;
self.state = State::Running { actor_ref }

// TODO: broadcast to all managers that we're running the actor
.pipe_to(manager_ref.into());
}
}

#[async_trait]
impl<F: SingletonFactory> Handler<ActorFailure> for Manager<F> {
async fn handle(&mut self, message: ActorFailure, ctx: &mut ActorContext) {
error!("Actor start failed (error={})", message.error);
// TODO: retry start??
impl<F: SingletonFactory> Handler<ActorStartResult<F::Actor>> for Manager<F> {
async fn handle(&mut self, message: ActorStartResult<F::Actor>, ctx: &mut ActorContext) {
match message {
ActorStartResult::Started(actor_ref) => {
match &self.state {
State::Starting { .. } => {
info!("singleton actor started");
}
_ => {
warn!("Invalid state, expected `Starting`");
}
}

self.state = State::Running { actor_ref }

// TODO: broadcast to all managers that we're running the actor
}
ActorStartResult::Failed(e) => {}
}
}
}
20 changes: 17 additions & 3 deletions coerce/src/remote/cluster/singleton/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use crate::remote::cluster::node::NodeSelector;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::lease::{LeaseAck, RequestLease};
use crate::remote::cluster::singleton::manager::Manager;
use crate::remote::cluster::singleton::proxy::Proxy;
use crate::remote::system::builder::RemoteSystemConfigBuilder;
use crate::remote::system::RemoteActorSystem;

pub mod factory;
pub mod manager;
pub mod proto;
pub mod proxy;

pub struct Singleton<A: Actor, F: SingletonFactory<Actor = A>> {
manager: LocalActorRef<Manager<F>>,
proxy: LocalActorRef<Proxy<A>>,
}

pub struct SingletonBuilder<F: SingletonFactory> {
Expand Down Expand Up @@ -43,24 +46,35 @@ impl<F: SingletonFactory> SingletonBuilder<F> {

pub async fn build(mut self) -> Singleton<F::Actor, F> {
let factory = self.factory.expect("factory");

let node_id = self.system.node_id();
let base_manager_id = self.manager_id.expect("manager actor id");
let manager_actor_id =
format!("{}-{}", &base_manager_id, self.system.node_id()).to_actor_id();

let manager_actor_id = format!("{}-{}", &base_manager_id, node_id).to_actor_id();

let proxy_actor_id = format!("{}-{}-proxy", &base_manager_id, node_id).to_actor_id();

let singleton_actor_id = self.singleton_id.expect("singleton actor id");
let actor_system = self.system.actor_system().clone();

let proxy = Proxy::<F::Actor>::new()
.into_actor(Some(proxy_actor_id), &actor_system)
.await
.expect("start proxy actor");

let manager = Manager::new(
self.system,
factory,
base_manager_id,
singleton_actor_id,
self.node_selector,
proxy.clone(),
)
.into_actor(Some(manager_actor_id), &actor_system)
.await
.expect("start manager actor");

Singleton { manager }
Singleton { manager, proxy }
}
}

Expand Down
91 changes: 91 additions & 0 deletions coerce/src/remote/cluster/singleton/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::{Actor, ActorRef, ToActorId};
use crate::remote::cluster::singleton::proxy::send::Buffered;
use std::collections::VecDeque;

pub mod send;

pub enum ProxyState<A> {
Buffered {
request_queue: VecDeque<Box<dyn Buffered<A>>>,
},

Active {
actor_ref: ActorRef<A>,

Check failure on line 15 in coerce/src/remote/cluster/singleton/proxy/mod.rs

View workflow job for this annotation

GitHub Actions / Coerce Runtime Test

the trait bound `A: persistent::actor::PersistentActor` is not satisfied
},
}

pub struct Proxy<A: Actor> {
state: ProxyState<A>,
}

impl<A: Actor> Proxy<A> {
pub fn new() -> Self {
Self {
state: ProxyState::Buffered {
request_queue: VecDeque::new(),
},
}
}
}

impl<A: Actor> ProxyState<A> {
pub fn is_active(&self) -> bool {
matches!(&self, &ProxyState::Active { .. })
}
}

#[async_trait]
impl<A: Actor> Actor for Proxy<A> {}

pub struct SingletonStarted<A: Actor> {
actor_ref: ActorRef<A>,
}

pub struct SingletonStopping {}

impl<A> Message for SingletonStarted<A> {

Check failure on line 48 in coerce/src/remote/cluster/singleton/proxy/mod.rs

View workflow job for this annotation

GitHub Actions / Coerce Runtime Test

the trait bound `A: persistent::actor::PersistentActor` is not satisfied
type Result = ();
}

impl Message for SingletonStopping {
type Result = ();
}

#[async_trait]
impl<A: Actor> Handler<SingletonStarted<A>> for Proxy<A> {
async fn handle(&mut self, message: SingletonStarted<A>, ctx: &mut ActorContext) {
let actor_ref = message.actor_ref;

match &mut self.state {
ProxyState::Buffered { request_queue } => {
debug!(
"emitting {} buffered messages to {}",
request_queue.len(),
&actor_ref
);

while let Some(mut buffered) = request_queue.pop_front() {
buffered.send(actor_ref.clone());
}
}
_ => {}
}

self.state = ProxyState::Active { actor_ref };
}
}

#[async_trait]
impl<A: Actor> Handler<SingletonStopping> for Proxy<A> {
async fn handle(&mut self, _: SingletonStopping, ctx: &mut ActorContext) {
debug!("singleton actor stopped, buffering messages");

if self.state.is_active() {
self.state = ProxyState::Buffered {
request_queue: VecDeque::new(),
}
}
}
}
63 changes: 63 additions & 0 deletions coerce/src/remote/cluster/singleton/proxy/send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::{Actor, ActorRef, ActorRefErr};
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::proxy::{Proxy, ProxyState};
use tokio::sync::oneshot::Sender;

pub struct Send<M: Message> {
message: Option<M>,
result_channel: Option<Sender<M::Result>>,
}

impl<M: Message> Send<M> {
pub fn deliver<A>(&mut self, actor: ActorRef<A>)
where
A: Handler<M>,
{
let message = self.message.take().unwrap();
let result_channel = self.result_channel.take().unwrap();
tokio::spawn(async move {
let res = actor.send(message).await;
match res {
Ok(r) => result_channel.send(r),
Err(_) => {}
}
});
}
}

pub trait Buffered<A: Actor>: 'static + Sync + std::marker::Send {
fn send(&mut self, actor_ref: ActorRef<A>);
}

impl<A: Actor, M: Message> Buffered<A> for Send<M>
where
A: Handler<M>,
{
fn send(&mut self, actor_ref: ActorRef<A>) {
self.deliver(actor_ref)
}
}

impl<M: Message> Message for Send<M> {
type Result = ();
}

#[async_trait]
impl<A: Actor, M: Message> Handler<Send<M>> for Proxy<A>
where
A: Handler<M>,
{
async fn handle(&mut self, mut message: Send<M>, ctx: &mut ActorContext) {
match &mut self.state {
ProxyState::Buffered { request_queue } => {
request_queue.push_back(Box::new(message));
}

ProxyState::Active { actor_ref } => {
message.deliver(actor_ref.clone());
}
}
}
}

0 comments on commit b225db4

Please sign in to comment.