experiment: move authors API purely to in memory calls, no rpc
dignifiedquire committed Dec 6, 2024
1 parent 533baf9 commit 112d87b
Showing 11 changed files with 141 additions and 267 deletions.
4 changes: 2 additions & 2 deletions src/actor.rs
@@ -34,7 +34,7 @@ pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);

/// Import an author action.
#[derive(Debug, Serialize, Deserialize)]
-pub struct ImportAuthorAction {
+pub(crate) struct ImportAuthorAction {
/// The author to import.
pub author: Author,
}
@@ -228,7 +228,7 @@ struct OpenReplica {
/// waiting for the actor to finish happens in an async context, and therefore that the final
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
-pub struct SyncHandle {
+pub(crate) struct SyncHandle {
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
}
119 changes: 102 additions & 17 deletions src/engine.rs
@@ -2,14 +2,9 @@
//!
//! [`crate::Replica`] is also called documents here.
-use std::{
-io,
-path::PathBuf,
-str::FromStr,
-sync::{Arc, RwLock},
-};
+use std::{io, path::PathBuf, str::FromStr, sync::Arc};

-use anyhow::{bail, Context, Result};
+use anyhow::{bail, ensure, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh::{key::PublicKey, Endpoint, NodeAddr};
use iroh_blobs::{
@@ -18,7 +13,7 @@ use iroh_blobs::{
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, RwLock};
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, error_span, Instrument};

@@ -45,11 +40,11 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
#[derive(derive_more::Debug, Clone)]
pub struct Engine<D> {
/// [`Endpoint`] used by the engine.
-pub endpoint: Endpoint,
+pub(crate) endpoint: Endpoint,
/// Handle to the actor thread.
-pub sync: SyncHandle,
+pub(crate) sync: SyncHandle,
/// The persistent default author for this engine.
-pub default_author: Arc<DefaultAuthor>,
+default_author: Arc<DefaultAuthor>,
to_live_actor: mpsc::Sender<ToLiveActor>,
#[allow(dead_code)]
actor_handle: Arc<AbortOnDropHandle<()>>,
@@ -252,6 +247,93 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
pub fn local_pool_handle(&self) -> &LocalPoolHandle {
&self.local_pool_handle
}

/// Authors API.
pub fn authors(&self) -> Authors {
Authors {
sync: self.sync.clone(),
default_author: self.default_author.clone(),
}
}
}

/// Authors client
#[derive(Debug, Clone)]
pub struct Authors {
sync: SyncHandle,
default_author: Arc<DefaultAuthor>,
}

impl Authors {
/// Creates a new document author.
///
/// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author
/// again.
///
/// If you need only a single author, use [`Self::default`].
pub async fn create(&self) -> Result<AuthorId> {
let mut rng = rand::rngs::OsRng::default();
let author = Author::new(&mut rng);
self.sync.import_author(author).await
}

/// Returns the default document author of this node.
///
/// On persistent nodes, the author is created on first start and its public key is saved
/// in the data directory.
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> AuthorId {
self.default_author.get().await
}

/// Sets the node-wide default author.
///
/// If the author does not exist, an error is returned.
///
/// On a persistent node, the author id will be saved to a file in the data directory and
/// reloaded after a restart.
pub async fn set_default(&self, author_id: AuthorId) -> Result<()> {
self.default_author.set(author_id, &self.sync).await
}

/// Lists document authors for which we have a secret key.
///
/// It's only possible to create writes from authors that we have the secret key of.
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
let (tx, rx) = async_channel::bounded(64);
self.sync.list_authors(tx).await?;

Ok(rx)
}

/// Exports the given author.
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn export(&self, author: AuthorId) -> Result<Option<Author>> {
self.sync.export_author(author).await
}

/// Imports the given author.
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn import(&self, author: Author) -> Result<AuthorId> {
self.sync.import_author(author).await
}

/// Deletes the given author by id.
///
/// Warning: This permanently removes this author.
///
/// Returns an error if attempting to delete the default author.
pub async fn delete(&self, author: AuthorId) -> Result<()> {
let default_author = self.default().await;
ensure!(
author != default_author,
"Deleting the default author is not supported"
);
self.sync.delete_author(author).await
}
}
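The methods above replace the former author RPC round-trips with direct calls into the sync actor. A minimal usage sketch under assumptions: the function name `demo`, the variable `engine`, and the error handling are illustrative only and not part of this commit.

    // Sketch only: assumes an `Engine<D>` was built elsewhere.
    use futures_lite::StreamExt;

    async fn demo<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> anyhow::Result<()> {
        let authors = engine.authors();

        // Create a new author and make it the node-wide default.
        let id = authors.create().await?;
        authors.set_default(id).await?;
        assert_eq!(authors.default().await, id);

        // List every author for which we hold a secret key.
        let mut known = authors.list().await?;
        while let Some(author_id) = known.next().await {
            println!("author: {}", author_id?);
        }
        Ok(())
    }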

/// Converts an [`EntryStatus`] into a ['ContentStatus'].
@@ -359,7 +441,7 @@ impl DefaultAuthorStorage {
///
/// Returns an error if the author can't be parsed or if the author does not exist in the docs
/// store.
-pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
+pub(crate) async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
match self {
Self::Mem => {
let author = Author::new(&mut rand::thread_rng());
@@ -432,7 +514,10 @@ impl DefaultAuthor {
/// Load the default author from storage.
///
/// If the storage is empty creates a new author and persists it.
-pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
+pub(crate) async fn load(
+storage: DefaultAuthorStorage,
+docs_store: &SyncHandle,
+) -> Result<Self> {
let value = storage.load(docs_store).await?;
Ok(Self {
value: RwLock::new(value),
@@ -441,17 +526,17 @@ impl DefaultAuthor {
}

/// Get the current default author.
-pub fn get(&self) -> AuthorId {
-*self.value.read().unwrap()
+pub async fn get(&self) -> AuthorId {
+*self.value.read().await
}

/// Set the default author.
-pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
+pub(crate) async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
if docs_store.export_author(author_id).await?.is_none() {
bail!("The author does not exist");
}
self.storage.persist(author_id).await?;
-*self.value.write().unwrap() = author_id;
+*self.value.write().await = author_id;
Ok(())
}
}
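One side effect visible above: `DefaultAuthor` now guards its value with tokio's async `RwLock` rather than `std::sync::RwLock`, so the guards are acquired with `.await` and cannot be poisoned, which is why `get` becomes an `async fn`. A generic illustration of the pattern (type and method names are made up, not code from this commit):

    use tokio::sync::RwLock;

    struct Holder {
        value: RwLock<u64>,
    }

    impl Holder {
        // The async read guard is acquired with .await; there is no
        // poisoning, so no .unwrap() as with std::sync::RwLock.
        async fn get(&self) -> u64 {
            *self.value.read().await
        }

        async fn set(&self, v: u64) {
            *self.value.write().await = v;
        }
    }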
4 changes: 2 additions & 2 deletions src/net.rs
@@ -25,7 +25,7 @@ pub const ALPN: &[u8] = b"/iroh-sync/1";
mod codec;

/// Connect to a peer and sync a replica
-pub async fn connect_and_sync(
+pub(crate) async fn connect_and_sync(
endpoint: &Endpoint,
sync: &SyncHandle,
namespace: NamespaceId,
@@ -104,7 +104,7 @@ pub enum AcceptOutcome {
}

/// Handle an iroh-docs connection and sync all shared documents in the replica store.
-pub async fn handle_connection<F, Fut>(
+pub(crate) async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: iroh::endpoint::Connecting,
accept_cb: F,
8 changes: 0 additions & 8 deletions src/rpc.rs
@@ -65,14 +65,6 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
SetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_set_download_policy).await,
GetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_get_download_policy).await,
GetSyncPeers(msg) => chan.rpc(msg, this, Self::doc_get_sync_peers).await,

-AuthorList(msg) => chan.server_streaming(msg, this, Self::author_list).await,
-AuthorCreate(msg) => chan.rpc(msg, this, Self::author_create).await,
-AuthorImport(msg) => chan.rpc(msg, this, Self::author_import).await,
-AuthorExport(msg) => chan.rpc(msg, this, Self::author_export).await,
-AuthorDelete(msg) => chan.rpc(msg, this, Self::author_delete).await,
-AuthorGetDefault(msg) => chan.rpc(msg, this, Self::author_default).await,
-AuthorSetDefault(msg) => chan.rpc(msg, this, Self::author_set_default).await,
}
}
}
1 change: 0 additions & 1 deletion src/rpc/client.rs
@@ -2,7 +2,6 @@
use anyhow::Result;
use futures_util::{Stream, StreamExt};

-pub mod authors;
pub mod docs;

fn flatten<T, E1, E2>(
9 changes: 1 addition & 8 deletions src/rpc/client/authors.rs
@@ -9,14 +9,7 @@ use quic_rpc::{client::BoxedConnector, Connector};
use super::flatten;
#[doc(inline)]
pub use crate::engine::{Origin, SyncEvent, SyncReason};
-use crate::{
-actor::ImportAuthorAction,
-rpc::proto::{
-AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest,
-AuthorListRequest, AuthorSetDefaultRequest, RpcService,
-},
-Author, AuthorId,
-};
+use crate::{actor::ImportAuthorAction, rpc::proto::RpcService, Author, AuthorId};

/// Iroh docs client.
#[derive(Debug, Clone)]
7 changes: 1 addition & 6 deletions src/rpc/client/docs.rs
@@ -20,7 +20,7 @@ use quic_rpc::{
};
use serde::{Deserialize, Serialize};

-use super::{authors, flatten};
+use super::flatten;
use crate::{
actor::OpenState,
rpc::proto::{
@@ -56,11 +56,6 @@ impl<C: Connector<RpcService>> Client<C> {
Self { rpc }
}

-/// Returns an authors client.
-pub fn authors(&self) -> authors::Client<C> {
-authors::Client::new(self.rpc.clone())
-}

/// Creates a client.
pub async fn create(&self) -> Result<Doc<C>> {
let res = self.rpc.rpc(CreateRequest {}).await??;