From 08ccb283ea8856e0ddc44bdcba4509b774226d8c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 6 Dec 2024 17:39:57 +0200 Subject: [PATCH] Add Docs which wraps the Engine The ProtocolHandler is now implemented only for Blobs. Docs has a builder that takes a Blobs and Gossip, but there is also a way to create a Docs directly from an Engine. --- src/protocol.rs | 114 ++++++++++++++++++++++++++++++++++++++++++++++-- src/rpc.rs | 4 +- tests/util.rs | 46 ++++--------------- 3 files changed, 121 insertions(+), 43 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index cfbb45d..6f297cf 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,19 +1,28 @@ //! [`ProtocolHandler`] implementation for the docs [`Engine`]. +use std::{path::PathBuf, sync::Arc}; + use anyhow::Result; use futures_lite::future::Boxed as BoxedFuture; use iroh::{endpoint::Connecting, protocol::ProtocolHandler}; +use iroh_blobs::net_protocol::{Blobs, ProtectCb}; +use iroh_gossip::net::Gossip; +use quic_rpc::server::{ChannelTypes, RpcChannel}; -use crate::engine::Engine; +use crate::{ + engine::{DefaultAuthorStorage, Engine}, + rpc::proto::{Request, RpcService}, + store::Store, +}; -impl ProtocolHandler for Engine { +impl ProtocolHandler for Docs { fn accept(&self, conn: Connecting) -> BoxedFuture> { - let this = self.clone(); + let this = self.engine.clone(); Box::pin(async move { this.handle_connection(conn).await }) } fn shutdown(&self) -> BoxedFuture<()> { - let this = self.clone(); + let this = self.engine.clone(); Box::pin(async move { if let Err(err) = this.shutdown().await { tracing::warn!("shutdown error: {:?}", err); @@ -21,3 +30,100 @@ impl ProtocolHandler for Engine { }) } } + +/// Docs protocol. +#[derive(Debug, Clone)] +pub struct Docs { + engine: Arc>, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, +} + +impl Docs<()> { + /// Create a new [`Builder`] for the docs protocol, using in memory replica and author storage. + pub fn memory() -> Builder { + Builder::default() + } + + /// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage + /// in the given directory. + pub fn persistent(path: PathBuf) -> Builder { + Builder { path: Some(path) } + } +} + +impl Docs { + /// Get an in memory client to interact with the docs engine. + pub fn client(&self) -> &crate::rpc::client::docs::MemClient { + &self + .rpc_handler + .get_or_init(|| crate::rpc::RpcHandler::new(&self.engine)) + .client + } + + /// Create a new docs protocol with the given engine. + /// + /// Note that usually you would use the [`Builder`] to create a new docs protocol. + pub fn new(engine: Engine) -> Self { + Self { + engine: Arc::new(engine), + rpc_handler: Default::default(), + } + } + + /// Handle a docs request from the RPC server. + pub async fn handle_rpc_request>( + self, + msg: Request, + chan: RpcChannel, + ) -> Result<(), quic_rpc::server::RpcServerError> { + self.engine + .as_ref() + .clone() + .handle_rpc_request(msg, chan) + .await + } + + /// Get the protect callback for the docs engine. + pub fn protect_cb(&self) -> ProtectCb { + self.engine.protect_cb() + } +} + +/// Builder for the docs protocol. +#[derive(Debug, Default)] +pub struct Builder { + path: Option, +} + +impl Builder { + /// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol. + pub async fn build( + self, + blobs: &Blobs, + gossip: &Gossip, + ) -> anyhow::Result> { + let replica_store = match self.path { + Some(ref path) => Store::persistent(path.join("docs.redb"))?, + None => Store::memory(), + }; + let author_store = match self.path { + Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")), + None => DefaultAuthorStorage::Mem, + }; + let engine = Engine::spawn( + blobs.endpoint().clone(), + gossip.clone(), + replica_store, + blobs.store().clone(), + blobs.downloader().clone(), + author_store, + blobs.rt().clone(), + ) + .await?; + Ok(Docs { + engine: Arc::new(engine), + rpc_handler: Default::default(), + }) + } +} diff --git a/src/rpc.rs b/src/rpc.rs index d1e974a..0e6c4bd 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -80,13 +80,13 @@ impl Engine { #[derive(Debug)] pub(crate) struct RpcHandler { /// Client to hand out - client: client::docs::MemClient, + pub(crate) client: client::docs::MemClient, /// Handler task _handler: AbortOnDropHandle<()>, } impl RpcHandler { - fn new(engine: &Engine) -> Self { + pub fn new(engine: &Engine) -> Self { let engine = engine.clone(); let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); diff --git a/tests/util.rs b/tests/util.rs index 8f92a26..8fdb065 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -5,14 +5,15 @@ use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, ops::Deref, path::{Path, PathBuf}, - sync::Arc, }; use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode}; use iroh_blobs::{ + net_protocol::Blobs, store::{GcConfig, Store as BlobStore}, util::local_pool::LocalPool, }; +use iroh_docs::protocol::Docs; use nested_enum_utils::enum_conversions; use quic_rpc::transport::{Connector, Listener}; use serde::{Deserialize, Serialize}; @@ -137,46 +138,17 @@ impl Builder { let addr = endpoint.node_addr().await?; let local_pool = LocalPool::single(); let mut router = iroh::protocol::Router::builder(endpoint.clone()); - - // Setup blobs - let downloader = iroh_blobs::downloader::Downloader::new( - store.clone(), - endpoint.clone(), - local_pool.handle().clone(), - ); - let blobs = iroh_blobs::net_protocol::Blobs::new( - store.clone(), - local_pool.handle().clone(), - Default::default(), - downloader.clone(), - endpoint.clone(), - ); + let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint); let gossip = iroh_gossip::net::Gossip::from_endpoint( endpoint.clone(), Default::default(), &addr.info, ); - let replica_store = match self.path { - Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?, - None => iroh_docs::store::Store::memory(), - }; - let author_store = match self.path { - Some(ref path) => { - iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author")) - } - None => iroh_docs::engine::DefaultAuthorStorage::Mem, + let builder = match self.path { + Some(ref path) => Docs::persistent(path.to_path_buf()), + None => Docs::memory(), }; - let docs = match iroh_docs::engine::Engine::spawn( - endpoint, - gossip.clone(), - replica_store, - store.clone(), - downloader, - author_store, - local_pool.handle().clone(), - ) - .await - { + let docs = match builder.build(&blobs, &gossip).await { Ok(docs) => docs, Err(err) => { store.shutdown().await; @@ -184,8 +156,8 @@ impl Builder { } }; router = router.accept(iroh_blobs::ALPN, blobs.clone()); - router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone())); - router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone())); + router = router.accept(iroh_docs::ALPN, docs.clone()); + router = router.accept(iroh_gossip::ALPN, gossip.clone()); // Build the router let router = router.spawn().await?;