Skip to content

Commit

Permalink
Replace flume with async_channel in docs
Browse files Browse the repository at this point in the history
This is mostly a 1:1 replacement, except for the fact that the same_channel
api is missing from async_channel. So I replaced it with some ugly code
that uses the fact that a async_channel Sender or Receiver is just an Arc<Channel>.

To be removed if/when smol-rs/async-channel#98 is
merged, but until then I think it is fine.
  • Loading branch information
rklaehn committed Jul 24, 2024
1 parent 9052905 commit b0082bc
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ workspace = true

[dependencies]
anyhow = "1"
async-channel = "2.3.1"
blake3 = { package = "iroh-blake3", version = "1.4.5"}
bytes = { version = "1.4", features = ["serde"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25" }
Expand Down
54 changes: 27 additions & 27 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ enum Action {
#[display("ListAuthors")]
ListAuthors {
#[debug("reply")]
reply: flume::Sender<Result<AuthorId>>,
reply: async_channel::Sender<Result<AuthorId>>,
},
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
},
#[display("ContentHashes")]
ContentHashes {
Expand Down Expand Up @@ -108,12 +108,12 @@ enum ReplicaAction {
reply: oneshot::Sender<Result<()>>,
},
Subscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Unsubscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -166,7 +166,7 @@ enum ReplicaAction {
},
GetMany {
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
},
DropReplica {
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -222,7 +222,7 @@ struct OpenReplica {
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
tx: flume::Sender<Action>,
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
}

Expand All @@ -232,7 +232,7 @@ pub struct OpenOpts {
/// Set to true to set sync state to true.
pub sync: bool,
/// Optionally subscribe to replica events.
pub subscribe: Option<flume::Sender<Event>>,
pub subscribe: Option<async_channel::Sender<Event>>,
}
impl OpenOpts {
/// Set sync state to true.
Expand All @@ -241,7 +241,7 @@ impl OpenOpts {
self
}
/// Subscribe to replica events.
pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
self.subscribe = Some(subscribe);
self
}
Expand All @@ -255,7 +255,7 @@ impl SyncHandle {
content_status_callback: Option<ContentStatusCallback>,
me: String,
) -> SyncHandle {
let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
let actor = Actor {
store,
states: Default::default(),
Expand Down Expand Up @@ -298,7 +298,7 @@ impl SyncHandle {
pub async fn subscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
Expand All @@ -309,7 +309,7 @@ impl SyncHandle {
pub async fn unsubscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
Expand Down Expand Up @@ -435,7 +435,7 @@ impl SyncHandle {
&self,
namespace: NamespaceId,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
) -> Result<()> {
let action = ReplicaAction::GetMany { query, reply };
self.send_replica(namespace, action).await?;
Expand Down Expand Up @@ -489,13 +489,13 @@ impl SyncHandle {
Ok(store)
}

pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
self.send(Action::ListAuthors { reply }).await
}

pub async fn list_replicas(
&self,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
) -> Result<()> {
self.send(Action::ListReplicas { reply }).await
}
Expand Down Expand Up @@ -566,7 +566,7 @@ impl SyncHandle {

async fn send(&self, action: Action) -> Result<()> {
self.tx
.send_async(action)
.send(action)
.await
.context("sending to iroh_docs actor failed")?;
Ok(())
Expand All @@ -581,7 +581,10 @@ impl Drop for SyncHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
self.tx.send(Action::Shutdown { reply: None }).ok();
// this call is the reason tx can not be a tokio mpsc channel.
// we have no control about where drop is called, yet tokio send_blocking panics
// when called from inside a tokio runtime.
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
let handle = handle.take().expect("this can only run once");
if let Err(err) = handle.join() {
warn!(?err, "Failed to join sync actor");
Expand All @@ -593,7 +596,7 @@ impl Drop for SyncHandle {
struct Actor {
store: Store,
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
action_rx: async_channel::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}
Expand All @@ -619,10 +622,10 @@ impl Actor {
}
continue;
}
action = self.action_rx.recv_async() => {
action = self.action_rx.recv() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
Err(async_channel::RecvError) => {
debug!("action channel disconnected");
break None;
}
Expand Down Expand Up @@ -979,17 +982,14 @@ impl OpenReplicas {
}

async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
channel: async_channel::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send_async(item).await.map_err(send_reply_error)?;
channel.send(item).await.map_err(send_reply_error)?;
}
}
}
Expand Down Expand Up @@ -1032,10 +1032,10 @@ mod tests {
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;
let (tx, rx) = flume::bounded(10);
let (tx, rx) = async_channel::bounded(10);
sync.subscribe(id, tx).await?;
sync.close(id).await?;
assert!(rx.recv_async().await.is_err());
assert!(rx.recv().await.is_err());
Ok(())
}
}
10 changes: 5 additions & 5 deletions iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ impl Engine {

// Subscribe to insert events from the replica.
let a = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
this.sync.subscribe(namespace, s).await?;
r.into_stream()
.map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
};

// Subscribe to events from the [`live::Actor`].
let b = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
let r = Box::pin(r);
let (reply, reply_rx) = oneshot::channel();
this.to_live_actor
.send(ToLiveActor::Subscribe {
Expand All @@ -188,7 +188,7 @@ impl Engine {
})
.await?;
reply_rx.await??;
r.into_stream().map(|event| Ok(LiveEvent::from(event)))
r.map(|event| Ok(LiveEvent::from(event)))
};

Ok(a.or(b))
Expand Down
22 changes: 11 additions & 11 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub enum ToLiveActor {
Subscribe {
namespace: NamespaceId,
#[debug("sender")]
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("oneshot::Sender")]
reply: sync::oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -153,8 +153,8 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
gossip: Gossip,
bao_store: B,
downloader: Downloader,
replica_events_tx: flume::Sender<crate::Event>,
replica_events_rx: flume::Receiver<crate::Event>,
replica_events_tx: async_channel::Sender<crate::Event>,
replica_events_rx: async_channel::Receiver<crate::Event>,

/// Send messages to self.
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
sync_actor_tx: mpsc::Sender<ToLiveActor>,
gossip_actor_tx: mpsc::Sender<ToGossipActor>,
) -> Self {
let (replica_events_tx, replica_events_rx) = flume::bounded(1024);
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
Self {
inbox,
sync,
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}
}
event = self.replica_events_rx.recv_async() => {
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
Expand Down Expand Up @@ -865,7 +865,7 @@ impl From<&SyncFinished> for SyncDetails {
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
fn subscribe(&mut self, namespace: NamespaceId, sender: flume::Sender<Event>) {
fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
self.0.entry(namespace).or_default().subscribe(sender);
}

Expand Down Expand Up @@ -930,15 +930,15 @@ impl QueuedHashes {
}

#[derive(Debug, Default)]
struct Subscribers(Vec<flume::Sender<Event>>);
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
fn subscribe(&mut self, sender: flume::Sender<Event>) {
fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.0.push(sender)
}

async fn send(&mut self, event: Event) -> bool {
let futs = self.0.iter().map(|sender| sender.send_async(event.clone()));
let futs = self.0.iter().map(|sender| sender.send(event.clone()));
let res = futures_buffered::join_all(futs).await;
// reverse the order so removing does not shift remaining indices
for (i, res) in res.into_iter().enumerate().rev() {
Expand Down Expand Up @@ -977,8 +977,8 @@ mod tests {
#[tokio::test]
async fn test_sync_remove() {
let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
let (a_tx, a_rx) = flume::unbounded();
let (b_tx, b_rx) = flume::unbounded();
let (a_tx, a_rx) = async_channel::unbounded();
let (b_tx, b_rx) = async_channel::unbounded();
let mut subscribers = Subscribers::default();
subscribers.subscribe(a_tx);
subscribers.subscribe(b_tx);
Expand Down
Loading

0 comments on commit b0082bc

Please sign in to comment.