sharding improvements, continued work on remote client overhaul and bug fixes
LeonHartley committed Feb 17, 2022
1 parent 5d336c8 commit 6395ea3
Showing 15 changed files with 128 additions and 142 deletions.
11 changes: 10 additions & 1 deletion coerce/src/actor/scheduler/mod.rs
@@ -45,6 +45,7 @@ impl Actor for ActorScheduler {
async fn stopped(&mut self, _ctx: &mut ActorContext) {
trace!("scheduler stopping, total actors={}", self.actors.len());

// TODO: join all these stop() calls into a single future
for actor in self.actors.values() {
actor.stop().await;
trace!(target: "ActorScheduler", "stopping actor (id={})", &actor.actor_id());
@@ -197,10 +198,18 @@ where
let actor_id_clone = id.clone();
let actor_id = actor_id_clone.as_str();
let actor_type_name = A::type_name();

let node_id = if let Some(system) = &system {
if system.is_remote() { system.remote().node_id() } else { 0 }
} else {
0
};

tracing::trace_span!(
"ActorScheduler::start_actor",
actor_id = actor_id,
- actor_type_name = actor_type_name
+ actor_type_name = actor_type_name,
+ node_id = node_id,
);

let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
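
The `// TODO: join all these stop() calls into a single future` note in the scheduler's `stopped` handler above suggests stopping the scheduled actors concurrently instead of awaiting each one in turn. A minimal sketch of that idea with `futures::future::join_all`, using a hypothetical `ActorHandle` stand-in rather than coerce's actual actor reference type:

```rust
use futures::future::join_all;

// Hypothetical stand-in for coerce's actor handle; only what the sketch needs.
struct ActorHandle {
    id: String,
}

impl ActorHandle {
    async fn stop(&self) {
        // In the real scheduler this would send a stop signal and await termination.
        println!("stopping actor (id={})", self.id);
    }
}

// Stop every scheduled actor concurrently rather than one at a time.
async fn stop_all(actors: &[ActorHandle]) {
    join_all(actors.iter().map(|actor| actor.stop())).await;
}
```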
6 changes: 5 additions & 1 deletion coerce/src/remote/actor/handler.rs
@@ -94,8 +94,9 @@ impl Handler<RegisterNodes> for RemoteRegistry {
.collect::<Vec<RemoteNode>>();

trace!(target: "RemoteRegistry", "registering new nodes {:?}", &unregistered_nodes);
- let current_nodes = self.nodes.get_all();

+ let current_nodes = self.nodes.get_all();
+ // tokio::spawn(async move {
if unregistered_nodes.len() > 0 {
connect_all(unregistered_nodes, current_nodes, remote.clone()).await;
}
@@ -112,7 +113,10 @@ impl Handler<RegisterNodes> for RemoteRegistry {
)
.await;
});

self.nodes.add(node);
}
// });
}
}

8 changes: 7 additions & 1 deletion coerce/src/remote/actor/message.rs
@@ -3,7 +3,7 @@ use crate::remote::cluster::node::{RemoteNode, RemoteNodeState};
use crate::remote::system::{NodeId, RemoteActorSystem};

use crate::actor::message::Message;
- use crate::remote::net::client::RemoteClient;
+ use crate::remote::net::client::{ClientType, RemoteClient};
use crate::remote::net::message::SessionEvent;

use crate::actor::{ActorId, LocalActorRef};
@@ -34,6 +34,12 @@ impl Message for PopRequest {
type Result = Option<RemoteRequest>;
}

pub struct Connect {
addr: String,
remote_node_id: Option<NodeId>,
client_type: ClientType,
}

pub struct RegisterClient(pub NodeId, pub LocalActorRef<RemoteClient>);

impl Message for RegisterClient {
2 changes: 1 addition & 1 deletion coerce/src/remote/cluster/builder/worker.rs
@@ -127,7 +127,7 @@ impl ClusterWorkerBuilder {
drop(enter);
drop(span);

- tokio::time::sleep(Duration::from_millis(500)).await;
+ // tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
21 changes: 12 additions & 9 deletions coerce/src/remote/cluster/sharding/coordinator/allocation.rs
@@ -130,16 +130,19 @@ async fn broadcast_allocation(
let mut futures = vec![];

for host in hosts.into_iter() {
+     // TODO: apply timeout
-     futures.push(async move {
-         let host = host;
+     if host.node_id() == Some(node_id) {
+         host.send(ShardAllocated(shard_id, node_id)).await;
+     } else {
+         futures.push(async move {
+             let host = host;

-         trace!(
-             "emitting ShardAllocated to node_id={}",
-             host.node_id().unwrap_or(0)
-         );
-         host.send(ShardAllocated(shard_id, node_id)).await
-     });
+             trace!(
+                 "emitting ShardAllocated to node_id={}",
+                 host.node_id().unwrap_or(0)
+             );
+             host.send(ShardAllocated(shard_id, node_id)).await
+         });
+     }
}

let _results = join_all(futures).await;
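
The `// TODO: apply timeout` in `broadcast_allocation` above could be addressed by wrapping each `ShardAllocated` send in `tokio::time::timeout`, so one unresponsive host cannot stall the surrounding `join_all`. A rough sketch, assuming the send future resolves to `()` and that shard and node ids are `u32`/`u64`; the helper name and the 5-second deadline are illustrative, not part of coerce:

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::timeout;

// Bound a single ShardAllocated broadcast with a deadline and log on expiry.
async fn send_with_timeout<F>(send: F, shard_id: u32, node_id: u64)
where
    F: Future<Output = ()>,
{
    if timeout(Duration::from_secs(5), send).await.is_err() {
        eprintln!(
            "ShardAllocated(shard={}, node={}) timed out",
            shard_id, node_id
        );
    }
}
```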
5 changes: 4 additions & 1 deletion coerce/src/remote/cluster/sharding/coordinator/spawner.rs
@@ -2,7 +2,7 @@ use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::message::Handler;
use crate::actor::{Actor, ActorRefErr, IntoActor, LocalActorRef};
use crate::remote::cluster::sharding::coordinator::ShardCoordinator;
- use crate::remote::cluster::sharding::host::ShardHost;
+ use crate::remote::cluster::sharding::host::{LeaderAllocated, ShardHost};
use crate::remote::stream::pubsub::{PubSub, StreamEvent, Subscription};
use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
use crate::remote::system::NodeId;
@@ -121,6 +121,7 @@ impl Handler<StreamEvent<SystemTopic>> for CoordinatorSpawner {
self.node_id,
leader_node_id,
);

if leader_node_id == self.node_id && self.coordinator.is_none() {
self.start_coordinator(ctx).await;
} else if self.stop_coordinator().await {
@@ -130,6 +131,8 @@ impl Handler<StreamEvent<SystemTopic>> for CoordinatorSpawner {
self.node_id
);
}

self.local_shard_host.notify(LeaderAllocated);
}
},
}
30 changes: 27 additions & 3 deletions coerce/src/remote/cluster/sharding/host/mod.rs
@@ -30,14 +30,21 @@ pub struct ShardHost {
shard_entity: String,
hosted_shards: HashMap<ShardId, ShardState>,
remote_shards: HashMap<ShardId, ActorRef<Shard>>,
- buffered_requests: HashMap<ShardId, Vec<EntityRequest>>,
+ requests_pending_leader_allocation: VecDeque<EntityRequest>,
+ requests_pending_shard_allocation: HashMap<ShardId, Vec<EntityRequest>>,
allocator: Box<dyn ShardAllocator + Send + Sync>,
}

pub trait ShardAllocator {
fn allocate(&mut self, actor_id: &ActorId) -> ShardId;
}

pub struct LeaderAllocated;

impl Message for LeaderAllocated {
type Result = ();
}

impl Actor for ShardHost {}

impl ShardHost {
@@ -49,7 +56,8 @@ impl ShardHost {
shard_entity,
hosted_shards: Default::default(),
remote_shards: Default::default(),
- buffered_requests: Default::default(),
+ requests_pending_shard_allocation: Default::default(),
+ requests_pending_leader_allocation: Default::default(),
allocator: allocator.map_or_else(
|| Box::new(DefaultAllocator::default()) as Box<dyn ShardAllocator + Send + Sync>,
|s| s,
@@ -110,6 +118,22 @@ impl Handler<GetCoordinator> for ShardHost {
}
}

#[async_trait]
impl Handler<LeaderAllocated> for ShardHost {
async fn handle(&mut self, message: LeaderAllocated, ctx: &mut ActorContext) {
if self.requests_pending_leader_allocation.len() > 0 {
debug!(
"processing {} buffered requests_pending_leader_allocation",
self.requests_pending_leader_allocation.len(),
);

while let Some(request) = self.requests_pending_leader_allocation.pop_front() {
self.handle(request, ctx).await;
}
}
}
}

#[async_trait]
impl Handler<ShardAllocated> for ShardHost {
async fn handle(&mut self, message: ShardAllocated, ctx: &mut ActorContext) {
@@ -146,7 +170,7 @@ impl Handler<ShardAllocated> for ShardHost {
self.remote_shards.insert(shard_id, shard_actor);
}

- if let Some(buffered_requests) = self.buffered_requests.remove(&shard_id) {
+ if let Some(buffered_requests) = self.requests_pending_shard_allocation.remove(&shard_id) {
debug!(
"processing {} buffered requests for shard={}",
buffered_requests.len(),
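
For reference, the `ShardAllocator` trait shown above maps an actor id to the shard that should host it, with `DefaultAllocator` used when no custom allocator is supplied. A custom allocator might, for example, hash the actor id into a fixed number of shards; this sketch assumes string-like actor ids and `u32` shard ids via local stand-in aliases rather than coerce's own definitions:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-ins for coerce's ActorId / ShardId type aliases.
type ActorId = String;
type ShardId = u32;

trait ShardAllocator {
    fn allocate(&mut self, actor_id: &ActorId) -> ShardId;
}

// Deterministically map an actor id onto one of `shard_count` shards.
struct HashAllocator {
    shard_count: u32,
}

impl ShardAllocator for HashAllocator {
    fn allocate(&mut self, actor_id: &ActorId) -> ShardId {
        let mut hasher = DefaultHasher::new();
        actor_id.hash(&mut hasher);
        (hasher.finish() % u64::from(self.shard_count)) as ShardId
    }
}
```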
10 changes: 9 additions & 1 deletion coerce/src/remote/cluster/sharding/host/request.rs
@@ -64,6 +64,7 @@ impl Handler<EntityRequest> for ShardHost {
});
} else if let Some(shard) = self.remote_shards.get(&shard_id) {
let shard_ref = shard.clone();

tokio::spawn(remote_entity_request(
shard_ref,
message,
@@ -72,7 +73,7 @@ impl Handler<EntityRequest> for ShardHost {
} else if ctx.system().remote().current_leader().is_some() {
let leader = self.get_coordinator(&ctx).await;

- let buffered_requests = self.buffered_requests.entry(shard_id);
+ let buffered_requests = self.requests_pending_shard_allocation.entry(shard_id);
let mut buffered_requests = buffered_requests.or_insert_with(|| vec![]);
buffered_requests.push(message);

@@ -85,6 +86,13 @@ impl Handler<EntityRequest> for ShardHost {
host_ref.notify(ShardAllocated(shard_id, node_id));
}
});
} else {
self.requests_pending_leader_allocation.push_back(message);

debug!(
"no leader allocated, buffering message (requests_pending_leader_allocation={})",
self.requests_pending_leader_allocation.len()
);
}
}
}
16 changes: 8 additions & 8 deletions coerce/src/remote/net/client/connect.rs
@@ -40,14 +40,13 @@ impl Handler<Connect> for RemoteClient {
if let Some(connection_state) = self.connect(message, ctx).await {
let remote = ctx.system().remote();
let node_id = connection_state.node_id;
- remote
-     .register_node(RemoteNode::new(
-         connection_state.node_id,
-         self.addr.clone(),
-         connection_state.node_tag.clone(),
-         Some(connection_state.node_started_at),
-     ))
-     .await;

+ remote.notify_register_node(RemoteNode::new(
+     connection_state.node_id,
+     self.addr.clone(),
+     connection_state.node_tag.clone(),
+     Some(connection_state.node_started_at),
+ ));

remote.register_client(node_id, self.actor_ref(ctx)).await;
self.state = Some(ClientState::Connected(connection_state));
@@ -61,6 +60,7 @@

true
} else {
warn!("RemoteClient failed to connect");
self.handle(Disconnected, ctx).await;
false
}
4 changes: 4 additions & 0 deletions coerce/src/remote/system/mod.rs
@@ -419,6 +419,10 @@ impl RemoteActorSystem {
.unwrap()
}

pub async fn notify_register_node(&self, node: RemoteNode) {
self.inner.registry_ref.notify(RegisterNode(node));
}

pub async fn get_nodes(&self) -> Vec<RemoteNodeState> {
self.inner.registry_ref.send(GetNodes).await.unwrap()
}
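
The new `notify_register_node` above queues a `RegisterNode` message on the registry without waiting for it to be handled, whereas the `register_node(...).await` call it replaces in `connect.rs` awaited registration before continuing. A rough, self-contained illustration of that fire-and-forget versus awaited-acknowledgement distinction using plain tokio channels (this is not coerce's actor-ref API; all names are illustrative):

```rust
use tokio::sync::{mpsc, oneshot};

enum RegistryMsg {
    // `notify`-style: queued and forgotten by the caller.
    Register { addr: String },
    // `send`-style: the caller awaits an acknowledgement.
    RegisterAcked { addr: String, ack: oneshot::Sender<()> },
}

// Fire-and-forget: returns as soon as the message is queued.
fn notify_register(tx: &mpsc::UnboundedSender<RegistryMsg>, addr: String) {
    let _ = tx.send(RegistryMsg::Register { addr });
}

// Awaited variant: resolves only once the registry has acknowledged the message.
async fn send_register(tx: &mpsc::UnboundedSender<RegistryMsg>, addr: String) {
    let (ack, done) = oneshot::channel();
    let _ = tx.send(RegistryMsg::RegisterAcked { addr, ack });
    let _ = done.await;
}
```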
2 changes: 2 additions & 0 deletions coerce/tests/test_remote_cluster_formation.rs
@@ -134,6 +134,8 @@ pub async fn test_remote_cluster_workers() {
.start()
.await;

tokio::time::sleep(Duration::from_millis(500)).await;

let nodes_a = remote_c.get_nodes().await;
let nodes_b = remote_2_c.get_nodes().await;
let nodes_c = remote_3_c.get_nodes().await;
100 changes: 0 additions & 100 deletions coerce/tests/test_remote_cluster_nodes.rs

This file was deleted.
