Skip to content

Commit

Permalink
Add Docs<S> which wraps the Engine<S>
Browse files Browse the repository at this point in the history
The ProtocolHandler is now implemented only for Blobs<S>.

Docs<S> has a builder that takes a Blobs<S> and Gossip<S>, but there is also
a way to create a Docs directly from an Engine.
  • Loading branch information
rklaehn committed Dec 6, 2024
1 parent b6c7616 commit 08ccb28
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 43 deletions.
114 changes: 110 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,129 @@
//! [`ProtocolHandler`] implementation for the docs [`Engine`].
use std::{path::PathBuf, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{endpoint::Connecting, protocol::ProtocolHandler};
use iroh_blobs::net_protocol::{Blobs, ProtectCb};
use iroh_gossip::net::Gossip;
use quic_rpc::server::{ChannelTypes, RpcChannel};

use crate::engine::Engine;
use crate::{
engine::{DefaultAuthorStorage, Engine},
rpc::proto::{Request, RpcService},
store::Store,
};

impl<D: iroh_blobs::store::Store> ProtocolHandler for Engine<D> {
impl<S: iroh_blobs::store::Store> ProtocolHandler for Docs<S> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move { this.handle_connection(conn).await })
}

fn shutdown(&self) -> BoxedFuture<()> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move {
if let Err(err) = this.shutdown().await {
tracing::warn!("shutdown error: {:?}", err);
}
})
}
}

/// Docs protocol.
#[derive(Debug, Clone)]
pub struct Docs<S> {
engine: Arc<Engine<S>>,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl Docs<()> {
/// Create a new [`Builder`] for the docs protocol, using in memory replica and author storage.
pub fn memory() -> Builder {
Builder::default()
}

/// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage
/// in the given directory.
pub fn persistent(path: PathBuf) -> Builder {
Builder { path: Some(path) }
}
}

impl<S: iroh_blobs::store::Store> Docs<S> {
/// Get an in memory client to interact with the docs engine.
pub fn client(&self) -> &crate::rpc::client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| crate::rpc::RpcHandler::new(&self.engine))
.client
}

/// Create a new docs protocol with the given engine.
///
/// Note that usually you would use the [`Builder`] to create a new docs protocol.
pub fn new(engine: Engine<S>) -> Self {
Self {
engine: Arc::new(engine),
rpc_handler: Default::default(),
}
}

/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
self,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
self.engine
.as_ref()
.clone()
.handle_rpc_request(msg, chan)
.await
}

/// Get the protect callback for the docs engine.
pub fn protect_cb(&self) -> ProtectCb {
self.engine.protect_cb()
}
}

/// Builder for the docs protocol.
#[derive(Debug, Default)]
pub struct Builder {
path: Option<PathBuf>,
}

impl Builder {
/// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol.
pub async fn build<S: iroh_blobs::store::Store>(
self,
blobs: &Blobs<S>,
gossip: &Gossip,
) -> anyhow::Result<Docs<S>> {
let replica_store = match self.path {
Some(ref path) => Store::persistent(path.join("docs.redb"))?,
None => Store::memory(),
};
let author_store = match self.path {
Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")),
None => DefaultAuthorStorage::Mem,
};
let engine = Engine::spawn(
blobs.endpoint().clone(),
gossip.clone(),
replica_store,
blobs.store().clone(),
blobs.downloader().clone(),
author_store,
blobs.rt().clone(),
)
.await?;
Ok(Docs {
engine: Arc::new(engine),
rpc_handler: Default::default(),
})
}
}
4 changes: 2 additions & 2 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: client::docs::MemClient,
pub(crate) client: client::docs::MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
pub fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
Expand Down
46 changes: 9 additions & 37 deletions tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};

use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode};
use iroh_blobs::{
net_protocol::Blobs,
store::{GcConfig, Store as BlobStore},
util::local_pool::LocalPool,
};
use iroh_docs::protocol::Docs;
use nested_enum_utils::enum_conversions;
use quic_rpc::transport::{Connector, Listener};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -137,55 +138,26 @@ impl<S: BlobStore> Builder<S> {
let addr = endpoint.node_addr().await?;
let local_pool = LocalPool::single();
let mut router = iroh::protocol::Router::builder(endpoint.clone());

// Setup blobs
let downloader = iroh_blobs::downloader::Downloader::new(
store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = iroh_blobs::net_protocol::Blobs::new(
store.clone(),
local_pool.handle().clone(),
Default::default(),
downloader.clone(),
endpoint.clone(),
);
let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint);
let gossip = iroh_gossip::net::Gossip::from_endpoint(
endpoint.clone(),
Default::default(),
&addr.info,
);
let replica_store = match self.path {
Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?,
None => iroh_docs::store::Store::memory(),
};
let author_store = match self.path {
Some(ref path) => {
iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"))
}
None => iroh_docs::engine::DefaultAuthorStorage::Mem,
let builder = match self.path {
Some(ref path) => Docs::persistent(path.to_path_buf()),
None => Docs::memory(),
};
let docs = match iroh_docs::engine::Engine::spawn(
endpoint,
gossip.clone(),
replica_store,
store.clone(),
downloader,
author_store,
local_pool.handle().clone(),
)
.await
{
let docs = match builder.build(&blobs, &gossip).await {
Ok(docs) => docs,
Err(err) => {
store.shutdown().await;
return Err(err);
}
};
router = router.accept(iroh_blobs::ALPN, blobs.clone());
router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone()));
router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone()));
router = router.accept(iroh_docs::ALPN, docs.clone());
router = router.accept(iroh_gossip::ALPN, gossip.clone());

// Build the router
let router = router.spawn().await?;
Expand Down

0 comments on commit 08ccb28

Please sign in to comment.