Skip to content

Commit

Permalink
refactor(iroh-docs): Replace flume with async_channel in docs (#2540)
Browse files Browse the repository at this point in the history
## Description

This is mostly a 1:1 replacement, except for the fact that the
same_channel api is missing from async_channel. So I replaced it with
some ugly code that uses the fact that a async_channel Sender or
Receiver is just an Arc<Channel>.

To be removed if/when smol-rs/async-channel#98
is merged, but until then I think it is fine.

## Breaking Changes

None

## Notes & open questions

Note: we can not use tokio::sync::mpsc::Channel for the actor because we
can't control from which thread Drop is called.
Note: some streams were Unpin before, but it was not explicit. Now I
added Unpin explicitly (and boxed the stream to make it true). Not sure
if the version check would catch this, pretty sure that not. But taking
away Unpin would have been a breaking change.

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
  • Loading branch information
rklaehn authored Jul 29, 2024
1 parent 956c51a commit 335b6b1
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ workspace = true

[dependencies]
anyhow = "1"
async-channel = "2.3.1"
blake3 = { package = "iroh-blake3", version = "1.4.5"}
bytes = { version = "1.4", features = ["serde"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25" }
Expand Down
54 changes: 27 additions & 27 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ enum Action {
#[display("ListAuthors")]
ListAuthors {
#[debug("reply")]
reply: flume::Sender<Result<AuthorId>>,
reply: async_channel::Sender<Result<AuthorId>>,
},
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
},
#[display("ContentHashes")]
ContentHashes {
Expand Down Expand Up @@ -108,12 +108,12 @@ enum ReplicaAction {
reply: oneshot::Sender<Result<()>>,
},
Subscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Unsubscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -166,7 +166,7 @@ enum ReplicaAction {
},
GetMany {
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
},
DropReplica {
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -222,7 +222,7 @@ struct OpenReplica {
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
tx: flume::Sender<Action>,
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
}

Expand All @@ -232,7 +232,7 @@ pub struct OpenOpts {
/// Set to true to set sync state to true.
pub sync: bool,
/// Optionally subscribe to replica events.
pub subscribe: Option<flume::Sender<Event>>,
pub subscribe: Option<async_channel::Sender<Event>>,
}
impl OpenOpts {
/// Set sync state to true.
Expand All @@ -241,7 +241,7 @@ impl OpenOpts {
self
}
/// Subscribe to replica events.
pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
self.subscribe = Some(subscribe);
self
}
Expand All @@ -255,7 +255,7 @@ impl SyncHandle {
content_status_callback: Option<ContentStatusCallback>,
me: String,
) -> SyncHandle {
let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
let actor = Actor {
store,
states: Default::default(),
Expand Down Expand Up @@ -298,7 +298,7 @@ impl SyncHandle {
pub async fn subscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
Expand All @@ -309,7 +309,7 @@ impl SyncHandle {
pub async fn unsubscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
Expand Down Expand Up @@ -435,7 +435,7 @@ impl SyncHandle {
&self,
namespace: NamespaceId,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
) -> Result<()> {
let action = ReplicaAction::GetMany { query, reply };
self.send_replica(namespace, action).await?;
Expand Down Expand Up @@ -489,13 +489,13 @@ impl SyncHandle {
Ok(store)
}

pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
self.send(Action::ListAuthors { reply }).await
}

pub async fn list_replicas(
&self,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
) -> Result<()> {
self.send(Action::ListReplicas { reply }).await
}
Expand Down Expand Up @@ -566,7 +566,7 @@ impl SyncHandle {

async fn send(&self, action: Action) -> Result<()> {
self.tx
.send_async(action)
.send(action)
.await
.context("sending to iroh_docs actor failed")?;
Ok(())
Expand All @@ -581,7 +581,10 @@ impl Drop for SyncHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
self.tx.send(Action::Shutdown { reply: None }).ok();
// this call is the reason tx can not be a tokio mpsc channel.
// we have no control about where drop is called, yet tokio send_blocking panics
// when called from inside a tokio runtime.
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
let handle = handle.take().expect("this can only run once");
if let Err(err) = handle.join() {
warn!(?err, "Failed to join sync actor");
Expand All @@ -593,7 +596,7 @@ impl Drop for SyncHandle {
struct Actor {
store: Store,
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
action_rx: async_channel::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}
Expand All @@ -619,10 +622,10 @@ impl Actor {
}
continue;
}
action = self.action_rx.recv_async() => {
action = self.action_rx.recv() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
Err(async_channel::RecvError) => {
debug!("action channel disconnected");
break None;
}
Expand Down Expand Up @@ -979,17 +982,14 @@ impl OpenReplicas {
}

async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
channel: async_channel::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send_async(item).await.map_err(send_reply_error)?;
channel.send(item).await.map_err(send_reply_error)?;
}
}
}
Expand Down Expand Up @@ -1032,10 +1032,10 @@ mod tests {
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;
let (tx, rx) = flume::bounded(10);
let (tx, rx) = async_channel::bounded(10);
sync.subscribe(id, tx).await?;
sync.close(id).await?;
assert!(rx.recv_async().await.is_err());
assert!(rx.recv().await.is_err());
Ok(())
}
}
10 changes: 5 additions & 5 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ impl Engine {

// Subscribe to insert events from the replica.
let a = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
this.sync.subscribe(namespace, s).await?;
r.into_stream()
.map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
};

// Subscribe to events from the [`live::Actor`].
let b = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
let r = Box::pin(r);
let (reply, reply_rx) = oneshot::channel();
this.to_live_actor
.send(ToLiveActor::Subscribe {
Expand All @@ -188,7 +188,7 @@ impl Engine {
})
.await?;
reply_rx.await??;
r.into_stream().map(|event| Ok(LiveEvent::from(event)))
r.map(|event| Ok(LiveEvent::from(event)))
};

Ok(a.or(b))
Expand Down
22 changes: 11 additions & 11 deletions src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub enum ToLiveActor {
Subscribe {
namespace: NamespaceId,
#[debug("sender")]
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("oneshot::Sender")]
reply: sync::oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -153,8 +153,8 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
gossip: Gossip,
bao_store: B,
downloader: Downloader,
replica_events_tx: flume::Sender<crate::Event>,
replica_events_rx: flume::Receiver<crate::Event>,
replica_events_tx: async_channel::Sender<crate::Event>,
replica_events_rx: async_channel::Receiver<crate::Event>,

/// Send messages to self.
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
sync_actor_tx: mpsc::Sender<ToLiveActor>,
gossip_actor_tx: mpsc::Sender<ToGossipActor>,
) -> Self {
let (replica_events_tx, replica_events_rx) = flume::bounded(1024);
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
Self {
inbox,
sync,
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}
}
event = self.replica_events_rx.recv_async() => {
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
Expand Down Expand Up @@ -865,7 +865,7 @@ impl From<&SyncFinished> for SyncDetails {
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
fn subscribe(&mut self, namespace: NamespaceId, sender: flume::Sender<Event>) {
fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
self.0.entry(namespace).or_default().subscribe(sender);
}

Expand Down Expand Up @@ -930,15 +930,15 @@ impl QueuedHashes {
}

#[derive(Debug, Default)]
struct Subscribers(Vec<flume::Sender<Event>>);
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
fn subscribe(&mut self, sender: flume::Sender<Event>) {
fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.0.push(sender)
}

async fn send(&mut self, event: Event) -> bool {
let futs = self.0.iter().map(|sender| sender.send_async(event.clone()));
let futs = self.0.iter().map(|sender| sender.send(event.clone()));
let res = futures_buffered::join_all(futs).await;
// reverse the order so removing does not shift remaining indices
for (i, res) in res.into_iter().enumerate().rev() {
Expand Down Expand Up @@ -977,8 +977,8 @@ mod tests {
#[tokio::test]
async fn test_sync_remove() {
let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
let (a_tx, a_rx) = flume::unbounded();
let (b_tx, b_rx) = flume::unbounded();
let (a_tx, a_rx) = async_channel::unbounded();
let (b_tx, b_rx) = async_channel::unbounded();
let mut subscribers = Subscribers::default();
subscribers.subscribe(a_tx);
subscribers.subscribe(b_tx);
Expand Down
Loading

0 comments on commit 335b6b1

Please sign in to comment.