From 2ae550243bc4c165cb094a05c41ba90f3226f2f5 Mon Sep 17 00:00:00 2001
From: Leon Hartley
Date: Thu, 4 Jan 2024 17:33:10 +0000
Subject: [PATCH] continued work on cluster singleton functionality

---
 coerce/src/remote/cluster/node.rs             |  2 +-
 .../remote/cluster/singleton/manager/lease.rs | 17 ++++---
 .../remote/cluster/singleton/manager/mod.rs   | 35 +++++++--------
 .../cluster/singleton/manager/status.rs       |  5 ++-
 coerce/src/remote/heartbeat/mod.rs            | 45 +++++++++++--------
 coerce/src/remote/stream/system.rs            | 14 ++++--
 coerce/src/sharding/coordinator/spawner.rs    |  2 +-
 7 files changed, 69 insertions(+), 51 deletions(-)

diff --git a/coerce/src/remote/cluster/node.rs b/coerce/src/remote/cluster/node.rs
index 8deb9f3..d240cbc 100644
--- a/coerce/src/remote/cluster/node.rs
+++ b/coerce/src/remote/cluster/node.rs
@@ -194,7 +194,7 @@ impl From<RemoteNodeState> for RemoteNode {
             addr: s.addr,
             tag: s.tag,
             node_started_at: s.node_started_at,
-            attributes: s.attributes.clone(),
+            attributes: s.attributes,
         }
     }
 }
diff --git a/coerce/src/remote/cluster/singleton/manager/lease.rs b/coerce/src/remote/cluster/singleton/manager/lease.rs
index 426a97c..927c5cc 100644
--- a/coerce/src/remote/cluster/singleton/manager/lease.rs
+++ b/coerce/src/remote/cluster/singleton/manager/lease.rs
@@ -69,12 +69,17 @@ impl<F: SingletonFactory> Manager<F> {
     }
 
     pub async fn grant_lease(&self, node_id: NodeId) {
-        self.notify_manager(
-            node_id,
-            LeaseAck {
-                source_node_id: self.node_id,
-            },
-        )
+        if let Err(e) = self
+            .notify_manager(
+                node_id,
+                LeaseAck {
+                    source_node_id: self.node_id,
+                },
+            )
+            .await
+        {
+            error!("Failed to grant lease to node={}, e={}", node_id, e);
+        }
     }
 
     pub async fn on_lease_ack(&mut self, node_id: NodeId) {
diff --git a/coerce/src/remote/cluster/singleton/manager/mod.rs b/coerce/src/remote/cluster/singleton/manager/mod.rs
index d76330f..98e7d80 100644
--- a/coerce/src/remote/cluster/singleton/manager/mod.rs
+++ b/coerce/src/remote/cluster/singleton/manager/mod.rs
@@ -10,7 +10,7 @@ use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
 use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};
 use crate::remote::system::{NodeId, RemoteActorSystem};
 use crate::remote::RemoteActorRef;
-use std::collections::btree_map::Entry;
+use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 
 pub struct Manager<F: SingletonFactory> {
@@ -92,25 +92,19 @@
             None => return,
         };
 
+        let _ = actor_ref.notify_stop();
+
         self.state = State::Stopping {
             actor_ref,
             lease_requested_by: node_id,
         };
-
-        let _ = actor_ref.notify_stop();
     }
 
     pub async fn on_leader_changed(&mut self, new_leader_id: NodeId, sys: &RemoteActorSystem) {
-        if new_leader_id == sys.node_id() {
-            if !self.state.is_running() {
-                self.begin_starting(sys).await;
-            }
+        if new_leader_id == sys.node_id() && !self.state.is_running() {
+            self.begin_starting(sys).await;
         }
     }
-
-    fn create_remote_ref(&self, node_id: NodeId, sys: &RemoteActorSystem) -> ActorRef<Self> {
-        RemoteActorRef::new(self.manager_actor_id.clone(), node_id, sys.clone()).into()
-    }
 }
 
 #[async_trait]
@@ -156,18 +150,24 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
         let sys = ctx.system().remote();
         match message.0.as_ref() {
             SystemEvent::Cluster(e) => match e {
-                ClusterEvent::MemberUp(ClusterMemberUp { leader, nodes }) => {
+                ClusterEvent::MemberUp(ClusterMemberUp { leader_id, nodes }) => {
                     for node in nodes {
-                        if !self.selector.includes(node.as_ref()) {
+                        if !self.selector.includes(node.as_ref()) || node.id == self.node_id {
                             continue;
                         }
 
-                        self.managers
-                            .insert(node.id, self.create_remote_ref(node.id, &sys));
+                        self.managers.insert(
+                            node.id,
+                            RemoteActorRef::new(
+                                self.manager_actor_id.clone(),
+                                node.id,
+                                sys.clone(),
+                            )
+                            .into(),
+                        );
                     }
 
-                    if leader == self.node_id {
-                        // TODO: start
+                    if leader_id == &self.node_id {
                         self.begin_starting(&sys).await;
                     }
                 }
@@ -180,9 +180,14 @@
 
                 ClusterEvent::NodeAdded(node) => {
                     if self.selector.includes(node.as_ref()) {
-                        let entry = self.managers.entry(node.id);
-                        if let Entry::Vacant(e) = entry {
-                            e.insert(self.create_remote_ref(node.id, &sys));
+                        if let Entry::Vacant(entry) = self.managers.entry(node.id) {
+                            let manager_ref = RemoteActorRef::new(
+                                self.manager_actor_id.clone(),
+                                node.id,
+                                sys.clone(),
+                            )
+                            .into();
+                            entry.insert(manager_ref);
                         }
                     }
                 }
diff --git a/coerce/src/remote/cluster/singleton/manager/status.rs b/coerce/src/remote/cluster/singleton/manager/status.rs
index 9b59fa0..d3bfcbf 100644
--- a/coerce/src/remote/cluster/singleton/manager/status.rs
+++ b/coerce/src/remote/cluster/singleton/manager/status.rs
@@ -6,6 +6,7 @@ use crate::remote::cluster::singleton::factory::SingletonFactory;
 use crate::remote::cluster::singleton::manager::{Manager, State};
 use crate::remote::cluster::singleton::proto::singleton as proto;
 use crate::remote::system::NodeId;
+use protobuf::EnumOrUnknown;
 
 pub struct GetStatus {
     source_node_id: NodeId,
@@ -32,7 +33,7 @@ impl<F: SingletonFactory> Handler<GetStatus> for Manager<F> {
 
         let singleton_state = match &self.state {
             State::Idle => SingletonState::Idle,
-            State::Starting => SingletonState::Starting,
+            State::Starting { .. } => SingletonState::Starting,
             State::Running { .. } => SingletonState::Running,
             State::Stopping { .. } => SingletonState::Stopping,
         };
@@ -60,7 +61,7 @@ impl Message for GetStatus {
 
     fn write_remote_result(res: Self::Result) -> Result<Vec<u8>, MessageWrapErr> {
         proto::ManagerStatus {
-            singleton_state: Some(res.singleton_state.into()).into(),
+            singleton_state: EnumOrUnknown::new(res.singleton_state.into()),
             ..Default::default()
         }
         .to_bytes()
diff --git a/coerce/src/remote/heartbeat/mod.rs b/coerce/src/remote/heartbeat/mod.rs
index 53121f9..a192a0b 100644
--- a/coerce/src/remote/heartbeat/mod.rs
+++ b/coerce/src/remote/heartbeat/mod.rs
@@ -7,11 +7,11 @@ use crate::actor::system::ActorSystem;
 use crate::actor::{Actor, BoxedActorRef, IntoActor, LocalActorRef};
 use crate::actor::{ActorId, CoreActorRef};
 use crate::remote::actor::message::{NodeTerminated, SetRemote};
-use crate::remote::cluster::node::{NodeStatus, RemoteNodeState};
+use crate::remote::cluster::node::{NodeStatus, RemoteNodeRef, RemoteNodeState};
 use crate::remote::net::proto::network::PongEvent;
 use crate::remote::stream::pubsub::PubSub;
 use crate::remote::stream::system::ClusterEvent::{LeaderChanged, MemberUp};
-use crate::remote::stream::system::{SystemEvent, SystemTopic};
+use crate::remote::stream::system::{ClusterMemberUp, SystemEvent, SystemTopic};
 use crate::remote::system::{NodeId, RemoteActorSystem};
 use chrono::{DateTime, Utc};
@@ -19,6 +19,7 @@
 use std::cmp::Ordering;
 use std::collections::{HashMap, VecDeque};
 use std::ops::Add;
+use std::sync::Arc;
 
 use crate::remote::heartbeat::health::{
     GetHealth, RegisterHealthCheck, RemoveHealthCheck, SystemHealth,
@@ -260,17 +261,6 @@
             }
         });
 
-        if !self.cluster_member_up {
-            let min_cluster_size_reached = match self.config.minimum_cluster_size {
-                None => true,
-                Some(n) => n >= self.node_pings.len(),
-            };
-
-            if min_cluster_size_reached {
-                self.on_min_cluster_size_reached()
-            }
-        }
-
         if self.last_heartbeat.is_some() {
             let oldest_healthy_node =
                updates.iter().filter(|n| n.status.is_healthy()).next();
@@ -291,12 +281,27 @@
             }
         }
 
-        system.update_nodes(updates).await;
-        self.last_heartbeat = Some(Utc::now());
+        system.update_nodes(updates.clone()).await;
 
         if let Some(new_leader_id) = new_leader_id {
+            if !self.cluster_member_up {
+                let min_cluster_size_reached = match self.config.minimum_cluster_size {
+                    None => true,
+                    Some(n) => self.node_pings.len() >= n,
+                };
+
+                if min_cluster_size_reached {
+                    self.on_min_cluster_size_reached(
+                        new_leader_id,
+                        updates.into_iter().map(|n| Arc::new(n.into())).collect(),
+                    );
+                }
+            }
+
             self.update_leader(new_leader_id);
         }
+
+        self.last_heartbeat = Some(Utc::now());
     }
 }
@@ -320,14 +325,18 @@ impl Heartbeat {
         }
     }
 
-    fn on_min_cluster_size_reached(&mut self) {
+    fn on_min_cluster_size_reached(&mut self, leader_id: NodeId, nodes: Vec<RemoteNodeRef>) {
         self.cluster_member_up = true;
         let system = self.system.as_ref().unwrap();
 
         let sys = system.clone();
         tokio::spawn(async move {
-            let _ =
-                PubSub::publish_locally(SystemTopic, SystemEvent::Cluster(MemberUp), &sys).await;
+            let _ = PubSub::publish_locally(
+                SystemTopic,
+                SystemEvent::Cluster(MemberUp(ClusterMemberUp { leader_id, nodes })),
+                &sys,
+            )
+            .await;
         });
     }
 }
diff --git a/coerce/src/remote/stream/system.rs b/coerce/src/remote/stream/system.rs
index 6a46d06..e14fc3f 100644
--- a/coerce/src/remote/stream/system.rs
+++ b/coerce/src/remote/stream/system.rs
@@ -19,9 +19,10 @@ pub enum ClusterEvent {
-    MemberUp,
+    MemberUp(ClusterMemberUp),
     LeaderChanged(NodeId),
 }
 
+#[derive(Debug)]
 pub struct ClusterMemberUp {
-    pub leader: NodeId,
+    pub leader_id: NodeId,
     pub nodes: Vec<RemoteNodeRef>,
 }
@@ -60,8 +61,11 @@
 }
 
 impl From<MemberUpEvent> for SystemEvent {
-    fn from(_: MemberUpEvent) -> Self {
-        SystemEvent::Cluster(ClusterEvent::MemberUp)
+    fn from(e: MemberUpEvent) -> Self {
+        SystemEvent::Cluster(ClusterEvent::MemberUp(ClusterMemberUp {
+            leader_id: e.leader_id,
+            nodes: e.nodes.into_iter().map(|n| Arc::new(n.into())).collect(),
+        }))
     }
 }
@@ -119,8 +123,10 @@ impl StreamData for SystemEvent {
                 write_event(SysEvent::ClusterLeaderChanged, event.write_to_bytes())
             }
 
-            ClusterEvent::MemberUp => {
+            ClusterEvent::MemberUp(member_up) => {
                 let event = MemberUpEvent {
+                    leader_id: member_up.leader_id,
+                    nodes: member_up.nodes.iter().map(|n| n.as_ref().clone().into()).collect(),
                     ..Default::default()
                 };
diff --git a/coerce/src/sharding/coordinator/spawner.rs b/coerce/src/sharding/coordinator/spawner.rs
index 0d1b402..ba0c99f 100644
--- a/coerce/src/sharding/coordinator/spawner.rs
+++ b/coerce/src/sharding/coordinator/spawner.rs
@@ -135,7 +135,7 @@ impl Handler<Receive<SystemTopic>> for CoordinatorSpawner {
                     );
                 }
             }
-            ClusterEvent::MemberUp => {
+            ClusterEvent::MemberUp(_) => {
                 // TODO: coordinator should not start until memberup has been signalled.
             }
         },
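
Usage sketch (not part of this patch): how a subscriber might consume the new
ClusterMemberUp payload now that MemberUp carries data. The MemberUpListener
actor below is hypothetical, and the subscription setup is elided; it assumes
the actor subscribes to SystemTopic in Actor::started via PubSub::subscribe,
the way Manager and CoordinatorSpawner already do, and that a logging macro
such as info! is in scope (as error! is in lease.rs above).

    use async_trait::async_trait;

    use crate::actor::context::ActorContext;
    use crate::actor::message::Handler;
    use crate::actor::Actor;
    use crate::remote::stream::pubsub::Receive;
    use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};

    // Hypothetical subscriber, for illustration only.
    pub struct MemberUpListener;

    #[async_trait]
    impl Actor for MemberUpListener {}

    #[async_trait]
    impl Handler<Receive<SystemTopic>> for MemberUpListener {
        async fn handle(&mut self, message: Receive<SystemTopic>, _ctx: &mut ActorContext) {
            // Receive<SystemTopic> wraps an Arc<SystemEvent>, hence as_ref(),
            // mirroring the Manager handler in this patch.
            if let SystemEvent::Cluster(ClusterEvent::MemberUp(ClusterMemberUp {
                leader_id,
                nodes,
            })) = message.0.as_ref()
            {
                // MemberUp now carries the elected leader and the member set, so a
                // subscriber can react without querying cluster state separately.
                info!("cluster up: leader={}, {} node(s)", leader_id, nodes.len());
            }
        }
    }

Because the topic delivers one Arc<SystemEvent> to every local subscriber, the
node list is shared rather than cloned per subscriber.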