Skip to content

Commit

Permalink
feat!: Add Docs<S> which wraps the Engine<S> (#18)
Browse files Browse the repository at this point in the history
* Add Docs<S> which wraps the Engine<S>

The ProtocolHandler is now implemented only for Blobs<S>.

Docs<S> has a builder that takes a Blobs<S> and Gossip<S>, but there is also
a way to create a Docs directly from an Engine.

* feature gate the Docs<S> and its builder

* use the gossip builder API

* add net feature

* remove top level feature flag

* Make Engine !Clone and remove a few nested arcs.

* fully qualify all the things

* more feature flag madness
  • Loading branch information
rklaehn authored Dec 6, 2024
1 parent b6c7616 commit 857f0dc
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.29" }
iroh-blobs = { version = "0.29.0", optional = true, features = ["downloader"] }
iroh-gossip = { version = "0.29.0", optional = true }
iroh-gossip = { version = "0.29.0", optional = true, features = ["net"] }
iroh-metrics = { version = "0.29.0", default-features = false }
iroh = { version = "0.29", optional = true }
num_enum = "0.7"
Expand Down
22 changes: 9 additions & 13 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,21 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;

/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
#[derive(derive_more::Debug, Clone)]
#[derive(derive_more::Debug)]
pub struct Engine<D> {
/// [`Endpoint`] used by the engine.
pub endpoint: Endpoint,
/// Handle to the actor thread.
pub sync: SyncHandle,
/// The persistent default author for this engine.
pub default_author: Arc<DefaultAuthor>,
pub default_author: DefaultAuthor,
to_live_actor: mpsc::Sender<ToLiveActor>,
#[allow(dead_code)]
actor_handle: Arc<AbortOnDropHandle<()>>,
actor_handle: AbortOnDropHandle<()>,
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
Expand Down Expand Up @@ -116,24 +114,22 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
endpoint,
sync,
to_live_actor: live_actor_tx,
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
actor_handle: AbortOnDropHandle::new(actor_handle),
content_status_cb,
default_author: Arc::new(default_author),
default_author,
local_pool_handle,
blob_store: bao_store,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
})
}

