begin work on replicator writes + add Leader replicator state
LeonHartley committed Mar 26, 2024
1 parent 900f901 commit b3e3e67
Showing 7 changed files with 218 additions and 58 deletions.
2 changes: 1 addition & 1 deletion coerce/src/actor/mod.rs
@@ -142,14 +142,14 @@ use crate::actor::message::{Handler, Message};
use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
use crate::actor::system::ActorSystem;

pub use refs::*;
use std::fmt::Debug;
use std::hash::Hasher;

Check warning on line 147 in coerce/src/actor/mod.rs (GitHub Actions / Coerce Runtime Test): unused import: `std::hash::Hasher`
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
pub use refs::*;

#[cfg(feature = "remote")]
use crate::actor::message::Envelope;

Check warning on line 155 in coerce/src/actor/mod.rs (GitHub Actions / Coerce Runtime Test): unused import: `crate::actor::message::Envelope`
modules/replication/coerce-replication/src/simple/consensus.rs
@@ -1 +1 @@
pub struct Replicate {}
pub struct Replicate<K, V> {}

Check failure on line 1 in modules/replication/coerce-replication/src/simple/consensus.rs (GitHub Actions / Coerce Runtime Test): parameter `K` is never used
Check failure on line 1 in modules/replication/coerce-replication/src/simple/consensus.rs (GitHub Actions / Coerce Runtime Test): parameter `V` is never used
2 changes: 2 additions & 0 deletions modules/replication/coerce-replication/src/simple/error.rs
@@ -1,6 +1,7 @@
use crate::storage::StorageErr;
use coerce::actor::message::{MessageUnwrapErr, MessageWrapErr};
use coerce::actor::ActorRefErr;
use coerce::remote::system::NodeId;

#[derive(Debug)]
pub enum Error {
@@ -9,6 +10,7 @@ pub enum Error {
ActorRef(ActorRefErr),
Deserialisation(MessageUnwrapErr),
Serialisation(MessageWrapErr),
LeaderChanged { new_leader_id: NodeId },
}

impl From<StorageErr> for Error {
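
A side note, not part of the diff: the new `LeaderChanged { new_leader_id }` variant is what the replicator now returns for mutations that were still uncommitted when leadership moved (see the `simple/mod.rs` changes below). A hedged caller-side sketch, with the actual retry left to the caller:

```rust
use coerce::remote::system::NodeId;

// Hypothetical helper (not in this commit): extracts the new leader id so a
// caller can re-resolve the leader replicator and retry the write.
fn new_leader_on_failure<V>(result: &Result<V, Error>) -> Option<NodeId> {
    match result {
        Err(Error::LeaderChanged { new_leader_id }) => Some(*new_leader_id),
        _ => None,
    }
}
```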
120 changes: 84 additions & 36 deletions modules/replication/coerce-replication/src/simple/mod.rs
@@ -15,7 +15,7 @@ pub mod write;

use crate::simple::heartbeat::HeartbeatTick;
use crate::simple::read::Read;
use crate::simple::write::Write;
use crate::simple::write::{Mutation, UncommittedMutation, Write};
use crate::storage::{Key, Storage, Value};
use coerce::actor::context::ActorContext;
use coerce::actor::message::{Handler, Message};
@@ -25,16 +25,20 @@ use coerce::remote::cluster::group::{Node, NodeGroup, NodeGroupEvent, Subscribe}
use coerce::remote::cluster::node::NodeSelector;
use coerce::remote::system::{NodeId, RemoteActorSystem};

use std::collections::{HashMap, VecDeque};
use crate::simple::error::Error;
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::time::Duration;
use tokio::sync::oneshot;

pub enum Request<K: Key, V: Value> {
Read(Read<K, V>),
Write(Write<K, V>),
}

enum State<S: Storage> {
None,

Joining {
request_buffer: VecDeque<Request<S::Key, S::Value>>,
},
@@ -45,16 +49,37 @@ enum State<S: Storage> {

Available {
cluster: Cluster<S>,
heartbeat_timer: Option<Timer>,
pending_mutations: HashMap<u64, Mutation<S::Key, S::Value>>,
},

Leader {
cluster: Cluster<S>,
uncommitted_mutations: HashMap<u64, UncommittedMutation<S::Key, S::Value>>,
heartbeat_timer: Timer,
},
}

impl<S> Default for State<S> {

Check failure on line 62 in modules/replication/coerce-replication/src/simple/mod.rs (GitHub Actions / Coerce Runtime Test): the trait bound `S: storage::Storage` is not satisfied
fn default() -> Self {

Check failure on line 63 in modules/replication/coerce-replication/src/simple/mod.rs (GitHub Actions / Coerce Runtime Test): the trait bound `S: storage::Storage` is not satisfied
Self::None
}
}
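
Note (not part of this commit): the two trait-bound failures above arise because the `Default` impl is written for an unconstrained `S`, while `State<S>` itself requires `S: Storage`. A minimal fix sketch, mirroring the bound declared on the enum:

```rust
// Hypothetical fix: add the same `S: Storage` bound that `State<S>` declares,
// which resolves the "trait bound `S: storage::Storage` is not satisfied" errors.
impl<S: Storage> Default for State<S> {
    fn default() -> Self {
        Self::None
    }
}
```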

struct Cluster<S: Storage> {
current_leader: NodeId,
leader_actor: ActorRef<Replicator<S>>,
nodes: HashMap<NodeId, Node<Replicator<S>>>,
}

impl<S: Storage> Cluster<S> {
pub fn update_leader(&mut self, leader_id: NodeId) {
let leader_actor = self.nodes.get(&leader_id).unwrap().actor.clone();

self.current_leader = leader_id;
self.leader_actor = leader_actor;
}
}
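
Another side note, not part of the diff: `update_leader` unwraps `nodes.get(&leader_id)`, so it panics if the announced leader has not yet been added to the local node map (for example if a `LeaderChanged` event races an `AddNode`). A hedged fallible variant, purely as a sketch:

```rust
impl<S: Storage> Cluster<S> {
    // Hypothetical alternative: returns false instead of panicking when the
    // new leader is not yet known locally, leaving the previous leader in place.
    pub fn try_update_leader(&mut self, leader_id: NodeId) -> bool {
        match self.nodes.get(&leader_id) {
            Some(node) => {
                self.leader_actor = node.actor.clone();
                self.current_leader = leader_id;
                true
            }
            None => false,
        }
    }
}
```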

pub struct Replicator<S: Storage> {
storage: S,
group: LocalActorRef<NodeGroup<Self>>,
@@ -134,14 +159,8 @@ impl<S: Storage> Handler<NodeGroupEvent<Replicator<S>>> for Replicator<S> {
NodeGroupEvent::MemberUp { leader_id, nodes } => {
debug!(leader_id = leader_id, node_count = nodes.len(), "member up");

let (mut leader_actor, heartbeat_timer) = if leader_id == self.system.node_id() {
let timer = start_heartbeat_timer::<S>(ctx);
let actor_ref = ActorRef::from(self.actor_ref(ctx));

(Some(actor_ref), Some(timer))
} else {
(None, None)
};
let is_leader = leader_id == self.system.node_id();
let (mut leader_actor) = is_leader.then(|| ActorRef::from(self.actor_ref(ctx)));

let mut node_map = HashMap::new();
for node in nodes {
@@ -152,19 +171,26 @@ impl<S: Storage> Handler<NodeGroupEvent<Replicator<S>>> for Replicator<S> {
node_map.insert(node.node_id, node);
}

let old_state = mem::replace(
&mut self.state,
let cluster = Cluster {
current_leader: leader_id,
leader_actor: leader_actor.unwrap(),
nodes: node_map,
};

let new_state = if is_leader {
State::Leader {
cluster,
uncommitted_mutations: Default::default(),
heartbeat_timer: start_heartbeat_timer::<S>(ctx),
}
} else {
State::Available {
cluster: Cluster {
current_leader: leader_id,
leader_actor: leader_actor.unwrap(),
nodes: node_map,
},
heartbeat_timer,
},
);

match old_state {
cluster,
pending_mutations: Default::default(),
}
};

match mem::replace(&mut self.state, new_state) {
State::Joining { request_buffer } => {
debug!(
pending_requests = request_buffer.len(),
@@ -189,7 +215,7 @@ impl<S: Storage> Handler<NodeGroupEvent<Replicator<S>>> for Replicator<S> {
debug!(node_id = node.node_id, "node added");

match &mut self.state {
State::Available { cluster, .. } => {
State::Available { cluster, .. } | State::Leader { cluster, .. } => {
cluster.nodes.insert(node.node_id, node);
}
_ => {}
@@ -200,7 +226,7 @@ impl<S: Storage> Handler<NodeGroupEvent<Replicator<S>>> for Replicator<S> {
debug!(node_id = node_id, "node removed");

match &mut self.state {
State::Available { cluster, .. } => {
State::Available { cluster, .. } | State::Leader { cluster, .. } => {
cluster.nodes.remove(&node_id);
}
_ => {}
@@ -210,22 +236,44 @@ impl<S: Storage> Handler<NodeGroupEvent<Replicator<S>>> for Replicator<S> {
NodeGroupEvent::LeaderChanged(leader_id) => {
info!(leader_id = leader_id, "leader changed");

match &mut self.state {
State::Available {
cluster,
heartbeat_timer,
match mem::take(&mut self.state) {
State::Leader {
mut cluster,
uncommitted_mutations,
..
} => {
let leader_actor = cluster.nodes.get(&leader_id).unwrap().actor.clone();
cluster.current_leader = leader_id;
cluster.leader_actor = leader_actor;
cluster.update_leader(leader_id);

for (_, mutation) in uncommitted_mutations {
mutation.on_completion.notify_err(
Error::LeaderChanged {
new_leader_id: leader_id,
},
&self.system,
);
}

self.state = State::Available {
cluster,
pending_mutations: Default::default(),
}
}

State::Available { mut cluster, .. } => {
cluster.update_leader(leader_id);

let is_leader = leader_id == self.system.node_id();
if is_leader {
if heartbeat_timer.is_none() {
*heartbeat_timer = Some(start_heartbeat_timer::<S>(ctx));
}
self.state = State::Leader {
cluster,
uncommitted_mutations: Default::default(),
heartbeat_timer: start_heartbeat_timer(ctx),
};
} else {
*heartbeat_timer = None;
self.state = State::Available {
cluster,
pending_mutations: Default::default(),
}
}
}

28 changes: 11 additions & 17 deletions modules/replication/coerce-replication/src/simple/read.rs
@@ -37,34 +37,28 @@ impl<S: Storage> Handler<Read<S::Key, S::Value>> for Replicator<S> {
);
}

State::Recovering { cluster } => {
debug!("node is currently recovering, forwarding to leader actor");
State::Available { cluster, .. } | State::Recovering { cluster, .. } => {
let on_completion = message.on_completion.take().unwrap();

debug!("forwarding request to leader node");

tokio::spawn(remote_read(
cluster.leader_actor.clone(),
message.key,
message.on_completion.take().unwrap(),
on_completion,
self.system.clone(),
));
}

State::Available { cluster, .. } => {
State::Leader { .. } => {
let on_completion = message.on_completion.take().unwrap();
let data = self.storage.read(message.key).await;

if cluster.current_leader == self.system.node_id() {
let data = self.storage.read(message.key).await;

debug!("local read, node is leader, emitting result");
let _ = on_completion.send(data.map_err(|e| e.into()));
} else {
tokio::spawn(remote_read(
cluster.leader_actor.clone(),
message.key,
on_completion,
self.system.clone(),
));
}
debug!("local read, node is leader, emitting result");
let _ = on_completion.send(data.map_err(|e| e.into()));
}

_ => {}
}
}
}
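
Note (not part of this commit): after this change only a node in the `Leader` state answers reads from its local storage; `Available` and `Recovering` nodes forward the request to the leader via `remote_read`. A hedged caller-side sketch, assuming `Read` has only the `key` and `on_completion` fields used by this handler and that `LocalActorRef::notify` is the delivery path:

```rust
use coerce::actor::LocalActorRef;
use tokio::sync::oneshot;

// Hypothetical caller helper: issues a read against the local replicator and
// awaits the result delivered on the oneshot channel.
async fn read_value<S: Storage>(
    replicator: &LocalActorRef<Replicator<S>>,
    key: S::Key,
) -> Result<S::Value, Error> {
    let (tx, rx) = oneshot::channel();

    replicator
        .notify(Read { key, on_completion: Some(tx) })
        .map_err(Error::ActorRef)?;

    // The handler above replies either with a local read (leader) or with the
    // result of `remote_read` (forwarded to the leader).
    rx.await.expect("replicator dropped the read request")
}
```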
(2 further changed files in this commit were not loaded in this view.)
