Skip to content

Commit

Permalink
expect MemberUp before actioning other cluster events
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonHartley committed Feb 4, 2024
1 parent 99f179f commit 9aa8ead
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions coerce/src/remote/cluster/singleton/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct Manager<F: SingletonFactory> {
selector: NodeSelector,
sys: RemoteActorSystem,
proxy: LocalActorRef<Proxy<F::Actor>>,
cluster_up: bool,
}

impl<F: SingletonFactory> Manager<F> {
Expand All @@ -57,6 +58,7 @@ impl<F: SingletonFactory> Manager<F> {
selector,
sys,
proxy,
cluster_up: false,
}
}
}
Expand Down Expand Up @@ -263,6 +265,8 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
leader_id: leader,
nodes,
}) => {
self.cluster_up = true;

for node in nodes {
if node.id == self.node_id || !self.selector.includes(node.as_ref()) {
continue;
Expand All @@ -285,6 +289,10 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
}

ClusterEvent::LeaderChanged(leader) => {
if !self.cluster_up {
return;
}

self.on_leader_changed(*leader, ctx).await;
}

Expand All @@ -306,11 +314,13 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
ClusterEvent::NodeRemoved(node) => {
self.managers.remove(&node.id);

if let State::Starting { acknowledged_nodes } = &mut self.state {
acknowledged_nodes.remove(&node.id);
if self.cluster_up {
if let State::Starting { acknowledged_nodes } = &mut self.state {
acknowledged_nodes.remove(&node.id);

if acknowledged_nodes.len() == self.managers.len() {
self.on_all_managers_acknowledged(ctx).await;
if acknowledged_nodes.len() == self.managers.len() {
self.on_all_managers_acknowledged(ctx).await;
}
}
}
}
Expand Down

0 comments on commit 9aa8ead

Please sign in to comment.