Commit

continued work on cluster singleton functionality
LeonHartley committed Jan 4, 2024
1 parent 1f44815 · commit 2ae5502
Showing 7 changed files with 69 additions and 51 deletions.
2 changes: 1 addition & 1 deletion coerce/src/remote/cluster/node.rs
@@ -194,7 +194,7 @@ impl From<RemoteNodeState> for RemoteNode {
addr: s.addr,
tag: s.tag,
node_started_at: s.node_started_at,
attributes: s.attributes.clone(),
attributes: s.attributes,
}
}
}
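
Since this From impl takes RemoteNodeState by value, the attributes map can be moved out rather than cloned. A minimal sketch of the pattern with simplified stand-in types (not the actual Coerce definitions):

    use std::collections::HashMap;

    struct RemoteNodeState {
        addr: String,
        attributes: HashMap<String, String>,
    }

    struct RemoteNode {
        addr: String,
        attributes: HashMap<String, String>,
    }

    impl From<RemoteNodeState> for RemoteNode {
        fn from(s: RemoteNodeState) -> Self {
            // `s` is consumed by value, so fields can be moved out without cloning.
            Self {
                addr: s.addr,
                attributes: s.attributes,
            }
        }
    }
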
17 changes: 11 additions & 6 deletions coerce/src/remote/cluster/singleton/manager/lease.rs
@@ -69,12 +69,17 @@ impl<F: SingletonFactory> Manager<F> {
}

pub async fn grant_lease(&self, node_id: NodeId) {
self.notify_manager(
node_id,
LeaseAck {
source_node_id: self.node_id,
},
)
if let Err(e) = self
.notify_manager(
node_id,
LeaseAck {
source_node_id: self.node_id,
},
)
.await
{
error!("Failed to grant lease to node={}, e={}", node_id, e)
}
}

pub async fn on_lease_ack(&mut self, node_id: NodeId) {
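
The grant_lease change stops discarding the result of notifying the remote manager and logs a delivery failure instead. A rough sketch of that shape, using stand-in types and eprintln! in place of the error! macro (none of these are the real Coerce signatures):

    type NodeId = u64;

    #[derive(Debug)]
    struct SendError(String);

    struct LeaseAck {
        source_node_id: NodeId,
    }

    struct RemoteManagerRef;

    impl RemoteManagerRef {
        // Stand-in for a remote actor notification that can fail.
        async fn notify(&self, _msg: LeaseAck) -> Result<(), SendError> {
            Ok(())
        }
    }

    async fn grant_lease(manager: &RemoteManagerRef, node_id: NodeId, source_node_id: NodeId) {
        // Surface (rather than silently drop) a failed lease acknowledgement.
        if let Err(e) = manager.notify(LeaseAck { source_node_id }).await {
            eprintln!("Failed to grant lease to node={}, e={:?}", node_id, e);
        }
    }
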
35 changes: 16 additions & 19 deletions coerce/src/remote/cluster/singleton/manager/mod.rs
@@ -10,7 +10,7 @@ use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};
use crate::remote::system::{NodeId, RemoteActorSystem};
use crate::remote::RemoteActorRef;
use std::collections::btree_map::Entry;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

pub struct Manager<F: SingletonFactory> {
@@ -92,25 +92,19 @@ impl<F: SingletonFactory> Manager<F> {
None => return,
};

let _ = actor_ref.notify_stop();

self.state = State::Stopping {
actor_ref,
lease_requested_by: node_id,
};

let _ = actor_ref.notify_stop();
}

pub async fn on_leader_changed(&mut self, new_leader_id: NodeId, sys: &RemoteActorSystem) {
if new_leader_id == sys.node_id() {
if !self.state.is_running() {
self.begin_starting(sys).await;
}
if new_leader_id == sys.node_id() && !self.state.is_running() {
self.begin_starting(sys).await;
}
}

fn create_remote_ref(&self, node_id: NodeId, sys: &RemoteActorSystem) -> ActorRef<Self> {
RemoteActorRef::new(self.manager_actor_id.clone(), node_id, sys.clone()).into()
}
}

#[async_trait]
@@ -156,18 +150,20 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
let sys = ctx.system().remote();
match message.0.as_ref() {
SystemEvent::Cluster(e) => match e {
ClusterEvent::MemberUp(ClusterMemberUp { leader, nodes }) => {
ClusterEvent::MemberUp(ClusterMemberUp {
leader_id: leader,
nodes,
}) => {
for node in nodes {
if !self.selector.includes(node.as_ref()) {
if !self.selector.includes(node.as_ref()) && node.id != &self.node_id {
[Check failure on line 158 in coerce/src/remote/cluster/singleton/manager/mod.rs — GitHub Actions / Coerce Runtime Test: can't compare `u64` with `&u64`]
continue;
}

self.managers
.insert(node.id, self.create_remote_ref(node.id, &sys));
.insert(node.id, RemoteActorRef::new(self.manager_actor_id.clone(), node.id, sys.clone()).into());
}

if leader == self.node_id {
// TODO: start
if leader == &self.node_id {
self.begin_starting(&sys).await;
}
}
@@ -180,9 +176,10 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {

ClusterEvent::NodeAdded(node) => {
if self.selector.includes(node.as_ref()) {
let entry = self.managers.entry(node.id);
if let Entry::Vacant(e) = entry {
e.insert(self.create_remote_ref(node.id, &sys));
let mut entry = self.managers.entry(node.id);
if let Entry::Vacant(mut entry) = entry {
let remote_ref = RemoteActorRef::new(self.manager_actor_id.clone(), node.id, sys.clone()).into();
entry.insert(remote_ref);
}
}
}
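
Two notes on the manager changes. The CI failure flagged above comes from comparing a plain u64 (node.id) against a borrowed &u64 (&self.node_id); dropping the borrow so both sides are u64 is the usual fix. Separately, the NodeAdded arm now goes through std::collections::hash_map::Entry so a remote reference is only built when the node is not already tracked. A minimal sketch of that entry pattern (the value type is a placeholder, not Coerce's ActorRef):

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    type NodeId = u64;

    fn track_node(
        managers: &mut HashMap<NodeId, String>,
        node_id: NodeId,
        make_ref: impl FnOnce() -> String,
    ) {
        // Build and insert the reference only when the node is not already present.
        if let Entry::Vacant(entry) = managers.entry(node_id) {
            entry.insert(make_ref());
        }
        // Equivalent shortcut: managers.entry(node_id).or_insert_with(make_ref);
    }
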
5 changes: 3 additions & 2 deletions coerce/src/remote/cluster/singleton/manager/status.rs
@@ -6,6 +6,7 @@ use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::{Manager, State};
use crate::remote::cluster::singleton::proto::singleton as proto;
use crate::remote::system::NodeId;
use protobuf::EnumOrUnknown;

pub struct GetStatus {
source_node_id: NodeId,
@@ -32,7 +33,7 @@ impl<F: SingletonFactory> Handler<GetStatus> for Manager<F> {

let singleton_state = match &self.state {
State::Idle => SingletonState::Idle,
State::Starting => SingletonState::Starting,
State::Starting { .. } => SingletonState::Starting,
State::Running { .. } => SingletonState::Running,
State::Stopping { .. } => SingletonState::Stopping,
};
@@ -60,7 +61,7 @@ impl Message for GetStatus {

fn write_remote_result(res: Self::Result) -> Result<Vec<u8>, MessageWrapErr> {
proto::ManagerStatus {
singleton_state: Some(res.singleton_state.into()).into(),
singleton_state: EnumOrUnknown::new(res.singleton_state.into()),
..Default::default()
}
.to_bytes()
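
status.rs now builds the enum field with protobuf's EnumOrUnknown wrapper instead of converting through Option. A sketch of the shape this takes with the crate's generated proto::singleton module (as imported in the diff; not a standalone snippet):

    use protobuf::EnumOrUnknown;

    use crate::remote::cluster::singleton::proto::singleton as proto;

    fn manager_status(singleton_state: proto::SingletonState) -> proto::ManagerStatus {
        proto::ManagerStatus {
            // rust-protobuf 3.x stores enum fields as EnumOrUnknown<T>, which also
            // preserves unrecognised values coming off the wire.
            singleton_state: EnumOrUnknown::new(singleton_state),
            ..Default::default()
        }
    }
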
45 changes: 27 additions & 18 deletions coerce/src/remote/heartbeat/mod.rs
@@ -7,18 +7,19 @@ use crate::actor::system::ActorSystem;
use crate::actor::{Actor, BoxedActorRef, IntoActor, LocalActorRef};
use crate::actor::{ActorId, CoreActorRef};
use crate::remote::actor::message::{NodeTerminated, SetRemote};
use crate::remote::cluster::node::{NodeStatus, RemoteNodeState};
use crate::remote::cluster::node::{NodeStatus, RemoteNodeRef, RemoteNodeState};
use crate::remote::net::proto::network::PongEvent;
use crate::remote::stream::pubsub::PubSub;
use crate::remote::stream::system::ClusterEvent::{LeaderChanged, MemberUp};
use crate::remote::stream::system::{SystemEvent, SystemTopic};
use crate::remote::stream::system::{ClusterMemberUp, SystemEvent, SystemTopic};
use crate::remote::system::{NodeId, RemoteActorSystem};
use chrono::{DateTime, Utc};

use std::cmp::Ordering;
use std::collections::{HashMap, VecDeque};

use std::ops::Add;
use std::sync::Arc;

use crate::remote::heartbeat::health::{
GetHealth, RegisterHealthCheck, RemoveHealthCheck, SystemHealth,
@@ -260,17 +261,6 @@ impl Handler<HeartbeatTick> for Heartbeat {
}
});

if !self.cluster_member_up {
let min_cluster_size_reached = match self.config.minimum_cluster_size {
None => true,
Some(n) => n >= self.node_pings.len(),
};

if min_cluster_size_reached {
self.on_min_cluster_size_reached()
}
}

if self.last_heartbeat.is_some() {
let oldest_healthy_node = updates.iter().filter(|n| n.status.is_healthy()).next();

@@ -291,12 +281,27 @@
}
}

system.update_nodes(updates).await;
self.last_heartbeat = Some(Utc::now());
system.update_nodes(updates.clone()).await;

if let Some(new_leader_id) = new_leader_id {
if !self.cluster_member_up {
let min_cluster_size_reached = match self.config.minimum_cluster_size {
None => true,
Some(n) => n >= self.node_pings.len(),
};

if min_cluster_size_reached {
self.on_min_cluster_size_reached(
new_leader_id,
updates.into_iter().map(|n| Arc::new(n.into())).collect(),
);
}
}

self.update_leader(new_leader_id);
}

self.last_heartbeat = Some(Utc::now());
}
}

@@ -320,14 +325,18 @@ impl Heartbeat {
}
}

fn on_min_cluster_size_reached(&mut self) {
fn on_min_cluster_size_reached(&mut self, leader_id: NodeId, nodes: Vec<RemoteNodeRef>) {
self.cluster_member_up = true;
let system = self.system.as_ref().unwrap();

let sys = system.clone();
tokio::spawn(async move {
let _ =
PubSub::publish_locally(SystemTopic, SystemEvent::Cluster(MemberUp), &sys).await;
let _ = PubSub::publish_locally(
SystemTopic,
SystemEvent::Cluster(MemberUp(ClusterMemberUp { leader_id, nodes })),
&sys,
)
.await;
});
}
}
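
The heartbeat handler now waits until a leader has been determined before checking the minimum cluster size, and hands the leader id plus the current node set to on_min_cluster_size_reached. One way such a gate can be expressed, reduced to plain types (a sketch only, not the committed handler):

    type NodeId = u64;

    struct MemberUpGate {
        cluster_member_up: bool,
        minimum_cluster_size: Option<usize>,
    }

    impl MemberUpGate {
        /// True exactly once: when a leader exists and enough nodes are known.
        fn should_signal(&mut self, new_leader_id: Option<NodeId>, known_nodes: usize) -> bool {
            if self.cluster_member_up || new_leader_id.is_none() {
                return false;
            }
            let reached = match self.minimum_cluster_size {
                None => true,
                Some(min) => known_nodes >= min,
            };
            if reached {
                self.cluster_member_up = true;
            }
            reached
        }
    }
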
14 changes: 10 additions & 4 deletions coerce/src/remote/stream/system.rs
@@ -20,8 +20,9 @@ pub enum ClusterEvent {
LeaderChanged(NodeId),
}

#[derive(Debug)]
pub struct ClusterMemberUp {
pub leader: NodeId,
pub leader_id: NodeId,
pub nodes: Vec<RemoteNodeRef>,
}

@@ -60,8 +61,11 @@ impl From<LeaderChangedEvent> for SystemEvent {
}

impl From<MemberUpEvent> for SystemEvent {
fn from(_: MemberUpEvent) -> Self {
SystemEvent::Cluster(ClusterEvent::MemberUp)
fn from(e: MemberUpEvent) -> Self {
SystemEvent::Cluster(ClusterEvent::MemberUp(ClusterMemberUp {
leader_id: e.leader_id,
nodes: e.nodes.into_iter().map(|n| Arc::new(n.into())).collect(),
}))
}
}

@@ -119,8 +123,10 @@ impl StreamData for SystemEvent {
write_event(SysEvent::ClusterLeaderChanged, event.write_to_bytes())
}

ClusterEvent::MemberUp => {
ClusterEvent::MemberUp(member_up) => {
let event = MemberUpEvent {
leader_id: member_up.leader_id,
nodes: member_up.nodes.iter().map(|n| n.as_ref().clone().into()).collect(),
..Default::default()
};

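
With ClusterEvent::MemberUp now carrying a ClusterMemberUp payload, subscribers get the leader id and node list directly from the event. A small sketch of matching on the enriched variant (simplified stand-in types, not the real Coerce definitions):

    use std::sync::Arc;

    type NodeId = u64;

    #[derive(Debug)]
    struct RemoteNode {
        id: NodeId,
    }

    type RemoteNodeRef = Arc<RemoteNode>;

    #[derive(Debug)]
    struct ClusterMemberUp {
        leader_id: NodeId,
        nodes: Vec<RemoteNodeRef>,
    }

    #[derive(Debug)]
    enum ClusterEvent {
        MemberUp(ClusterMemberUp),
        LeaderChanged(NodeId),
    }

    fn on_cluster_event(event: &ClusterEvent, local_node_id: NodeId) {
        match event {
            // The payload lets a subscriber react with the full membership immediately.
            ClusterEvent::MemberUp(member_up) => {
                let is_leader = member_up.leader_id == local_node_id;
                println!(
                    "cluster up: leader={} nodes={} local_is_leader={}",
                    member_up.leader_id,
                    member_up.nodes.len(),
                    is_leader
                );
            }
            ClusterEvent::LeaderChanged(new_leader_id) => {
                println!("leader changed to {}", new_leader_id);
            }
        }
    }
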
2 changes: 1 addition & 1 deletion coerce/src/sharding/coordinator/spawner.rs
@@ -135,7 +135,7 @@ impl Handler<Receive<SystemTopic>> for CoordinatorSpawner {
);
}
}
ClusterEvent::MemberUp => {
ClusterEvent::MemberUp(member_up) => {
// TODO: coordinator should not start until memberup has been signalled.
}
},
