Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-docs): Replace flume with async_channel in docs #2540

Merged
merged 4 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ workspace = true

[dependencies]
anyhow = "1"
async-channel = "2.3.1"
blake3 = { package = "iroh-blake3", version = "1.4.5"}
bytes = { version = "1.4", features = ["serde"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25" }
Expand Down
54 changes: 27 additions & 27 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ enum Action {
#[display("ListAuthors")]
ListAuthors {
#[debug("reply")]
reply: flume::Sender<Result<AuthorId>>,
reply: async_channel::Sender<Result<AuthorId>>,
},
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
},
#[display("ContentHashes")]
ContentHashes {
Expand Down Expand Up @@ -108,12 +108,12 @@ enum ReplicaAction {
reply: oneshot::Sender<Result<()>>,
},
Subscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Unsubscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -166,7 +166,7 @@ enum ReplicaAction {
},
GetMany {
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
},
DropReplica {
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -222,7 +222,7 @@ struct OpenReplica {
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
tx: flume::Sender<Action>,
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
}

Expand All @@ -232,7 +232,7 @@ pub struct OpenOpts {
/// Set to true to set sync state to true.
pub sync: bool,
/// Optionally subscribe to replica events.
pub subscribe: Option<flume::Sender<Event>>,
pub subscribe: Option<async_channel::Sender<Event>>,
}
impl OpenOpts {
/// Set sync state to true.
Expand All @@ -241,7 +241,7 @@ impl OpenOpts {
self
}
/// Subscribe to replica events.
pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
self.subscribe = Some(subscribe);
self
}
Expand All @@ -255,7 +255,7 @@ impl SyncHandle {
content_status_callback: Option<ContentStatusCallback>,
me: String,
) -> SyncHandle {
let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
let actor = Actor {
store,
states: Default::default(),
Expand Down Expand Up @@ -298,7 +298,7 @@ impl SyncHandle {
pub async fn subscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
Expand All @@ -309,7 +309,7 @@ impl SyncHandle {
pub async fn unsubscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
Expand Down Expand Up @@ -435,7 +435,7 @@ impl SyncHandle {
&self,
namespace: NamespaceId,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
) -> Result<()> {
let action = ReplicaAction::GetMany { query, reply };
self.send_replica(namespace, action).await?;
Expand Down Expand Up @@ -489,13 +489,13 @@ impl SyncHandle {
Ok(store)
}

pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
self.send(Action::ListAuthors { reply }).await
}

pub async fn list_replicas(
&self,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
) -> Result<()> {
self.send(Action::ListReplicas { reply }).await
}
Expand Down Expand Up @@ -566,7 +566,7 @@ impl SyncHandle {

async fn send(&self, action: Action) -> Result<()> {
self.tx
.send_async(action)
.send(action)
.await
.context("sending to iroh_docs actor failed")?;
Ok(())
Expand All @@ -581,7 +581,10 @@ impl Drop for SyncHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
self.tx.send(Action::Shutdown { reply: None }).ok();
// this call is the reason tx can not be a tokio mpsc channel.
// we have no control about where drop is called, yet tokio send_blocking panics
// when called from inside a tokio runtime.
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
let handle = handle.take().expect("this can only run once");
if let Err(err) = handle.join() {
warn!(?err, "Failed to join sync actor");
Expand All @@ -593,7 +596,7 @@ impl Drop for SyncHandle {
struct Actor {
store: Store,
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
action_rx: async_channel::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}
Expand All @@ -619,10 +622,10 @@ impl Actor {
}
continue;
}
action = self.action_rx.recv_async() => {
action = self.action_rx.recv() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
Err(async_channel::RecvError) => {
debug!("action channel disconnected");
break None;
}
Expand Down Expand Up @@ -979,17 +982,14 @@ impl OpenReplicas {
}

async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
channel: async_channel::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send_async(item).await.map_err(send_reply_error)?;
channel.send(item).await.map_err(send_reply_error)?;
}
}
}
Expand Down Expand Up @@ -1032,10 +1032,10 @@ mod tests {
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;
let (tx, rx) = flume::bounded(10);
let (tx, rx) = async_channel::bounded(10);
sync.subscribe(id, tx).await?;
sync.close(id).await?;
assert!(rx.recv_async().await.is_err());
assert!(rx.recv().await.is_err());
Ok(())
}
}
10 changes: 5 additions & 5 deletions iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ impl Engine {

// Subscribe to insert events from the replica.
let a = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
this.sync.subscribe(namespace, s).await?;
r.into_stream()
.map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
};

// Subscribe to events from the [`live::Actor`].
let b = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
let r = Box::pin(r);
let (reply, reply_rx) = oneshot::channel();
this.to_live_actor
.send(ToLiveActor::Subscribe {
Expand All @@ -188,7 +188,7 @@ impl Engine {
})
.await?;
reply_rx.await??;
r.into_stream().map(|event| Ok(LiveEvent::from(event)))
r.map(|event| Ok(LiveEvent::from(event)))
};

Ok(a.or(b))
Expand Down
22 changes: 11 additions & 11 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub enum ToLiveActor {
Subscribe {
namespace: NamespaceId,
#[debug("sender")]
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("oneshot::Sender")]
reply: sync::oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -153,8 +153,8 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
gossip: Gossip,
bao_store: B,
downloader: Downloader,
replica_events_tx: flume::Sender<crate::Event>,
replica_events_rx: flume::Receiver<crate::Event>,
replica_events_tx: async_channel::Sender<crate::Event>,
replica_events_rx: async_channel::Receiver<crate::Event>,

/// Send messages to self.
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
sync_actor_tx: mpsc::Sender<ToLiveActor>,
gossip_actor_tx: mpsc::Sender<ToGossipActor>,
) -> Self {
let (replica_events_tx, replica_events_rx) = flume::bounded(1024);
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
Self {
inbox,
sync,
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}
}
event = self.replica_events_rx.recv_async() => {
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
Expand Down Expand Up @@ -865,7 +865,7 @@ impl From<&SyncFinished> for SyncDetails {
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
fn subscribe(&mut self, namespace: NamespaceId, sender: flume::Sender<Event>) {
fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
self.0.entry(namespace).or_default().subscribe(sender);
}

Expand Down Expand Up @@ -930,15 +930,15 @@ impl QueuedHashes {
}

#[derive(Debug, Default)]
struct Subscribers(Vec<flume::Sender<Event>>);
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
fn subscribe(&mut self, sender: flume::Sender<Event>) {
fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.0.push(sender)
}

async fn send(&mut self, event: Event) -> bool {
let futs = self.0.iter().map(|sender| sender.send_async(event.clone()));
let futs = self.0.iter().map(|sender| sender.send(event.clone()));
let res = futures_buffered::join_all(futs).await;
// reverse the order so removing does not shift remaining indices
for (i, res) in res.into_iter().enumerate().rev() {
Expand Down Expand Up @@ -977,8 +977,8 @@ mod tests {
#[tokio::test]
async fn test_sync_remove() {
let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
let (a_tx, a_rx) = flume::unbounded();
let (b_tx, b_rx) = flume::unbounded();
let (a_tx, a_rx) = async_channel::unbounded();
let (b_tx, b_rx) = async_channel::unbounded();
let mut subscribers = Subscribers::default();
subscribers.subscribe(a_tx);
subscribers.subscribe(b_tx);
Expand Down
Loading
Loading