Skip to content

Commit

Permalink
optimisation: coerce remote message IDs are now incremental uint64s r…
Browse files Browse the repository at this point in the history
…ather than UUIDs
  • Loading branch information
LeonHartley committed Mar 24, 2024
1 parent f3e5de0 commit ad0dcc1
Show file tree
Hide file tree
Showing 22 changed files with 255 additions and 253 deletions.
16 changes: 8 additions & 8 deletions coerce/src/protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,23 @@ message ClientHandshake {
}

message ClientResult {
string message_id = 1;
uint64 message_id = 1;

bytes result = 2;

string trace_id = 3;
}

message ClientErr {
string message_id = 1;
uint64 message_id = 1;

ActorRefErr error = 2;

string trace_id = 3;
}

message PingEvent {
string message_id = 1;
uint64 message_id = 1;

string trace_id = 2;

Expand All @@ -111,13 +111,13 @@ message PingEvent {
}

message PongEvent {
string message_id = 1;
uint64 message_id = 1;

string trace_id = 2;
}

message CreateActorEvent {
string message_id = 1;
uint64 message_id = 1;

string actor_id = 2;

Expand All @@ -129,7 +129,7 @@ message CreateActorEvent {
}

message FindActorEvent {
string message_id = 1;
uint64 message_id = 1;

string actor_id = 2;

Expand All @@ -145,7 +145,7 @@ message ActorAddress {
}

message MessageRequest {
string message_id = 1;
uint64 message_id = 1;

string handler_type = 2;

Expand Down Expand Up @@ -218,7 +218,7 @@ message MemberUpEvent {
}

message RaftRequest {
string message_id = 1;
uint64 message_id = 1;

uint32 request_type = 2;

Expand Down
4 changes: 2 additions & 2 deletions coerce/src/protocol/sharding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ message StopShard {

uint64 origin_node_id = 2;

string request_id = 3;
uint64 request_id = 3;
}

message ShardStopped {
Expand Down Expand Up @@ -63,7 +63,7 @@ message RemoteEntityRequest {
bytes recipe = 1;
}

string request_id = 1;
uint64 request_id = 1;

string actor_id = 2;

Expand Down
6 changes: 3 additions & 3 deletions coerce/src/remote/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ pub(crate) type BoxedActorHandler = Box<dyn ActorHandler + Send + Sync>;
pub(crate) type BoxedMessageHandler = Box<dyn ActorMessageHandler + Send + Sync>;

pub struct RemoteHandler {
requests: HashMap<Uuid, RemoteRequest>,
requests: HashMap<u64, RemoteRequest>,
}

impl RemoteHandler {
pub fn push_request(&mut self, message_id: Uuid, request: RemoteRequest) {
pub fn push_request(&mut self, message_id: u64, request: RemoteRequest) {
self.requests.insert(message_id, request);
}

pub fn pop_request(&mut self, message_id: Uuid) -> Option<RemoteRequest> {
pub fn pop_request(&mut self, message_id: u64) -> Option<RemoteRequest> {
self.requests.remove(&message_id)
}

Expand Down
4 changes: 2 additions & 2 deletions coerce/src/remote/actor/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ impl Handler<GetActorNode> for RemoteRegistry {
// let span = tracing::trace_span!("RemoteRegistry::GetActorNode::Remote");
// let _enter = span.enter();

let message_id = Uuid::new_v4();
let system = system;
let message_id = system.next_msg_id();
let (res_tx, res_rx) = tokio::sync::oneshot::channel();

trace!("remote request={}", message_id);
Expand All @@ -157,7 +157,7 @@ impl Handler<GetActorNode> for RemoteRegistry {
.notify_node(
assigned_registry_node,
SessionEvent::FindActor(FindActorEvent {
message_id: message_id.to_string(),
message_id,
actor_id: id.to_string(),
trace_id,
..Default::default()
Expand Down
10 changes: 4 additions & 6 deletions coerce/src/remote/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ where
// let span = tracing::trace_span!("RemoteActorRef::notify", actor_type, message_type);
// let _enter = span.enter();

let id = Uuid::new_v4();

let id = self.system.next_msg_id();
let request = self.create_request(msg, String::new(), id, false)?;
self.system.notify_node(self.node_id, request).await;

Expand All @@ -73,8 +72,7 @@ where
Msg: 'static + Send + Sync,
<Msg as Message>::Result: 'static + Send + Sync,
{
let id = Uuid::new_v4();

let id = self.system.next_msg_id();
let (res_tx, res_rx) = oneshot::channel();
self.system.push_request(id, res_tx);

Expand Down Expand Up @@ -102,7 +100,7 @@ where
&self,
msg: Envelope<Msg>,
trace_id: String,
id: Uuid,
message_id: u64,
requires_response: bool,
) -> Result<SessionEvent, ActorRefErr>
where
Expand All @@ -118,7 +116,7 @@ where
let actor_id = header.actor_id.to_string();
let origin_node_id = self.system.node_id();
SessionEvent::NotifyActor(MessageRequest {
message_id: id.to_string(),
message_id,
handler_type,
actor_id,
trace_id,
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/net/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Actor for RemoteClient {
);

let ping_event = SessionEvent::Ping(PingEvent {
message_id: Uuid::new_v4().to_string(),
message_id: 0,
node_id: ctx.system().remote().node_id(),
system_terminated: true,
..PingEvent::default()
Expand Down
5 changes: 3 additions & 2 deletions coerce/src/remote/net/client/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ impl Handler<PingTick> for RemoteClient {
};

let (res_tx, res_rx) = oneshot::channel();
let message_id = Uuid::new_v4();

let message_id = remote.next_msg_id();
remote.push_request(message_id, res_tx);

let ping_event = SessionEvent::Ping(PingEvent {
message_id: message_id.to_string(),
message_id,
node_id: remote.node_id(),
..PingEvent::default()
});
Expand Down
12 changes: 6 additions & 6 deletions coerce/src/remote/net/client/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl StreamReceiver for ClientMessageReceiver {
}
}
ClientEvent::Result(res) => {
match sys.pop_request(Uuid::from_str(&res.message_id).unwrap()) {
match sys.pop_request(res.message_id) {
Some(res_tx) => {
let _ = res_tx.send(RemoteResponse::Ok(res.result));
}
Expand All @@ -122,20 +122,20 @@ impl StreamReceiver for ClientMessageReceiver {
}
}
ClientEvent::Err(e) => {
info!("received client error!");
match sys.pop_request(Uuid::from_str(&e.message_id).unwrap()) {
debug!("received client error!");
match sys.pop_request(e.message_id) {
Some(res_tx) => {
let _ = res_tx.send(RemoteResponse::Err(e.error.unwrap().into()));
}
None => {
// :P
warn!("received unsolicited pong");
warn!("received unsolicited client error");
}
}
}
ClientEvent::Ping(_ping) => {}
ClientEvent::Pong(pong) => {
match sys.pop_request(Uuid::from_str(&pong.message_id).unwrap()) {
match sys.pop_request(pong.message_id) {
Some(res_tx) => {
let _ = res_tx.send(RemoteResponse::Ok(
PongEvent {
Expand All @@ -147,7 +147,7 @@ impl StreamReceiver for ClientMessageReceiver {
));
}
None => {
// :P
// :P
warn!("received unsolicited pong");
}
}
Expand Down
Loading

0 comments on commit ad0dcc1

Please sign in to comment.