Skip to content

Commit

Permalink
cargo fix & general cleanup now that mutable actor refs are no longer…
Browse files Browse the repository at this point in the history
… required
  • Loading branch information
LeonHartley committed Jul 17, 2021
1 parent ea3491b commit 5ac4514
Show file tree
Hide file tree
Showing 34 changed files with 120 additions and 145 deletions.
2 changes: 1 addition & 1 deletion coerce/src/actor/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Children {

#[cfg(test)]
mod tests {
use super::*;

use crate::actor::system::ActorSystem;
use crate::actor::{Actor, IntoActor, LocalActorRef};
use std::any::Any;
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ impl ActorContext {
}
}

pub fn new_child<A: Actor>(id: &ActorId, actor: A) {}
pub fn new_child<A: Actor>(_id: &ActorId, _actor: A) {}
}
6 changes: 3 additions & 3 deletions coerce/src/actor/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ impl ActorLoop {
actor_type: ActorType,
mut receiver: tokio::sync::mpsc::UnboundedReceiver<MessageHandler<A>>,
mut on_start: Option<tokio::sync::oneshot::Sender<bool>>,
mut actor_ref: LocalActorRef<A>,
parent_ref: Option<BoxedActorRef>,
actor_ref: LocalActorRef<A>,
_parent_ref: Option<BoxedActorRef>,
mut system: Option<ActorSystem>,
) where
A: 'static + Sync + Send,
Expand Down Expand Up @@ -136,7 +136,7 @@ impl ActorLoop {
ctx.set_status(Stopped);

if actor_type.is_tracked() {
if let Some(mut system) = system.take() {
if let Some(system) = system.take() {
system
.scheduler()
.send(DeregisterActor(actor_id))
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::error;

use crate::actor::scheduler::ActorType::Tracked;
use std::any::Any;
use tracing::Instrument;

use uuid::Uuid;

pub mod children;
Expand Down
1 change: 0 additions & 1 deletion coerce/src/actor/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::remote::actor::message::SetRemote;
use crate::remote::system::RemoteActorSystem;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;

pub mod timer;

Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/scheduler/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Timer {
pub async fn timer_loop<A: Actor, T: TimerTick>(
tick: Duration,
msg: T,
mut actor: LocalActorRef<A>,
actor: LocalActorRef<A>,
mut stop_rx: tokio::sync::oneshot::Receiver<bool>,
) where
A: 'static + Handler<T> + Sync + Send,
Expand Down
3 changes: 1 addition & 2 deletions coerce/src/actor/system.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::actor::scheduler::{start_actor, ActorScheduler, ActorType, GetActor, RegisterActor};
use crate::actor::{new_actor_id, Actor, ActorId, ActorRefErr, LocalActorRef};
use crate::remote::system::RemoteActorSystem;
use std::sync::Arc;
use tracing::Instrument;

use uuid::Uuid;

lazy_static! {
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where
{
async fn handle(&mut self, message: WorkerMessage<M>, _ctx: &mut ActorContext) {
if let Some(worker) = self.workers.pop_front() {
let mut worker_ref = worker.clone();
let worker_ref = worker.clone();

self.workers.push_back(worker);

Expand Down
19 changes: 9 additions & 10 deletions coerce/src/remote/actor/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::message::Handler;
use crate::remote::actor::message::{
ClientWrite, GetActorNode, GetNodes, PopRequest, PushRequest, RegisterActor, RegisterClient,
RegisterNode, RegisterNodes, SetRemote,
Expand All @@ -22,14 +22,13 @@ use crate::actor::ActorId;
use crate::remote::stream::pubsub::{PubSub, StreamEvent};
use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
use crate::remote::tracing::extract_trace_identifier;
use std::alloc::System;
use std::borrow::{Borrow, BorrowMut};

use uuid::Uuid;

#[async_trait]
impl Handler<SetRemote> for RemoteRegistry {
async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) {
let mut sys = message.0;
let sys = message.0;
ctx.set_system(sys.actor_system().clone());
self.system = Some(sys);

Expand Down Expand Up @@ -80,8 +79,8 @@ where

#[async_trait]
impl Handler<RegisterNodes> for RemoteRegistry {
async fn handle(&mut self, message: RegisterNodes, ctx: &mut ActorContext) {
let mut remote_ctx = self.system.as_ref().unwrap().clone();
async fn handle(&mut self, message: RegisterNodes, _ctx: &mut ActorContext) {
let remote_ctx = self.system.as_ref().unwrap().clone();
let nodes = message.0;

let unregistered_nodes = nodes
Expand All @@ -104,7 +103,7 @@ impl Handler<RegisterNodes> for RemoteRegistry {
let sys = remote_ctx.clone();
let node_id = node.id;
tokio::spawn(async move {
let mut sys = sys;
let sys = sys;
PubSub::publish_locally(
SystemTopic,
SystemEvent::Cluster(ClusterEvent::NodeAdded(node_id)),
Expand Down Expand Up @@ -188,7 +187,7 @@ impl Handler<GetActorNode> for RemoteRegistry {
let _enter = span.enter();

let message_id = Uuid::new_v4();
let mut system = system;
let system = system;
let (res_tx, res_rx) = tokio::sync::oneshot::channel();

trace!(target: "RemoteRegistry::GetActorNode", "remote request={}", message_id);
Expand Down Expand Up @@ -276,10 +275,10 @@ impl Handler<StreamEvent<SystemTopic>> for RemoteRegistry {
SystemEvent::Cluster(_) => {
trace!(target: "RemoteRegistry", "cluster event");
let system = self.system.as_ref().unwrap().clone();
let mut registry_ref = ctx.actor_ref::<Self>();
let registry_ref = ctx.actor_ref::<Self>();

tokio::spawn(async move {
let mut sys = system;
let sys = system;
let actor_ids = sys
.actor_system()
.scheduler()
Expand Down
1 change: 0 additions & 1 deletion coerce/src/remote/actor/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::remote::net::message::SessionEvent;

use crate::actor::ActorId;

use std::sync::Arc;
use uuid::Uuid;

pub struct SetRemote(pub RemoteActorSystem);
Expand Down
8 changes: 3 additions & 5 deletions coerce/src/remote/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ use crate::remote::system::RemoteActorSystem;
use std::any::TypeId;
use std::collections::HashMap;

use crate::actor::context::ActorContext;
use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
use crate::remote::stream::pubsub::{PubSub, Subscription};
use crate::remote::stream::system::{SystemEvent, SystemTopic};
use std::sync::Arc;
use crate::actor::scheduler::ActorType::Tracked;
use crate::remote::stream::pubsub::Subscription;

use uuid::Uuid;

pub mod handler;
Expand Down
10 changes: 3 additions & 7 deletions coerce/src/remote/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ use crate::remote::net::message::SessionEvent;
use crate::remote::net::proto::protocol::MessageRequest;
use crate::remote::system::RemoteActorSystem;
use crate::remote::tracing::extract_trace_identifier;
use opentelemetry::global;
use opentelemetry::trace::TraceContextExt;
use std::collections::HashMap;

use std::marker::PhantomData;
use std::sync::Arc;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use uuid::Uuid;

pub struct RemoteActorRef<A: Actor>
Expand Down Expand Up @@ -52,7 +48,7 @@ where
let message_type = Msg::type_name();
let actor_type = A::type_name();
let span = tracing::trace_span!("RemoteActorRef::send", actor_type, message_type);
let enter = span.enter();
let _enter = span.enter();

let id = Uuid::new_v4();
let (res_tx, res_rx) = tokio::sync::oneshot::channel();
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/cluster/builder/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ClusterWorkerBuilder {
node_tag = self.system.node_tag()
);

let enter = span.enter();
let _enter = span.enter();

self.system
.register_node(RemoteNode::new(
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/remote/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
remote_ctx: RemoteActorSystem,
res: tokio::sync::oneshot::Sender<Vec<u8>>,
) {
let mut system = self.system.clone();
let system = self.system.clone();
let actor_id = if !args.actor_id.is_empty() {
args.actor_id
} else {
Expand Down Expand Up @@ -199,10 +199,10 @@ where
buffer: &[u8],
res: tokio::sync::oneshot::Sender<Vec<u8>>,
) {
let mut system = self.system.clone();
let system = self.system.clone();
let actor = system.get_tracked_actor::<A>(actor_id.clone()).await;

if let Some(mut actor) = actor {
if let Some(actor) = actor {
let envelope = M::from_envelope(Envelope::Remote(buffer.to_vec()));
match envelope {
Ok(m) => {
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub enum ClientType {
impl RemoteClient {
pub async fn connect(
addr: String,
mut system: RemoteActorSystem,
system: RemoteActorSystem,
nodes: Option<Vec<crate::remote::cluster::node::RemoteNode>>,
client_type: ClientType,
) -> Result<RemoteClient, tokio::io::Error> {
Expand Down
14 changes: 7 additions & 7 deletions coerce/src/remote/net/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::remote::net::proto::protocol::{
RemoteNode as RemoteNodeProto, SessionHandshake, StreamPublish,
};
use crate::remote::stream::mediator::PublishRaw;
use opentelemetry::{global, Context};
use opentelemetry::global;
use protobuf::Message;
use std::collections::HashMap;
use std::str::FromStr;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl RemoteServer {
pub async fn start(
&mut self,
addr: String,
mut system: RemoteActorSystem,
system: RemoteActorSystem,
) -> Result<(), tokio::io::Error> {
let listener = tokio::net::TcpListener::bind(addr).await?;
let (stop_tx, _stop_rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -209,10 +209,10 @@ impl StreamReceiver for SessionMessageReceiver {
}

async fn session_handshake(
mut ctx: RemoteActorSystem,
ctx: RemoteActorSystem,
handshake: SessionHandshake,
session_id: Uuid,
mut sessions: LocalActorRef<RemoteSessionStore>,
sessions: LocalActorRef<RemoteSessionStore>,
) {
let mut headers = HashMap::<String, String>::new();
headers.insert("traceparent".to_owned(), handshake.trace_id);
Expand Down Expand Up @@ -256,7 +256,7 @@ async fn session_handshake(
async fn session_handle_message(
msg: MessageRequest,
session_id: Uuid,
mut ctx: RemoteActorSystem,
ctx: RemoteActorSystem,
mut sessions: LocalActorRef<RemoteSessionStore>,
) {
let mut headers = HashMap::<String, String>::new();
Expand Down Expand Up @@ -291,7 +291,7 @@ async fn session_handle_lookup(
msg_id: Uuid,
id: ActorId,
session_id: Uuid,
mut ctx: RemoteActorSystem,
ctx: RemoteActorSystem,
mut sessions: LocalActorRef<RemoteSessionStore>,
) {
let node_id = ctx.locate_actor_node(id.clone()).await;
Expand All @@ -314,7 +314,7 @@ async fn session_handle_lookup(
async fn session_create_actor(
msg: CreateActor,
session_id: Uuid,
mut ctx: RemoteActorSystem,
ctx: RemoteActorSystem,
sessions: LocalActorRef<RemoteSessionStore>,
) {
let msg_id = msg.message_id.clone();
Expand Down
14 changes: 4 additions & 10 deletions coerce/src/remote/stream/mediator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::{Actor, BoxedActorRef, LocalActorRef, ActorRefErr};
use crate::actor::{Actor, LocalActorRef};
use crate::remote::actor::message::SetRemote;
use crate::remote::net::message::SessionEvent;
use crate::remote::net::proto::protocol::StreamPublish;
Expand All @@ -9,10 +9,8 @@ use crate::remote::stream::pubsub::{
StreamEvent, Subscription, Topic, TopicEmitter, TopicSubscriberStore,
};
use crate::remote::system::RemoteActorSystem;
use futures::Stream;
use std::any::Any;

use std::collections::HashMap;
use std::marker::PhantomData;

pub struct MediatorTopic(Box<dyn TopicEmitter>);

Expand Down Expand Up @@ -100,11 +98,7 @@ impl StreamMediator {

#[async_trait]
impl Handler<SetRemote> for StreamMediator {
async fn handle(
&mut self,
message: SetRemote,
_ctx: &mut ActorContext,
) {
async fn handle(&mut self, message: SetRemote, _ctx: &mut ActorContext) {
self.remote = Some(message.0);
}
}
Expand All @@ -114,7 +108,7 @@ impl<T: Topic> Handler<Publish<T>> for StreamMediator {
async fn handle(
&mut self,
message: Publish<T>,
ctx: &mut ActorContext,
_ctx: &mut ActorContext,
) -> Result<(), PublishErr> {
match message.message.write_to_bytes() {
Some(bytes) => {
Expand Down
18 changes: 7 additions & 11 deletions coerce/src/remote/stream/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::system::ActorSystem;
use crate::actor::{Actor, ActorId, BoxedActorRef, LocalActorRef};
use crate::actor::{Actor, LocalActorRef};
use crate::remote::net::StreamMessage;
use crate::remote::stream::mediator::{Publish, PublishRaw, Subscribe, SubscribeErr};
use futures::future::{BoxFuture, LocalBoxFuture};
use futures::task::{Context, Poll};
use futures::{Future, FutureExt, Stream};

use std::any::Any;
use std::collections::hash_map::RandomState;

use std::collections::HashMap;
use std::sync::Arc;
use tokio::macros::support::Pin;
use tokio::sync::broadcast::Sender;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{broadcast, oneshot};

use tokio::sync::broadcast;
use tokio::task::JoinHandle;

pub struct PubSub;
Expand Down Expand Up @@ -150,7 +146,7 @@ impl Subscription {
A: Handler<StreamEvent<T>>,
{
let task_handle = Some(tokio::spawn(async move {
let mut receiver_ref = receiver_ref;
let receiver_ref = receiver_ref;
let mut stream_receiver = topic_receiver;
while let Ok(message) = stream_receiver.recv().await {
receiver_ref
Expand Down Expand Up @@ -180,7 +176,7 @@ impl PubSub {
let _enter = span.enter();

let system = ctx.system().remote();
if let Some(mut mediator) = system.stream_mediator() {
if let Some(mediator) = system.stream_mediator() {
mediator
.send(Subscribe::<A, T>::new(topic, ctx.actor_ref()))
.await
Expand Down
Loading

0 comments on commit 5ac4514

Please sign in to comment.