/// Return a callback that can be added to blobs to protect the content of
/// all docs from garbage collection.
pub fn protect_cb(&self) -> ProtectCb {
let this = self.clone();
let sync = self.sync.clone();
Box::new(move |live| {
let this = this.clone();
let sync = sync.clone();
Box::pin(async move {
let doc_hashes = match this.sync.content_hashes().await {
let doc_hashes = match sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::warn!("Error getting doc hashes: {}", err);
Expand Down Expand Up @@ -202,7 +198,7 @@ impl<D: iroh_blobs::store::Store> Engine<D> {

// Create a future that sends channel senders to the respective actors.
// We clone `self` so that the future does not capture any lifetimes.
let this = self.clone();
let this = self;

// Subscribe to insert events from the replica.
let a = {
Expand Down
112 changes: 108 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,127 @@
//! [`ProtocolHandler`] implementation for the docs [`Engine`].
use std::{path::PathBuf, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{endpoint::Connecting, protocol::ProtocolHandler};
use iroh_blobs::net_protocol::{Blobs, ProtectCb};
use iroh_gossip::net::Gossip;

use crate::engine::Engine;
use crate::{
engine::{DefaultAuthorStorage, Engine},
store::Store,
};

impl<D: iroh_blobs::store::Store> ProtocolHandler for Engine<D> {
impl<S: iroh_blobs::store::Store> ProtocolHandler for Docs<S> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move { this.handle_connection(conn).await })
}

fn shutdown(&self) -> BoxedFuture<()> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move {
if let Err(err) = this.shutdown().await {
tracing::warn!("shutdown error: {:?}", err);
}
})
}
}

/// Docs protocol.
///
/// A cheaply cloneable handle around the sync [`Engine`]: `Clone` is derived
/// and both fields are behind an [`Arc`], so all clones share the same engine
/// and RPC handler.
#[derive(Debug, Clone)]
pub struct Docs<S> {
    // Shared sync engine; created via [`Docs::new`] or the [`Builder`].
    engine: Arc<Engine<S>>,
    // RPC handler, lazily initialized on first call to `client()` and then
    // shared across all clones of this `Docs`.
    #[cfg(feature = "rpc")]
    pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl Docs<()> {
    /// Create a new [`Builder`] for the docs protocol, backed by in-memory
    /// replica and author storage.
    pub fn memory() -> Builder {
        Builder { path: None }
    }

    /// Create a new [`Builder`] for the docs protocol, backed by persistent
    /// replica and author storage rooted at the given directory.
    pub fn persistent(path: PathBuf) -> Builder {
        let mut builder = Builder::default();
        builder.path = Some(path);
        builder
    }
}

impl<S: iroh_blobs::store::Store> Docs<S> {
    /// Get an in memory client to interact with the docs engine.
    ///
    /// The backing RPC handler (and its actor task) is created lazily on the
    /// first call and cached in the shared `OnceLock` for the lifetime of
    /// this `Docs` and all its clones.
    #[cfg(feature = "rpc")]
    pub fn client(&self) -> &crate::rpc::client::docs::MemClient {
        &self
            .rpc_handler
            .get_or_init(|| crate::rpc::RpcHandler::new(self.engine.clone()))
            .client
    }

    /// Create a new docs protocol with the given engine.
    ///
    /// Note that usually you would use the [`Builder`] to create a new docs protocol.
    pub fn new(engine: Engine<S>) -> Self {
        Self {
            engine: Arc::new(engine),
            #[cfg(feature = "rpc")]
            rpc_handler: Default::default(),
        }
    }

    /// Handle a docs request from the RPC server.
    #[cfg(feature = "rpc")]
    pub async fn handle_rpc_request<
        C: quic_rpc::server::ChannelTypes<crate::rpc::proto::RpcService>,
    >(
        self,
        msg: crate::rpc::proto::Request,
        chan: quic_rpc::server::RpcChannel<crate::rpc::proto::RpcService, C>,
    ) -> Result<(), quic_rpc::server::RpcServerError<C>> {
        // `self` is consumed here, so the engine `Arc` can be moved into the
        // handler directly — the previous `.clone()` was redundant.
        crate::rpc::Handler(self.engine)
            .handle_rpc_request(msg, chan)
            .await
    }

    /// Get the protect callback for the docs engine.
    pub fn protect_cb(&self) -> ProtectCb {
        self.engine.protect_cb()
    }
}

/// Builder for the docs protocol.
///
/// Obtained via [`Docs::memory`] or [`Docs::persistent`]; finalized with
/// [`Builder::spawn`].
#[derive(Debug, Default)]
pub struct Builder {
    // Root directory for persistent replica/author storage;
    // `None` selects in-memory storage.
    path: Option<PathBuf>,
}

impl Builder {
    /// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol.
    pub async fn spawn<S: iroh_blobs::store::Store>(
        self,
        blobs: &Blobs<S>,
        gossip: &Gossip,
    ) -> anyhow::Result<Docs<S>> {
        // Pick replica and default-author storage together, depending on
        // whether a persistence directory was configured.
        let (replica_store, author_store) = match self.path.as_deref() {
            Some(dir) => (
                Store::persistent(dir.join("docs.redb"))?,
                DefaultAuthorStorage::Persistent(dir.join("default-author")),
            ),
            None => (Store::memory(), DefaultAuthorStorage::Mem),
        };
        let engine = Engine::spawn(
            blobs.endpoint().clone(),
            gossip.clone(),
            replica_store,
            blobs.store().clone(),
            blobs.downloader().clone(),
            author_store,
            blobs.rt().clone(),
        )
        .await?;
        Ok(Docs::new(engine))
    }
}
25 changes: 15 additions & 10 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Quic RPC implementation for docs.
use std::{ops::Deref, sync::Arc};

use proto::{Request, RpcService};
use quic_rpc::{
server::{ChannelTypes, RpcChannel},
Expand All @@ -17,15 +19,18 @@ mod docs_handle_request;
type RpcError = serde_error::Error;
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Get an in memory client to interact with the docs engine.
pub fn client(&self) -> &client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self))
.client
/// Crate-internal wrapper around a shared [`Engine`], used to serve RPC
/// requests. `Clone` is cheap (it only bumps the `Arc` refcount).
#[derive(Debug, Clone)]
pub(crate) struct Handler<S>(pub(crate) Arc<Engine<S>>);

impl<S> Deref for Handler<S> {
    type Target = Engine<S>;

    /// Forward to the engine behind the shared `Arc`, so `Handler` can be
    /// used wherever an `&Engine<S>` is expected.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

impl<D: iroh_blobs::store::Store> Handler<D> {
/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
self,
Expand Down Expand Up @@ -80,14 +85,14 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: client::docs::MemClient,
pub(crate) client: client::docs::MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
pub fn new<D: iroh_blobs::store::Store>(engine: Arc<Engine<D>>) -> Self {
let engine = Handler(engine);
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = client::docs::MemClient::new(RpcClient::new(connector));
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/docs_handle_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use super::{
SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, StartSyncResponse,
StatusRequest, StatusResponse,
},
RpcError, RpcResult,
Handler, RpcError, RpcResult,
};
use crate::{engine::Engine, Author, DocTicket, NamespaceSecret};
use crate::{Author, DocTicket, NamespaceSecret};

/// Capacity for the flume channels to forward sync store iterators to async RPC streams.
const ITER_CHANNEL_CAP: usize = 64;

impl<D: iroh_blobs::store::Store> Engine<D> {
impl<D: iroh_blobs::store::Store> Handler<D> {
pub(super) async fn author_create(
self,
_req: AuthorCreateRequest,
Expand Down
54 changes: 11 additions & 43 deletions tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};

use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode};
use iroh_blobs::{
net_protocol::Blobs,
store::{GcConfig, Store as BlobStore},
util::local_pool::LocalPool,
};
use iroh_docs::protocol::Docs;
use iroh_gossip::net::Gossip;
use nested_enum_utils::enum_conversions;
use quic_rpc::transport::{Connector, Listener};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -134,58 +136,24 @@ impl<S: BlobStore> Builder<S> {
builder = builder.dns_resolver(dns_resolver);
}
let endpoint = builder.bind().await?;
let addr = endpoint.node_addr().await?;
let local_pool = LocalPool::single();
let mut router = iroh::protocol::Router::builder(endpoint.clone());

// Setup blobs
let downloader = iroh_blobs::downloader::Downloader::new(
store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = iroh_blobs::net_protocol::Blobs::new(
store.clone(),
local_pool.handle().clone(),
Default::default(),
downloader.clone(),
endpoint.clone(),
);
let gossip = iroh_gossip::net::Gossip::from_endpoint(
endpoint.clone(),
Default::default(),
&addr.info,
);
let replica_store = match self.path {
Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?,
None => iroh_docs::store::Store::memory(),
};
let author_store = match self.path {
Some(ref path) => {
iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"))
}
None => iroh_docs::engine::DefaultAuthorStorage::Mem,
let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint);
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let builder = match self.path {
Some(ref path) => Docs::persistent(path.to_path_buf()),
None => Docs::memory(),
};
let docs = match iroh_docs::engine::Engine::spawn(
endpoint,
gossip.clone(),
replica_store,
store.clone(),
downloader,
author_store,
local_pool.handle().clone(),
)
.await
{
let docs = match builder.spawn(&blobs, &gossip).await {
Ok(docs) => docs,
Err(err) => {
store.shutdown().await;
return Err(err);
}
};
router = router.accept(iroh_blobs::ALPN, blobs.clone());
router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone()));
router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone()));
router = router.accept(iroh_docs::ALPN, docs.clone());
router = router.accept(iroh_gossip::ALPN, gossip.clone());

// Build the router
let router = router.spawn().await?;
Expand Down

0 comments on commit 857f0dc

Please sign in to comment.