From 08ccb283ea8856e0ddc44bdcba4509b774226d8c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 17:39:57 +0200 Subject: [PATCH 1/8] Add Docs which wraps the Engine The ProtocolHandler is now implemented only for Blobs. Docs has a builder that takes a Blobs and Gossip, but there is also a way to create a Docs directly from an Engine. --- src/protocol.rs | 114 ++++++++++++++++++++++++++++++++++++++++++++++-- src/rpc.rs | 4 +- tests/util.rs | 46 ++++--------------- 3 files changed, 121 insertions(+), 43 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index cfbb45d..6f297cf 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,19 +1,28 @@ //! [`ProtocolHandler`] implementation for the docs [`Engine`]. +use std::{path::PathBuf, sync::Arc}; + use anyhow::Result; use futures_lite::future::Boxed as BoxedFuture; use iroh::{endpoint::Connecting, protocol::ProtocolHandler}; +use iroh_blobs::net_protocol::{Blobs, ProtectCb}; +use iroh_gossip::net::Gossip; +use quic_rpc::server::{ChannelTypes, RpcChannel}; -use crate::engine::Engine; +use crate::{ + engine::{DefaultAuthorStorage, Engine}, + rpc::proto::{Request, RpcService}, + store::Store, +}; -impl ProtocolHandler for Engine { +impl ProtocolHandler for Docs { fn accept(&self, conn: Connecting) -> BoxedFuture> { - let this = self.clone(); + let this = self.engine.clone(); Box::pin(async move { this.handle_connection(conn).await }) } fn shutdown(&self) -> BoxedFuture<()> { - let this = self.clone(); + let this = self.engine.clone(); Box::pin(async move { if let Err(err) = this.shutdown().await { tracing::warn!("shutdown error: {:?}", err); @@ -21,3 +30,100 @@ impl ProtocolHandler for Engine { }) } } + +/// Docs protocol. +#[derive(Debug, Clone)] +pub struct Docs { + engine: Arc>, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, +} + +impl Docs<()> { + /// Create a new [`Builder`] for the docs protocol, using in memory replica and author storage. + pub fn memory() -> Builder { + Builder::default() + } + + /// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage + /// in the given directory. + pub fn persistent(path: PathBuf) -> Builder { + Builder { path: Some(path) } + } +} + +impl Docs { + /// Get an in memory client to interact with the docs engine. + pub fn client(&self) -> &crate::rpc::client::docs::MemClient { + &self + .rpc_handler + .get_or_init(|| crate::rpc::RpcHandler::new(&self.engine)) + .client + } + + /// Create a new docs protocol with the given engine. + /// + /// Note that usually you would use the [`Builder`] to create a new docs protocol. + pub fn new(engine: Engine) -> Self { + Self { + engine: Arc::new(engine), + rpc_handler: Default::default(), + } + } + + /// Handle a docs request from the RPC server. + pub async fn handle_rpc_request>( + self, + msg: Request, + chan: RpcChannel, + ) -> Result<(), quic_rpc::server::RpcServerError> { + self.engine + .as_ref() + .clone() + .handle_rpc_request(msg, chan) + .await + } + + /// Get the protect callback for the docs engine. + pub fn protect_cb(&self) -> ProtectCb { + self.engine.protect_cb() + } +} + +/// Builder for the docs protocol. +#[derive(Debug, Default)] +pub struct Builder { + path: Option, +} + +impl Builder { + /// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol. + pub async fn build( + self, + blobs: &Blobs, + gossip: &Gossip, + ) -> anyhow::Result> { + let replica_store = match self.path { + Some(ref path) => Store::persistent(path.join("docs.redb"))?, + None => Store::memory(), + }; + let author_store = match self.path { + Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")), + None => DefaultAuthorStorage::Mem, + }; + let engine = Engine::spawn( + blobs.endpoint().clone(), + gossip.clone(), + replica_store, + blobs.store().clone(), + blobs.downloader().clone(), + author_store, + blobs.rt().clone(), + ) + .await?; + Ok(Docs { + engine: Arc::new(engine), + rpc_handler: Default::default(), + }) + } +} diff --git a/src/rpc.rs b/src/rpc.rs index d1e974a..0e6c4bd 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -80,13 +80,13 @@ impl Engine { #[derive(Debug)] pub(crate) struct RpcHandler { /// Client to hand out - client: client::docs::MemClient, + pub(crate) client: client::docs::MemClient, /// Handler task _handler: AbortOnDropHandle<()>, } impl RpcHandler { - fn new(engine: &Engine) -> Self { + pub fn new(engine: &Engine) -> Self { let engine = engine.clone(); let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); diff --git a/tests/util.rs b/tests/util.rs index 8f92a26..8fdb065 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -5,14 +5,15 @@ use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, ops::Deref, path::{Path, PathBuf}, - sync::Arc, }; use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode}; use iroh_blobs::{ + net_protocol::Blobs, store::{GcConfig, Store as BlobStore}, util::local_pool::LocalPool, }; +use iroh_docs::protocol::Docs; use nested_enum_utils::enum_conversions; use quic_rpc::transport::{Connector, Listener}; use serde::{Deserialize, Serialize}; @@ -137,46 +138,17 @@ impl Builder { let addr = endpoint.node_addr().await?; let local_pool = LocalPool::single(); let mut router = iroh::protocol::Router::builder(endpoint.clone()); - - // Setup blobs - let downloader = iroh_blobs::downloader::Downloader::new( - store.clone(), - endpoint.clone(), - local_pool.handle().clone(), - ); - let blobs = iroh_blobs::net_protocol::Blobs::new( - store.clone(), - local_pool.handle().clone(), - Default::default(), - downloader.clone(), - endpoint.clone(), - ); + let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint); let gossip = iroh_gossip::net::Gossip::from_endpoint( endpoint.clone(), Default::default(), &addr.info, ); - let replica_store = match self.path { - Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?, - None => iroh_docs::store::Store::memory(), - }; - let author_store = match self.path { - Some(ref path) => { - iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author")) - } - None => iroh_docs::engine::DefaultAuthorStorage::Mem, + let builder = match self.path { + Some(ref path) => Docs::persistent(path.to_path_buf()), + None => Docs::memory(), }; - let docs = match iroh_docs::engine::Engine::spawn( - endpoint, - gossip.clone(), - replica_store, - store.clone(), - downloader, - author_store, - local_pool.handle().clone(), - ) - .await - { + let docs = match builder.build(&blobs, &gossip).await { Ok(docs) => docs, Err(err) => { store.shutdown().await; @@ -184,8 +156,8 @@ impl Builder { } }; router = router.accept(iroh_blobs::ALPN, blobs.clone()); - router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone())); - router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone())); + router = router.accept(iroh_docs::ALPN, docs.clone()); + router = router.accept(iroh_gossip::ALPN, gossip.clone()); // Build the router let router = router.spawn().await?; From 21d4fe6befc52eaa75cd7516403c0fe11ac80b7f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 17:48:37 +0200 Subject: [PATCH 2/8] feature gate the Docs and its builder --- src/protocol.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/protocol.rs b/src/protocol.rs index 6f297cf..78f7e5b 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "rpc")] //! [`ProtocolHandler`] implementation for the docs [`Engine`]. use std::{path::PathBuf, sync::Arc}; From 5426a9ecbe7e937c18bc77d86e7534c51ec89229 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 18:06:35 +0200 Subject: [PATCH 3/8] use the gossip builder API --- Cargo.lock | 2 +- tests/util.rs | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bfb57a3..edc69ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,7 +2207,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.29.0" -source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#d5285e7240da4e233be7c8f83099741f6f272bb0" +source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#0e6fd20203c6468af9d783f1e62379eca283188a" dependencies = [ "anyhow", "async-channel", diff --git a/tests/util.rs b/tests/util.rs index 8fdb065..f0f6610 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -14,6 +14,7 @@ use iroh_blobs::{ util::local_pool::LocalPool, }; use iroh_docs::protocol::Docs; +use iroh_gossip::net::Gossip; use nested_enum_utils::enum_conversions; use quic_rpc::transport::{Connector, Listener}; use serde::{Deserialize, Serialize}; @@ -139,11 +140,7 @@ impl Builder { let local_pool = LocalPool::single(); let mut router = iroh::protocol::Router::builder(endpoint.clone()); let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint); - let gossip = iroh_gossip::net::Gossip::from_endpoint( - endpoint.clone(), - Default::default(), - &addr.info, - ); + let gossip = Gossip::builder().spawn(endpoint.clone()).await?; let builder = match self.path { Some(ref path) => Docs::persistent(path.to_path_buf()), None => Docs::memory(), From bcedb9fa790193a5dfa7de85cc1a52df2c5681c4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 18:17:41 +0200 Subject: [PATCH 4/8] add net feature --- Cargo.toml | 2 +- tests/util.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e524fdf..ade1d94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ futures-util = { version = "0.3.25" } hex = "0.4" iroh-base = { version = "0.29" } iroh-blobs = { version = "0.29.0", optional = true, features = ["downloader"] } -iroh-gossip = { version = "0.29.0", optional = true } +iroh-gossip = { version = "0.29.0", optional = true, features = ["net"] } iroh-metrics = { version = "0.29.0", default-features = false } iroh = { version = "0.29", optional = true } num_enum = "0.7" diff --git a/tests/util.rs b/tests/util.rs index f0f6610..9c577fd 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -136,7 +136,6 @@ impl Builder { builder = builder.dns_resolver(dns_resolver); } let endpoint = builder.bind().await?; - let addr = endpoint.node_addr().await?; let local_pool = LocalPool::single(); let mut router = iroh::protocol::Router::builder(endpoint.clone()); let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint); From 35128058be2aab56a782040b0c3e25fd145c6fb5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 18:44:51 +0200 Subject: [PATCH 5/8] remove top level feature flag --- src/protocol.rs | 5 +++-- tests/util.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index 78f7e5b..9e878fd 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "rpc")] //! [`ProtocolHandler`] implementation for the docs [`Engine`]. use std::{path::PathBuf, sync::Arc}; @@ -55,6 +54,7 @@ impl Docs<()> { impl Docs { /// Get an in memory client to interact with the docs engine. + #[cfg(feature = "rpc")] pub fn client(&self) -> &crate::rpc::client::docs::MemClient { &self .rpc_handler @@ -73,6 +73,7 @@ impl Docs { } /// Handle a docs request from the RPC server. + #[cfg(feature = "rpc")] pub async fn handle_rpc_request>( self, msg: Request, @@ -99,7 +100,7 @@ pub struct Builder { impl Builder { /// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol. - pub async fn build( + pub async fn spawn( self, blobs: &Blobs, gossip: &Gossip, diff --git a/tests/util.rs b/tests/util.rs index 9c577fd..2e51a50 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -144,7 +144,7 @@ impl Builder { Some(ref path) => Docs::persistent(path.to_path_buf()), None => Docs::memory(), }; - let docs = match builder.build(&blobs, &gossip).await { + let docs = match builder.spawn(&blobs, &gossip).await { Ok(docs) => docs, Err(err) => { store.shutdown().await; From 7bd0c00bc1af093658983d3939f3adbb780a5d52 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 19:08:25 +0200 Subject: [PATCH 6/8] Make Engine !Clone and remove a few nested arcs. --- src/engine.rs | 22 +++++++++------------- src/protocol.rs | 6 ++---- src/rpc.rs | 23 ++++++++++++++--------- src/rpc/docs_handle_request.rs | 6 +++--- 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index bc98661..a10abc5 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -42,23 +42,21 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256; /// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with /// peers and a gossip swarm for each syncing document. -#[derive(derive_more::Debug, Clone)] +#[derive(derive_more::Debug)] pub struct Engine { /// [`Endpoint`] used by the engine. pub endpoint: Endpoint, /// Handle to the actor thread. pub sync: SyncHandle, /// The persistent default author for this engine. - pub default_author: Arc, + pub default_author: DefaultAuthor, to_live_actor: mpsc::Sender, #[allow(dead_code)] - actor_handle: Arc>, + actor_handle: AbortOnDropHandle<()>, #[debug("ContentStatusCallback")] content_status_cb: ContentStatusCallback, local_pool_handle: LocalPoolHandle, blob_store: D, - #[cfg(feature = "rpc")] - pub(crate) rpc_handler: Arc>, } impl Engine { @@ -116,24 +114,22 @@ impl Engine { endpoint, sync, to_live_actor: live_actor_tx, - actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)), + actor_handle: AbortOnDropHandle::new(actor_handle), content_status_cb, - default_author: Arc::new(default_author), + default_author, local_pool_handle, blob_store: bao_store, - #[cfg(feature = "rpc")] - rpc_handler: Default::default(), }) } /// Return a callback that can be added to blobs to protect the content of /// all docs from garbage collection. pub fn protect_cb(&self) -> ProtectCb { - let this = self.clone(); + let sync = self.sync.clone(); Box::new(move |live| { - let this = this.clone(); + let sync = sync.clone(); Box::pin(async move { - let doc_hashes = match this.sync.content_hashes().await { + let doc_hashes = match sync.content_hashes().await { Ok(hashes) => hashes, Err(err) => { tracing::warn!("Error getting doc hashes: {}", err); @@ -202,7 +198,7 @@ impl Engine { // Create a future that sends channel senders to the respective actors. // We clone `self` so that the future does not capture any lifetimes. - let this = self.clone(); + let this = self; // Subscribe to insert events from the replica. let a = { diff --git a/src/protocol.rs b/src/protocol.rs index 9e878fd..d7ca15a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -58,7 +58,7 @@ impl Docs { pub fn client(&self) -> &crate::rpc::client::docs::MemClient { &self .rpc_handler - .get_or_init(|| crate::rpc::RpcHandler::new(&self.engine)) + .get_or_init(|| crate::rpc::RpcHandler::new(self.engine.clone())) .client } @@ -79,9 +79,7 @@ impl Docs { msg: Request, chan: RpcChannel, ) -> Result<(), quic_rpc::server::RpcServerError> { - self.engine - .as_ref() - .clone() + crate::rpc::Handler(self.engine.clone()) .handle_rpc_request(msg, chan) .await } diff --git a/src/rpc.rs b/src/rpc.rs index 0e6c4bd..f252a84 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,5 +1,7 @@ //! Quic RPC implementation for docs. +use std::{ops::Deref, sync::Arc}; + use proto::{Request, RpcService}; use quic_rpc::{ server::{ChannelTypes, RpcChannel}, @@ -17,15 +19,18 @@ mod docs_handle_request; type RpcError = serde_error::Error; type RpcResult = std::result::Result; -impl Engine { - /// Get an in memory client to interact with the docs engine. - pub fn client(&self) -> &client::docs::MemClient { - &self - .rpc_handler - .get_or_init(|| RpcHandler::new(self)) - .client +#[derive(Debug, Clone)] +pub(crate) struct Handler(pub(crate) Arc>); + +impl Deref for Handler { + type Target = Engine; + + fn deref(&self) -> &Self::Target { + &self.0 } +} +impl Handler { /// Handle a docs request from the RPC server. pub async fn handle_rpc_request>( self, @@ -86,8 +91,8 @@ pub(crate) struct RpcHandler { } impl RpcHandler { - pub fn new(engine: &Engine) -> Self { - let engine = engine.clone(); + pub fn new(engine: Arc>) -> Self { + let engine = Handler(engine); let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); let client = client::docs::MemClient::new(RpcClient::new(connector)); diff --git a/src/rpc/docs_handle_request.rs b/src/rpc/docs_handle_request.rs index 2c147ff..350dc05 100644 --- a/src/rpc/docs_handle_request.rs +++ b/src/rpc/docs_handle_request.rs @@ -27,14 +27,14 @@ use super::{ SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, StartSyncResponse, StatusRequest, StatusResponse, }, - RpcError, RpcResult, + Handler, RpcError, RpcResult, }; -use crate::{engine::Engine, Author, DocTicket, NamespaceSecret}; +use crate::{Author, DocTicket, NamespaceSecret}; /// Capacity for the flume channels to forward sync store iterators to async RPC streams. const ITER_CHANNEL_CAP: usize = 64; -impl Engine { +impl Handler { pub(super) async fn author_create( self, _req: AuthorCreateRequest, From f2e32e6bf361f7bdf95d52fca1a7eff8ae54355c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 19:15:19 +0200 Subject: [PATCH 7/8] fully qualify all the things --- src/protocol.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index d7ca15a..e9ffaec 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -7,11 +7,9 @@ use futures_lite::future::Boxed as BoxedFuture; use iroh::{endpoint::Connecting, protocol::ProtocolHandler}; use iroh_blobs::net_protocol::{Blobs, ProtectCb}; use iroh_gossip::net::Gossip; -use quic_rpc::server::{ChannelTypes, RpcChannel}; use crate::{ engine::{DefaultAuthorStorage, Engine}, - rpc::proto::{Request, RpcService}, store::Store, }; @@ -74,10 +72,12 @@ impl Docs { /// Handle a docs request from the RPC server. #[cfg(feature = "rpc")] - pub async fn handle_rpc_request>( + pub async fn handle_rpc_request< + C: quic_rpc::server::ChannelTypes, + >( self, - msg: Request, - chan: RpcChannel, + msg: crate::rpc::proto::Request, + chan: quic_rpc::server::RpcChannel, ) -> Result<(), quic_rpc::server::RpcServerError> { crate::rpc::Handler(self.engine.clone()) .handle_rpc_request(msg, chan) From 5edc52c50f8101255cbb5b1fe048509b287689cd Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 19:25:56 +0200 Subject: [PATCH 8/8] more feature flag madness --- src/protocol.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index e9ffaec..ba0e58e 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -66,6 +66,7 @@ impl Docs { pub fn new(engine: Engine) -> Self { Self { engine: Arc::new(engine), + #[cfg(feature = "rpc")] rpc_handler: Default::default(), } } @@ -121,9 +122,6 @@ impl Builder { blobs.rt().clone(), ) .await?; - Ok(Docs { - engine: Arc::new(engine), - rpc_handler: Default::default(), - }) + Ok(Docs::new(engine)) } }