diff --git a/Cargo.lock b/Cargo.lock index 97c67d8fe0..d9e14fde06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2778,6 +2778,7 @@ dependencies = [ "human-time", "indicatif", "iroh", + "iroh-docs", "iroh-gossip", "iroh-metrics", "nix 0.27.1", @@ -2864,7 +2865,7 @@ dependencies = [ [[package]] name = "iroh-docs" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh-docs?branch=main#c3017de4573930fd03a05ec4aa6c7ab7a84f1890" +source = "git+https://github.com/n0-computer/iroh-docs?branch=main#9166b226fedc889218986eb6ef2a3d3cdcdee367" dependencies = [ "anyhow", "async-channel", @@ -2883,15 +2884,20 @@ dependencies = [ "iroh-net", "iroh-router", "lru", + "nested_enum_utils", "num_enum", + "portable-atomic", "postcard", + "quic-rpc", + "quic-rpc-derive", "rand", "rand_core", "redb 1.5.1", "redb 2.2.0", "self_cell", "serde", - "strum 0.25.0", + "serde-error", + "strum 0.26.3", "tempfile", "thiserror", "tokio", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 5a76c09c1b..d36cbe0111 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -42,6 +42,7 @@ human-time = "0.1.6" indicatif = { version = "0.17", features = ["tokio"] } iroh = { version = "0.28.1", path = "../iroh", features = ["metrics"] } iroh-gossip = "0.28.1" +iroh-docs = { version = "0.28.0", features = ["rpc"]} iroh-metrics = { version = "0.28.0" } parking_lot = "0.12.1" pkarr = { version = "2.2.0", default-features = false } diff --git a/iroh-cli/src/commands/docs.rs b/iroh-cli/src/commands/docs.rs index 4846db1208..fc827890d6 100644 --- a/iroh-cli/src/commands/docs.rs +++ b/iroh-cli/src/commands/docs.rs @@ -18,17 +18,17 @@ use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressS use iroh::{ base::{base32::fmt_short, node_addr::AddrInfoOptions}, blobs::{provider::AddProgress, util::SetTagOption, Hash, Tag}, - client::{ - blobs::WrapOption, - docs::{Doc, Entry, LiveEvent, Origin, ShareMode}, - Iroh, - }, + client::{blobs::WrapOption, Doc, Iroh}, docs::{ store::{DownloadPolicy, FilterKind, Query, SortDirection}, AuthorId, DocTicket, NamespaceId, }, util::fs::{path_content_info, path_to_key, PathContent}, }; +use iroh_docs::{ + engine::Origin, + rpc::client::docs::{Entry, LiveEvent, ShareMode}, +}; use tokio::io::AsyncReadExt; use crate::config::ConsoleEnv; @@ -414,7 +414,7 @@ impl DocCommands { let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? { - println!("{}", fmt_entry(&doc, &entry, mode).await); + println!("{}", fmt_entry(&iroh.blobs(), &entry, mode).await); } } Self::Keys { @@ -440,7 +440,7 @@ impl DocCommands { query = query.sort_by(sort.into(), direction); let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? { - println!("{}", fmt_entry(&doc, &entry, mode).await); + println!("{}", fmt_entry(&iroh.blobs(), &entry, mode).await); } } Self::Leave { doc } => { @@ -516,7 +516,7 @@ impl DocCommands { } Some(e) => e, }; - match entry.content_reader(&doc).await { + match iroh.blobs().read(entry.content_hash()).await { Ok(mut content) => { if let Some(dir) = path.parent() { if let Err(err) = std::fs::create_dir_all(dir) { @@ -547,13 +547,14 @@ impl DocCommands { Self::Watch { doc } => { let doc = get_doc(iroh, env, doc).await?; let mut stream = doc.subscribe().await?; + let blobs = iroh.blobs(); while let Some(event) = stream.next().await { let event = event?; match event { LiveEvent::InsertLocal { entry } => { println!( "local change: {}", - fmt_entry(&doc, &entry, DisplayContentMode::Auto).await + fmt_entry(&blobs, &entry, DisplayContentMode::Auto).await ) } LiveEvent::InsertRemote { @@ -563,17 +564,17 @@ impl DocCommands { } => { let content = match content_status { iroh::docs::ContentStatus::Complete => { - fmt_entry(&doc, &entry, DisplayContentMode::Auto).await + fmt_entry(&blobs, &entry, DisplayContentMode::Auto).await } iroh::docs::ContentStatus::Incomplete => { let (Ok(content) | Err(content)) = - fmt_content(&doc, &entry, DisplayContentMode::ShortHash) + fmt_content(&blobs, &entry, DisplayContentMode::ShortHash) .await; format!("", content, human_len(&entry)) } iroh::docs::ContentStatus::Missing => { let (Ok(content) | Err(content)) = - fmt_content(&doc, &entry, DisplayContentMode::ShortHash) + fmt_content(&blobs, &entry, DisplayContentMode::ShortHash) .await; format!("", content, human_len(&entry)) } @@ -679,14 +680,19 @@ impl DocCommands { /// Gets the document given the client, the environment (and maybe the [`NamespaceID`]). async fn get_doc(iroh: &Iroh, env: &ConsoleEnv, id: Option) -> anyhow::Result { + let doc_id = env.doc(id)?; iroh.docs() - .open(env.doc(id)?) + .open(doc_id) .await? .context("Document not found") } /// Formats the content. If an error occurs it's returned in a formatted, friendly way. -async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Result { +async fn fmt_content( + blobs: &iroh::client::blobs::Client, + entry: &Entry, + mode: DisplayContentMode, +) -> Result { let read_failed = |err: anyhow::Error| format!(""); let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes())); let as_utf8 = |buf: Vec| String::from_utf8(buf).map(|repr| format!("\"{repr}\"")); @@ -695,11 +701,17 @@ async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Resu DisplayContentMode::Auto => { if entry.content_len() < MAX_DISPLAY_CONTENT_LEN { // small content: read fully as UTF-8 - let bytes = entry.content_bytes(doc).await.map_err(read_failed)?; + let bytes = blobs + .read_to_bytes(entry.content_hash()) + .await + .map_err(read_failed)?; Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex)) } else { // large content: read just the first part as UTF-8 - let mut blob_reader = entry.content_reader(doc).await.map_err(read_failed)?; + let mut blob_reader = blobs + .read(entry.content_hash()) + .await + .map_err(read_failed)?; let mut buf = Vec::with_capacity(MAX_DISPLAY_CONTENT_LEN as usize + 5); blob_reader @@ -714,7 +726,10 @@ async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Resu } DisplayContentMode::Content => { // read fully as UTF-8 - let bytes = entry.content_bytes(doc).await.map_err(read_failed)?; + let bytes = blobs + .read_to_bytes(entry.content_hash()) + .await + .map_err(read_failed)?; Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex)) } DisplayContentMode::ShortHash => { @@ -735,12 +750,16 @@ fn human_len(entry: &Entry) -> HumanBytes { /// Formats an entry for display as a `String`. #[must_use = "this won't be printed, you need to print it yourself"] -async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String { +async fn fmt_entry( + blobs: &iroh::client::blobs::Client, + entry: &Entry, + mode: DisplayContentMode, +) -> String { let key = std::str::from_utf8(entry.key()) .unwrap_or("") .bold(); let author = fmt_short(entry.author()); - let (Ok(content) | Err(content)) = fmt_content(doc, entry, mode).await; + let (Ok(content) | Err(content)) = fmt_content(blobs, entry, mode).await; let len = human_len(entry); format!("@{author}: {key} = {content} ({len})") } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 6ba0e6320e..89d745caf5 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -36,7 +36,7 @@ iroh-router = { version = "0.28.0" } nested_enum_utils = "0.1.0" num_cpus = { version = "1.15.0" } portable-atomic = "1" -iroh-docs = { version = "0.28.0" } +iroh-docs = { version = "0.28.0", features = ["rpc"] } iroh-gossip = "0.28.1" parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs index 3cc0840b2f..4d2338012a 100644 --- a/iroh/examples/client.rs +++ b/iroh/examples/client.rs @@ -16,6 +16,7 @@ async fn main() -> anyhow::Result<()> { // Could also use `node` directly, as it derefs to the client. let client = node.client(); + let blobs = client.blobs(); let doc = client.docs().create().await?; let author = client.authors().default().await?; @@ -24,8 +25,7 @@ async fn main() -> anyhow::Result<()> { let mut stream = doc.get_many(Query::all()).await?; while let Some(entry) = stream.try_next().await? { println!("entry {}", fmt_entry(&entry)); - // You can pass either `&doc` or the `client`. - let content = entry.content_bytes(&doc).await?; + let content = blobs.read_to_bytes(entry.content_hash()).await?; println!(" content {}", std::str::from_utf8(&content)?) } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index c74d208281..e8ffc051a1 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -14,13 +14,12 @@ pub use crate::rpc_protocol::RpcService; mod quic; -pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; -pub use self::{docs::Doc, net::NodeStatus}; - -pub mod authors; pub use iroh_blobs::rpc::client::{blobs, tags}; +pub use iroh_docs::rpc::client::{authors, docs, docs::Doc}; pub use iroh_gossip::rpc::client as gossip; -pub mod docs; + +pub use self::net::NodeStatus; +pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; pub mod net; // Keep this type exposed, otherwise every occurrence of `RpcClient` in the API @@ -61,13 +60,13 @@ impl Iroh { } /// Returns the docs client. - pub fn docs(&self) -> &docs::Client { - docs::Client::ref_cast(&self.rpc) + pub fn docs(&self) -> iroh_docs::rpc::client::docs::Client { + iroh_docs::rpc::client::docs::Client::new(self.rpc.clone().map().boxed()) } - /// Returns the authors client. - pub fn authors(&self) -> &authors::Client { - authors::Client::ref_cast(&self.rpc) + /// Returns the docs client. + pub fn authors(&self) -> iroh_docs::rpc::client::authors::Client { + iroh_docs::rpc::client::authors::Client::new(self.rpc.clone().map().boxed()) } /// Returns the tags client. diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs deleted file mode 100644 index ac606a748c..0000000000 --- a/iroh/src/client/authors.rs +++ /dev/null @@ -1,134 +0,0 @@ -//! API for author management. -//! -//! The main entry point is the [`Client`]. -//! -//! You obtain a [`Client`] via [`Iroh::authors()`](crate::client::Iroh::authors). - -use anyhow::Result; -use futures_lite::{stream::StreamExt, Stream}; -use iroh_docs::{Author, AuthorId}; -use ref_cast::RefCast; - -use super::{flatten, RpcClient}; -use crate::rpc_protocol::authors::{ - CreateRequest, DeleteRequest, ExportRequest, GetDefaultRequest, ImportRequest, ListRequest, - SetDefaultRequest, -}; - -/// Iroh authors client. -#[derive(Debug, Clone, RefCast)] -#[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, -} - -impl Client { - /// Creates a new document author. - /// - /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author - /// again. - /// - /// If you need only a single author, use [`Self::default`]. - pub async fn create(&self) -> Result { - let res = self.rpc.rpc(CreateRequest).await??; - Ok(res.author_id) - } - - /// Returns the default document author of this node. - /// - /// On persistent nodes, the author is created on first start and its public key is saved - /// in the data directory. - /// - /// The default author can be set with [`Self::set_default`]. - pub async fn default(&self) -> Result { - let res = self.rpc.rpc(GetDefaultRequest).await??; - Ok(res.author_id) - } - - /// Sets the node-wide default author. - /// - /// If the author does not exist, an error is returned. - /// - /// On a persistent node, the author id will be saved to a file in the data directory and - /// reloaded after a restart. - pub async fn set_default(&self, author_id: AuthorId) -> Result<()> { - self.rpc.rpc(SetDefaultRequest { author_id }).await??; - Ok(()) - } - - /// Lists document authors for which we have a secret key. - /// - /// It's only possible to create writes from authors that we have the secret key of. - pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest {}).await?; - Ok(flatten(stream).map(|res| res.map(|res| res.author_id))) - } - - /// Exports the given author. - /// - /// Warning: The [`Author`] struct contains sensitive data. - pub async fn export(&self, author: AuthorId) -> Result> { - let res = self.rpc.rpc(ExportRequest { author }).await??; - Ok(res.author) - } - - /// Imports the given author. - /// - /// Warning: The [`Author`] struct contains sensitive data. - pub async fn import(&self, author: Author) -> Result<()> { - self.rpc.rpc(ImportRequest { author }).await??; - Ok(()) - } - - /// Deletes the given author by id. - /// - /// Warning: This permanently removes this author. - /// - /// Returns an error if attempting to delete the default author. - pub async fn delete(&self, author: AuthorId) -> Result<()> { - self.rpc.rpc(DeleteRequest { author }).await??; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::node::Node; - - #[tokio::test] - async fn test_authors() -> Result<()> { - let node = Node::memory().enable_docs().spawn().await?; - - // default author always exists - let authors: Vec<_> = node.authors().list().await?.try_collect().await?; - assert_eq!(authors.len(), 1); - let default_author = node.authors().default().await?; - assert_eq!(authors, vec![default_author]); - - let author_id = node.authors().create().await?; - - let authors: Vec<_> = node.authors().list().await?.try_collect().await?; - assert_eq!(authors.len(), 2); - - let author = node - .authors() - .export(author_id) - .await? - .expect("should have author"); - node.authors().delete(author_id).await?; - let authors: Vec<_> = node.authors().list().await?.try_collect().await?; - assert_eq!(authors.len(), 1); - - node.authors().import(author).await?; - - let authors: Vec<_> = node.authors().list().await?.try_collect().await?; - assert_eq!(authors.len(), 2); - - assert!(node.authors().default().await? != author_id); - node.authors().set_default(author_id).await?; - assert_eq!(node.authors().default().await?, author_id); - - Ok(()) - } -} diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs deleted file mode 100644 index a0c2237260..0000000000 --- a/iroh/src/client/docs.rs +++ /dev/null @@ -1,876 +0,0 @@ -//! API for document management. -//! -//! The main entry point is the [`Client`]. -//! -//! You obtain a [`Client`] via [`Iroh::docs()`](crate::client::Iroh::docs). - -use std::{ - path::{Path, PathBuf}, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use anyhow::{anyhow, Context as _, Result}; -use bytes::Bytes; -use derive_more::{Display, FromStr}; -use futures_lite::{Stream, StreamExt}; -use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions}; -use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; -#[doc(inline)] -pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; -use iroh_docs::{ - actor::OpenState, - store::{DownloadPolicy, Query}, - AuthorId, Capability, CapabilityKind, ContentStatus, DocTicket, NamespaceId, PeerIdBytes, - RecordIdentifier, -}; -use iroh_net::NodeAddr; -use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::message::RpcMsg; -use ref_cast::RefCast; -use serde::{Deserialize, Serialize}; - -use super::{blobs, flatten, RpcClient}; -use crate::rpc_protocol::{ - docs::{ - CloseRequest, CreateRequest, DelRequest, DelResponse, DocListRequest, DocSubscribeRequest, - DropRequest, ExportFileRequest, GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, - GetSyncPeersRequest, ImportFileRequest, ImportRequest, LeaveRequest, OpenRequest, - SetDownloadPolicyRequest, SetHashRequest, SetRequest, ShareRequest, StartSyncRequest, - StatusRequest, - }, - RpcService, -}; - -/// Iroh docs client. -#[derive(Debug, Clone, RefCast)] -#[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, -} - -impl Client { - /// Creates a new document. - pub async fn create(&self) -> Result { - let res = self.rpc.rpc(CreateRequest {}).await??; - let doc = Doc::new(self.rpc.clone(), res.id); - Ok(doc) - } - - /// Deletes a document from the local node. - /// - /// This is a destructive operation. Both the document secret key and all entries in the - /// document will be permanently deleted from the node's storage. Content blobs will be deleted - /// through garbage collection unless they are referenced from another document or tag. - pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> { - self.rpc.rpc(DropRequest { doc_id }).await??; - Ok(()) - } - - /// Imports a document from a namespace capability. - /// - /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync. - pub async fn import_namespace(&self, capability: Capability) -> Result { - let res = self.rpc.rpc(ImportRequest { capability }).await??; - let doc = Doc::new(self.rpc.clone(), res.doc_id); - Ok(doc) - } - - /// Imports a document from a ticket and joins all peers in the ticket. - pub async fn import(&self, ticket: DocTicket) -> Result { - let DocTicket { capability, nodes } = ticket; - let doc = self.import_namespace(capability).await?; - doc.start_sync(nodes).await?; - Ok(doc) - } - - /// Imports a document from a ticket, creates a subscription stream and joins all peers in the ticket. - /// - /// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s. - /// - /// The subscription stream is created before the sync is started, so the first call to this - /// method after starting the node is guaranteed to not miss any sync events. - pub async fn import_and_subscribe( - &self, - ticket: DocTicket, - ) -> Result<(Doc, impl Stream>)> { - let DocTicket { capability, nodes } = ticket; - let res = self.rpc.rpc(ImportRequest { capability }).await??; - let doc = Doc::new(self.rpc.clone(), res.doc_id); - let events = doc.subscribe().await?; - doc.start_sync(nodes).await?; - Ok((doc, events)) - } - - /// Lists all documents. - pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(DocListRequest {}).await?; - Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability)))) - } - - /// Returns a [`Doc`] client for a single document. - /// - /// Returns None if the document cannot be found. - pub async fn open(&self, id: NamespaceId) -> Result> { - self.rpc.rpc(OpenRequest { doc_id: id }).await??; - let doc = Doc::new(self.rpc.clone(), id); - Ok(Some(doc)) - } -} - -/// Document handle -#[derive(Debug, Clone)] -pub struct Doc(Arc); - -impl PartialEq for Doc { - fn eq(&self, other: &Self) -> bool { - self.0.id == other.0.id - } -} - -impl Eq for Doc {} - -#[derive(Debug)] -struct DocInner { - id: NamespaceId, - rpc: RpcClient, - closed: AtomicBool, - rt: tokio::runtime::Handle, -} - -impl Drop for DocInner { - fn drop(&mut self) { - let doc_id = self.id; - let rpc = self.rpc.clone(); - if !self.closed.swap(true, Ordering::Relaxed) { - self.rt.spawn(async move { - rpc.rpc(CloseRequest { doc_id }).await.ok(); - }); - } - } -} - -impl Doc { - fn new(rpc: RpcClient, id: NamespaceId) -> Self { - Self(Arc::new(DocInner { - rpc, - id, - closed: AtomicBool::new(false), - rt: tokio::runtime::Handle::current(), - })) - } - - async fn rpc(&self, msg: M) -> Result - where - M: RpcMsg, - { - let res = self.0.rpc.rpc(msg).await?; - Ok(res) - } - - /// Returns the document id of this doc. - pub fn id(&self) -> NamespaceId { - self.0.id - } - - /// Closes the document. - pub async fn close(&self) -> Result<()> { - if !self.0.closed.swap(true, Ordering::Relaxed) { - self.rpc(CloseRequest { doc_id: self.id() }).await??; - } - Ok(()) - } - - fn ensure_open(&self) -> Result<()> { - if self.0.closed.load(Ordering::Relaxed) { - Err(anyhow!("document is closed")) - } else { - Ok(()) - } - } - - /// Sets the content of a key to a byte array. - pub async fn set_bytes( - &self, - author_id: AuthorId, - key: impl Into, - value: impl Into, - ) -> Result { - self.ensure_open()?; - let res = self - .rpc(SetRequest { - doc_id: self.id(), - author_id, - key: key.into(), - value: value.into(), - }) - .await??; - Ok(res.entry.content_hash()) - } - - /// Sets an entries on the doc via its key, hash, and size. - pub async fn set_hash( - &self, - author_id: AuthorId, - key: impl Into, - hash: Hash, - size: u64, - ) -> Result<()> { - self.ensure_open()?; - self.rpc(SetHashRequest { - doc_id: self.id(), - author_id, - key: key.into(), - hash, - size, - }) - .await??; - Ok(()) - } - - /// Adds an entry from an absolute file path - pub async fn import_file( - &self, - author: AuthorId, - key: Bytes, - path: impl AsRef, - in_place: bool, - ) -> Result { - self.ensure_open()?; - let stream = self - .0 - .rpc - .server_streaming(ImportFileRequest { - doc_id: self.id(), - author_id: author, - path: path.as_ref().into(), - key, - in_place, - }) - .await?; - Ok(ImportFileProgress::new(stream)) - } - - /// Exports an entry as a file to a given absolute path. - pub async fn export_file( - &self, - entry: Entry, - path: impl AsRef, - mode: ExportMode, - ) -> Result { - self.ensure_open()?; - let stream = self - .0 - .rpc - .server_streaming(ExportFileRequest { - entry: entry.0, - path: path.as_ref().into(), - mode, - }) - .await?; - Ok(ExportFileProgress::new(stream)) - } - - /// Deletes entries that match the given `author` and key `prefix`. - /// - /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other - /// entries whose key starts with or is equal to the given `prefix`. - /// - /// Returns the number of entries deleted. - pub async fn del(&self, author_id: AuthorId, prefix: impl Into) -> Result { - self.ensure_open()?; - let res = self - .rpc(DelRequest { - doc_id: self.id(), - author_id, - prefix: prefix.into(), - }) - .await??; - let DelResponse { removed } = res; - Ok(removed) - } - - /// Returns an entry for a key and author. - /// - /// Optionally also returns the entry unless it is empty (i.e. a deletion marker). - pub async fn get_exact( - &self, - author: AuthorId, - key: impl AsRef<[u8]>, - include_empty: bool, - ) -> Result> { - self.ensure_open()?; - let res = self - .rpc(GetExactRequest { - author, - key: key.as_ref().to_vec().into(), - doc_id: self.id(), - include_empty, - }) - .await??; - Ok(res.entry.map(|entry| entry.into())) - } - - /// Returns all entries matching the query. - pub async fn get_many( - &self, - query: impl Into, - ) -> Result>> { - self.ensure_open()?; - let stream = self - .0 - .rpc - .server_streaming(GetManyRequest { - doc_id: self.id(), - query: query.into(), - }) - .await?; - Ok(flatten(stream).map(|res| res.map(|res| res.entry.into()))) - } - - /// Returns a single entry. - pub async fn get_one(&self, query: impl Into) -> Result> { - self.get_many(query).await?.next().await.transpose() - } - - /// Shares this document with peers over a ticket. - pub async fn share( - &self, - mode: ShareMode, - addr_options: AddrInfoOptions, - ) -> anyhow::Result { - self.ensure_open()?; - let res = self - .rpc(ShareRequest { - doc_id: self.id(), - mode, - addr_options, - }) - .await??; - Ok(res.0) - } - - /// Starts to sync this document with a list of peers. - pub async fn start_sync(&self, peers: Vec) -> Result<()> { - self.ensure_open()?; - let _res = self - .rpc(StartSyncRequest { - doc_id: self.id(), - peers, - }) - .await??; - Ok(()) - } - - /// Stops the live sync for this document. - pub async fn leave(&self) -> Result<()> { - self.ensure_open()?; - let _res = self.rpc(LeaveRequest { doc_id: self.id() }).await??; - Ok(()) - } - - /// Subscribes to events for this document. - pub async fn subscribe(&self) -> anyhow::Result>> { - self.ensure_open()?; - let stream = self - .0 - .rpc - .try_server_streaming(DocSubscribeRequest { doc_id: self.id() }) - .await?; - Ok(stream.map(|res| match res { - Ok(res) => Ok(res.event.into()), - Err(err) => Err(err.into()), - })) - } - - /// Returns status info for this document - pub async fn status(&self) -> anyhow::Result { - self.ensure_open()?; - let res = self.rpc(StatusRequest { doc_id: self.id() }).await??; - Ok(res.status) - } - - /// Sets the download policy for this document - pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> { - self.rpc(SetDownloadPolicyRequest { - doc_id: self.id(), - policy, - }) - .await??; - Ok(()) - } - - /// Returns the download policy for this document - pub async fn get_download_policy(&self) -> Result { - let res = self - .rpc(GetDownloadPolicyRequest { doc_id: self.id() }) - .await??; - Ok(res.policy) - } - - /// Returns sync peers for this document - pub async fn get_sync_peers(&self) -> Result>> { - let res = self - .rpc(GetSyncPeersRequest { doc_id: self.id() }) - .await??; - Ok(res.peers) - } -} - -impl<'a> From<&'a Doc> for &'a RpcClient { - fn from(doc: &'a Doc) -> &'a RpcClient { - &doc.0.rpc - } -} - -/// A single entry in a [`Doc`]. -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] -pub struct Entry(iroh_docs::Entry); - -impl From for Entry { - fn from(value: iroh_docs::Entry) -> Self { - Self(value) - } -} - -impl From for Entry { - fn from(value: iroh_docs::SignedEntry) -> Self { - Self(value.into()) - } -} - -impl Entry { - /// Returns the [`RecordIdentifier`] for this entry. - pub fn id(&self) -> &RecordIdentifier { - self.0.id() - } - - /// Returns the [`AuthorId`] of this entry. - pub fn author(&self) -> AuthorId { - self.0.author() - } - - /// Returns the [`struct@Hash`] of the content data of this record. - pub fn content_hash(&self) -> Hash { - self.0.content_hash() - } - - /// Returns the length of the data addressed by this record's content hash. - pub fn content_len(&self) -> u64 { - self.0.content_len() - } - - /// Returns the key of this entry. - pub fn key(&self) -> &[u8] { - self.0.key() - } - - /// Returns the timestamp of this entry. - pub fn timestamp(&self) -> u64 { - self.0.timestamp() - } - - /// Reads the content of an [`Entry`] as a streaming [`blobs::Reader`]. - /// - /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { - let client: RpcClient = client.into().clone(); - let client: quic_rpc::RpcClient = client.map(); - blobs::Reader::from_rpc_read(&client, self.content_hash()).await - } - - /// Reads all content of an [`Entry`] into a buffer. - /// - /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { - let client: RpcClient = client.into().clone(); - let client: quic_rpc::RpcClient = client.map(); - - blobs::Reader::from_rpc_read(&client, self.content_hash()) - .await? - .read_to_bytes() - .await - } -} - -/// Progress messages for an doc import operation -/// -/// An import operation involves computing the outboard of a file, and then -/// either copying or moving the file into the database, then setting the author, hash, size, and tag of that -/// file as an entry in the doc. -#[derive(Debug, Serialize, Deserialize)] -pub enum ImportProgress { - /// An item was found with name `name`, from now on referred to via `id`. - Found { - /// A new unique id for this entry. - id: u64, - /// The name of the entry. - name: String, - /// The size of the entry in bytes. - size: u64, - }, - /// We got progress ingesting item `id`. - Progress { - /// The unique id of the entry. - id: u64, - /// The offset of the progress, in bytes. - offset: u64, - }, - /// We are done adding `id` to the data store and the hash is `hash`. - IngestDone { - /// The unique id of the entry. - id: u64, - /// The hash of the entry. - hash: Hash, - }, - /// We are done setting the entry to the doc. - AllDone { - /// The key of the entry - key: Bytes, - }, - /// We got an error and need to abort. - /// - /// This will be the last message in the stream. - Abort(serde_error::Error), -} - -/// Intended capability for document share tickets -#[derive(Serialize, Deserialize, Debug, Clone, Display, FromStr)] -pub enum ShareMode { - /// Read-only access - Read, - /// Write access - Write, -} - -/// Events informing about actions of the live sync progress. -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)] -pub enum LiveEvent { - /// A local insertion. - InsertLocal { - /// The inserted entry. - entry: Entry, - }, - /// Received a remote insert. - InsertRemote { - /// The peer that sent us the entry. - from: PublicKey, - /// The inserted entry. - entry: Entry, - /// If the content is available at the local node - content_status: ContentStatus, - }, - /// The content of an entry was downloaded and is now available at the local node - ContentReady { - /// The content hash of the newly available entry content - hash: Hash, - }, - /// We have a new neighbor in the swarm. - NeighborUp(PublicKey), - /// We lost a neighbor in the swarm. - NeighborDown(PublicKey), - /// A set-reconciliation sync finished. - SyncFinished(SyncEvent), - /// All pending content is now ready. - /// - /// This event signals that all queued content downloads from the last sync run have either - /// completed or failed. - /// - /// It will only be emitted after a [`Self::SyncFinished`] event, never before. - /// - /// Receiving this event does not guarantee that all content in the document is available. If - /// blobs failed to download, this event will still be emitted after all operations completed. - PendingContentReady, -} - -impl From for LiveEvent { - fn from(event: crate::docs::engine::LiveEvent) -> LiveEvent { - match event { - crate::docs::engine::LiveEvent::InsertLocal { entry } => Self::InsertLocal { - entry: entry.into(), - }, - crate::docs::engine::LiveEvent::InsertRemote { - from, - entry, - content_status, - } => Self::InsertRemote { - from, - content_status, - entry: entry.into(), - }, - crate::docs::engine::LiveEvent::ContentReady { hash } => Self::ContentReady { hash }, - crate::docs::engine::LiveEvent::NeighborUp(node) => Self::NeighborUp(node), - crate::docs::engine::LiveEvent::NeighborDown(node) => Self::NeighborDown(node), - crate::docs::engine::LiveEvent::SyncFinished(details) => Self::SyncFinished(details), - crate::docs::engine::LiveEvent::PendingContentReady => Self::PendingContentReady, - } - } -} - -/// Progress stream for [`Doc::import_file`]. -#[derive(derive_more::Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct ImportFileProgress { - #[debug(skip)] - stream: Pin> + Send + Unpin + 'static>>, -} - -impl ImportFileProgress { - fn new( - stream: (impl Stream, impl Into>> - + Send - + Unpin - + 'static), - ) -> Self { - let stream = stream.map(|item| match item { - Ok(item) => Ok(item.into()), - Err(err) => Err(err.into()), - }); - Self { - stream: Box::pin(stream), - } - } - - /// Finishes writing the stream, ignoring all intermediate progress events. - /// - /// Returns a [`ImportFileOutcome`] which contains a tag, key, and hash and the size of the - /// content. - pub async fn finish(mut self) -> Result { - let mut entry_size = 0; - let mut entry_hash = None; - while let Some(msg) = self.next().await { - match msg? { - ImportProgress::Found { size, .. } => { - entry_size = size; - } - ImportProgress::AllDone { key } => { - let hash = entry_hash - .context("expected DocImportProgress::IngestDone event to occur")?; - let outcome = ImportFileOutcome { - hash, - key, - size: entry_size, - }; - return Ok(outcome); - } - ImportProgress::Abort(err) => return Err(err.into()), - ImportProgress::Progress { .. } => {} - ImportProgress::IngestDone { hash, .. } => { - entry_hash = Some(hash); - } - } - } - Err(anyhow!("Response stream ended prematurely")) - } -} - -/// Outcome of a [`Doc::import_file`] operation -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ImportFileOutcome { - /// The hash of the entry's content - pub hash: Hash, - /// The size of the entry - pub size: u64, - /// The key of the entry - pub key: Bytes, -} - -impl Stream for ImportFileProgress { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -/// Progress stream for [`Doc::export_file`]. -#[derive(derive_more::Debug)] -pub struct ExportFileProgress { - #[debug(skip)] - stream: Pin> + Send + Unpin + 'static>>, -} -impl ExportFileProgress { - fn new( - stream: (impl Stream, impl Into>> - + Send - + Unpin - + 'static), - ) -> Self { - let stream = stream.map(|item| match item { - Ok(item) => Ok(item.into()), - Err(err) => Err(err.into()), - }); - Self { - stream: Box::pin(stream), - } - } - - /// Iterates through the export progress stream, returning when the stream has completed. - /// - /// Returns a [`ExportFileOutcome`] which contains a file path the data was written to and the size of the content. - pub async fn finish(mut self) -> Result { - let mut total_size = 0; - let mut path = None; - while let Some(msg) = self.next().await { - match msg? { - ExportProgress::Found { size, outpath, .. } => { - total_size = size.value(); - path = Some(outpath); - } - ExportProgress::AllDone => { - let path = path.context("expected ExportProgress::Found event to occur")?; - let outcome = ExportFileOutcome { - size: total_size, - path, - }; - return Ok(outcome); - } - ExportProgress::Done { .. } => {} - ExportProgress::Abort(err) => return Err(anyhow!(err)), - ExportProgress::Progress { .. } => {} - } - } - Err(anyhow!("Response stream ended prematurely")) - } -} - -/// Outcome of a [`Doc::export_file`] operation -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ExportFileOutcome { - /// The size of the entry - size: u64, - /// The path to which the entry was saved - path: PathBuf, -} - -impl Stream for ExportFileProgress { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -#[cfg(test)] -mod tests { - use rand::RngCore; - use tokio::io::AsyncWriteExt; - - use super::*; - - #[tokio::test] - async fn test_drop_doc_client_sync() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().enable_docs().spawn().await?; - - let client = node.client(); - let doc = client.docs().create().await?; - - let res = std::thread::spawn(move || { - drop(doc); - drop(node); - }); - - tokio::task::spawn_blocking(move || res.join().map_err(|e| anyhow::anyhow!("{:?}", e))) - .await??; - - Ok(()) - } - - /// Test that closing a doc does not close other instances. - #[tokio::test] - async fn test_doc_close() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().enable_docs().spawn().await?; - let author = node.authors().default().await?; - // open doc two times - let doc1 = node.docs().create().await?; - let doc2 = node.docs().open(doc1.id()).await?.expect("doc to exist"); - // close doc1 instance - doc1.close().await?; - // operations on doc1 now fail. - assert!(doc1.set_bytes(author, "foo", "bar").await.is_err()); - // dropping doc1 will close the doc if not already closed - // wait a bit because the close-on-drop spawns a task for which we cannot track completion. - drop(doc1); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // operations on doc2 still succeed - doc2.set_bytes(author, "foo", "bar").await?; - Ok(()) - } - - #[tokio::test] - async fn test_doc_import_export() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().enable_docs().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create dir all")?; - let out_root = temp_dir.path().join("out"); - - let path = in_root.join("test"); - - let size = 100; - let mut buf = vec![0u8; size]; - rand::thread_rng().fill_bytes(&mut buf); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - - // create doc & author - let client = node.client(); - let doc = client.docs().create().await.context("doc create")?; - let author = client.authors().create().await.context("author create")?; - - // import file - let import_outcome = doc - .import_file( - author, - crate::util::fs::path_to_key(path.clone(), None, Some(in_root))?, - path, - true, - ) - .await - .context("import file")? - .finish() - .await - .context("import finish")?; - - // export file - let entry = doc - .get_one(Query::author(author).key_exact(import_outcome.key)) - .await - .context("get one")? - .unwrap(); - let key = entry.key().to_vec(); - let export_outcome = doc - .export_file( - entry, - crate::util::fs::key_to_path(key, None, Some(out_root))?, - ExportMode::Copy, - ) - .await - .context("export file")? - .finish() - .await - .context("export finish")?; - - let got_bytes = tokio::fs::read(export_outcome.path) - .await - .context("tokio read")?; - assert_eq!(buf, got_bytes); - - Ok(()) - } -} diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index b852de2951..6906f7b4f5 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -49,13 +49,10 @@ //! manage and share content-addressed blobs of data //! - [tags](crate::client::tags): //! tags to tell iroh what data is important -//! - [gossip](iroh_gossip::RpcClient): +//! - [gossip](crate::client::gossip): //! exchange data with other nodes via a gossip protocol -//! -//! - [authors](crate::client::authors): -//! interact with document authors //! - [docs](crate::client::docs): -//! interact with documents +//! interact with documents and document authors //! //! The subsystem clients can be obtained cheaply from the main iroh client. //! They are also cheaply cloneable and can be shared across threads. diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 36cfb45c3f..7012865b61 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -72,7 +72,7 @@ mod nodes_storage; mod rpc; mod rpc_status; -pub(crate) use self::rpc::{RpcError, RpcResult}; +pub(crate) use self::rpc::RpcResult; pub use self::{ builder::{ Builder, DiscoveryConfig, DocsStorage, GcPolicy, ProtocolBuilder, StorageConfig, @@ -293,7 +293,7 @@ impl NodeInner { if let GcPolicy::Interval(gc_period) = gc_policy { let router = router.clone(); let handle = local_pool.spawn(move || async move { - let docs_engine = router.get_protocol::(DOCS_ALPN); + let docs_engine = router.get_protocol::>(DOCS_ALPN); let blobs = router .get_protocol::>(iroh_blobs::protocol::ALPN) .expect("missing blobs"); @@ -690,142 +690,4 @@ mod tests { ); Ok(()) } - - #[tokio::test] - async fn test_default_author_memory() -> Result<()> { - let iroh = Node::memory().enable_docs().spawn().await?; - let author = iroh.authors().default().await?; - assert!(iroh.authors().export(author).await?.is_some()); - assert!(iroh.authors().delete(author).await.is_err()); - Ok(()) - } - - #[cfg(feature = "fs-store")] - #[tokio::test] - async fn test_default_author_persist() -> Result<()> { - use crate::util::path::IrohPaths; - - let _guard = iroh_test::logging::setup(); - - let iroh_root_dir = tempfile::TempDir::new().unwrap(); - let iroh_root = iroh_root_dir.path(); - - // check that the default author exists and cannot be deleted. - let default_author = { - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await - .unwrap(); - let author = iroh.authors().default().await.unwrap(); - assert!(iroh.authors().export(author).await.unwrap().is_some()); - assert!(iroh.authors().delete(author).await.is_err()); - iroh.shutdown().await.unwrap(); - author - }; - - // check that the default author is persisted across restarts. - { - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await - .unwrap(); - let author = iroh.authors().default().await.unwrap(); - assert_eq!(author, default_author); - assert!(iroh.authors().export(author).await.unwrap().is_some()); - assert!(iroh.authors().delete(author).await.is_err()); - iroh.shutdown().await.unwrap(); - }; - - // check that a new default author is created if the default author file is deleted - // manually. - let default_author = { - tokio::fs::remove_file(IrohPaths::DefaultAuthor.with_root(iroh_root)) - .await - .unwrap(); - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await - .unwrap(); - let author = iroh.authors().default().await.unwrap(); - assert!(author != default_author); - assert!(iroh.authors().export(author).await.unwrap().is_some()); - assert!(iroh.authors().delete(author).await.is_err()); - iroh.shutdown().await.unwrap(); - author - }; - - // check that the node fails to start if the default author is missing from the docs store. - { - let mut docs_store = iroh_docs::store::fs::Store::persistent( - IrohPaths::DocsDatabase.with_root(iroh_root), - ) - .unwrap(); - docs_store.delete_author(default_author).unwrap(); - docs_store.flush().unwrap(); - drop(docs_store); - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await; - assert!(iroh.is_err()); - - // somehow the blob store is not shutdown correctly (yet?) on macos. - // so we give it some time until we find a proper fix. - #[cfg(target_os = "macos")] - tokio::time::sleep(Duration::from_secs(1)).await; - - tokio::fs::remove_file(IrohPaths::DefaultAuthor.with_root(iroh_root)) - .await - .unwrap(); - drop(iroh); - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await; - assert!(iroh.is_ok()); - iroh.unwrap().shutdown().await.unwrap(); - } - - // check that the default author can be set manually and is persisted. - let default_author = { - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await - .unwrap(); - let author = iroh.authors().create().await.unwrap(); - iroh.authors().set_default(author).await.unwrap(); - assert_eq!(iroh.authors().default().await.unwrap(), author); - iroh.shutdown().await.unwrap(); - author - }; - { - let iroh = Node::persistent(iroh_root) - .await - .unwrap() - .enable_docs() - .spawn() - .await - .unwrap(); - assert_eq!(iroh.authors().default().await.unwrap(), default_author); - iroh.shutdown().await.unwrap(); - } - - Ok(()) - } } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index c66f1cb51d..e6524f41e6 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -80,7 +80,8 @@ async fn spawn_docs( endpoint: Endpoint, gossip: Gossip, downloader: Downloader, -) -> anyhow::Result> { + local_pool_handle: LocalPoolHandle, +) -> anyhow::Result>> { let docs_store = match storage { DocsStorage::Disabled => return Ok(None), DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), @@ -93,6 +94,7 @@ async fn spawn_docs( blobs_store, downloader, default_author_storage, + local_pool_handle, ) .await?; Ok(Some(engine)) @@ -684,6 +686,7 @@ where endpoint.clone(), gossip.clone(), downloader.clone(), + lp.handle().clone(), ) .await?; @@ -833,7 +836,7 @@ impl ProtocolBuilder { store: D, gossip: Gossip, downloader: Downloader, - docs: Option, + docs: Option>, ) -> Self { // Register blobs. let blobs_proto = BlobsProtocol::new_with_events( diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 88dfb2355d..ae2a0ec0d7 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -1,28 +1,17 @@ -use std::{ - fmt::Debug, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{fmt::Debug, sync::Arc, time::Duration}; -use anyhow::{anyhow, Result}; -use futures_lite::{Stream, StreamExt}; -use iroh_base::{key::NodeId, node_addr::NodeAddr}; +use anyhow::Result; +use futures_lite::Stream; use iroh_blobs::{ - export::ExportProgress, - net_protocol::Blobs as BlobsProtocol, - store::{ExportFormat, ImportProgress, Store as BaoStore}, - util::{ - local_pool::LocalPoolHandle, - progress::{AsyncChannelProgressSender, ProgressSender}, - }, - BlobFormat, HashAndFormat, + net_protocol::Blobs as BlobsProtocol, store::Store as BaoStore, + util::local_pool::LocalPoolHandle, }; -use iroh_docs::{engine::Engine, net::DOCS_ALPN}; +use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; +use iroh_net::{NodeAddr, NodeId}; use iroh_router::Router; use quic_rpc::server::{RpcChannel, RpcServerError}; use tokio::task::JoinSet; -use tokio_util::either::Either; use tracing::{debug, info, warn}; use super::IrohServerEndpoint; @@ -31,11 +20,6 @@ use crate::{ client::NodeStatus, node::NodeInner, rpc_protocol::{ - authors, - docs::{ - ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse, - Request as DocsRequest, SetHashRequest, - }, net::{ self, AddAddrRequest, AddrRequest, IdRequest, NodeWatchRequest, RelayRequest, RemoteInfoRequest, RemoteInfoResponse, RemoteInfosIterRequest, RemoteInfosIterResponse, @@ -46,8 +30,6 @@ use crate::{ }, }; -mod docs; - const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); pub(crate) type RpcError = serde_error::Error; pub(crate) type RpcResult = Result; @@ -65,46 +47,12 @@ impl Handler { } impl Handler { - fn docs(&self) -> Option> { - self.router.get_protocol::(DOCS_ALPN) - } - fn blobs(&self) -> Arc> { self.router .get_protocol::>(iroh_blobs::protocol::ALPN) .expect("missing blobs") } - fn blobs_store(&self) -> D { - self.blobs().store().clone() - } - - async fn with_docs(self, f: F) -> RpcResult - where - T: Send + 'static, - F: FnOnce(Arc) -> Fut, - Fut: std::future::Future>, - { - if let Some(docs) = self.docs() { - f(docs).await - } else { - Err(docs_disabled()) - } - } - - fn with_docs_stream(self, f: F) -> impl Stream> - where - T: Send + 'static, - F: FnOnce(Arc) -> S, - S: Stream>, - { - if let Some(docs) = self.docs() { - Either::Left(f(docs)) - } else { - Either::Right(futures_lite::stream::once(Err(docs_disabled()))) - } - } - pub(crate) fn spawn_rpc_request( inner: Arc>, join_set: &mut JoinSet>, @@ -183,60 +131,23 @@ impl Handler { .map_err(|e| e.errors_into()) } - async fn handle_authors_request( - self, - msg: authors::Request, - chan: RpcChannel, - ) -> Result<(), RpcServerError> { - use authors::Request::*; - match msg { - List(msg) => chan.server_streaming(msg, self, Self::author_list).await, - Create(msg) => chan.rpc(msg, self, Self::author_create).await, - Import(msg) => chan.rpc(msg, self, Self::author_import).await, - Export(msg) => chan.rpc(msg, self, Self::author_export).await, - Delete(msg) => chan.rpc(msg, self, Self::author_delete).await, - GetDefault(msg) => chan.rpc(msg, self, Self::author_default).await, - SetDefault(msg) => chan.rpc(msg, self, Self::author_set_default).await, - } - } - async fn handle_docs_request( self, - msg: DocsRequest, + msg: iroh_docs::rpc::proto::Request, chan: RpcChannel, ) -> Result<(), RpcServerError> { - use DocsRequest::*; - match msg { - Open(msg) => chan.rpc(msg, self, Self::doc_open).await, - Close(msg) => chan.rpc(msg, self, Self::doc_close).await, - Status(msg) => chan.rpc(msg, self, Self::doc_status).await, - List(msg) => chan.server_streaming(msg, self, Self::doc_list).await, - Create(msg) => chan.rpc(msg, self, Self::doc_create).await, - Drop(msg) => chan.rpc(msg, self, Self::doc_drop).await, - Import(msg) => chan.rpc(msg, self, Self::doc_import).await, - Set(msg) => chan.rpc(msg, self, Self::doc_set).await, - ImportFile(msg) => { - chan.server_streaming(msg, self, Self::doc_import_file) - .await - } - ExportFile(msg) => { - chan.server_streaming(msg, self, Self::doc_export_file) - .await - } - Del(msg) => chan.rpc(msg, self, Self::doc_del).await, - SetHash(msg) => chan.rpc(msg, self, Self::doc_set_hash).await, - Get(msg) => chan.server_streaming(msg, self, Self::doc_get_many).await, - GetExact(msg) => chan.rpc(msg, self, Self::doc_get_exact).await, - StartSync(msg) => chan.rpc(msg, self, Self::doc_start_sync).await, - Leave(msg) => chan.rpc(msg, self, Self::doc_leave).await, - Share(msg) => chan.rpc(msg, self, Self::doc_share).await, - Subscribe(msg) => { - chan.try_server_streaming(msg, self, Self::doc_subscribe) - .await - } - SetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_set_download_policy).await, - GetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_get_download_policy).await, - GetSyncPeers(msg) => chan.rpc(msg, self, Self::doc_get_sync_peers).await, + if let Some(docs) = self + .router + .get_protocol::>(DOCS_ALPN) + { + let chan = chan.map::(); + docs.handle_rpc_request(msg, chan) + .await + .map_err(|e| e.errors_into()) + } else { + Err(RpcServerError::SendError(anyhow::anyhow!( + "Docs is not enabled" + ))) } } @@ -254,143 +165,11 @@ impl Handler { self.handle_blobs_and_tags_request(msg, chan.map().boxed()) .await } - Authors(msg) => self.handle_authors_request(msg, chan).await, Docs(msg) => self.handle_docs_request(msg, chan).await, Gossip(msg) => self.handle_gossip_request(msg, chan).await, } } - fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream { - // provide a little buffer so that we don't slow down the sender - let (tx, rx) = async_channel::bounded(32); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send(crate::client::docs::ImportProgress::Abort(RpcError::new( - &*e, - ))) - .await - .ok(); - } - }); - rx.map(ImportFileResponse) - } - - async fn doc_import_file0( - self, - msg: ImportFileRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - use std::collections::BTreeMap; - - use iroh_blobs::store::ImportMode; - - use crate::client::docs::ImportProgress as DocImportProgress; - - let progress = AsyncChannelProgressSender::new(progress); - let names = Arc::new(Mutex::new(BTreeMap::new())); - // convert import progress to provide progress - let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Found { id, name } => { - names.lock().unwrap().insert(id, name); - None - } - ImportProgress::Size { id, size } => { - let name = names.lock().unwrap().remove(&id)?; - Some(DocImportProgress::Found { id, name, size }) - } - ImportProgress::OutboardProgress { id, offset } => { - Some(DocImportProgress::Progress { id, offset }) - } - ImportProgress::OutboardDone { hash, id } => { - Some(DocImportProgress::IngestDone { hash, id }) - } - _ => None, - }); - let ImportFileRequest { - doc_id, - author_id, - key, - path: root, - in_place, - } = msg; - // Check that the path is absolute and exists. - anyhow::ensure!(root.is_absolute(), "path must be absolute"); - anyhow::ensure!( - root.exists(), - "trying to add missing path: {}", - root.display() - ); - - let import_mode = match in_place { - true => ImportMode::TryReference, - false => ImportMode::Copy, - }; - - let blobs = self.blobs(); - let (temp_tag, size) = blobs - .store() - .import_file(root, import_mode, BlobFormat::Raw, import_progress) - .await?; - - let hash_and_format = temp_tag.inner(); - let HashAndFormat { hash, .. } = *hash_and_format; - self.doc_set_hash(SetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; - drop(temp_tag); - progress.send(DocImportProgress::AllDone { key }).await?; - Ok(()) - } - - fn doc_export_file(self, msg: ExportFileRequest) -> impl Stream { - let (tx, rx) = async_channel::bounded(1024); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.doc_export_file0(msg, tx).await { - tx2.send(ExportProgress::Abort(RpcError::new(&*e))) - .await - .ok(); - } - }); - rx.map(ExportFileResponse) - } - - async fn doc_export_file0( - self, - msg: ExportFileRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; - let progress = AsyncChannelProgressSender::new(progress); - let ExportFileRequest { entry, path, mode } = msg; - let key = bytes::Bytes::from(entry.key().to_vec()); - let export_progress = progress.clone().with_map(move |mut x| { - // assign the doc key to the `meta` field of the initial progress event - if let ExportProgress::Found { meta, .. } = &mut x { - *meta = Some(key.clone()) - } - x - }); - let blobs = self.blobs(); - iroh_blobs::export::export( - blobs.store(), - entry.content_hash(), - path, - ExportFormat::Blob, - mode, - export_progress, - ) - .await?; - progress.send(ExportProgress::AllDone).await?; - Ok(()) - } - #[allow(clippy::unused_async)] async fn node_stats(self, _req: StatsRequest) -> RpcResult { #[cfg(feature = "metrics")] @@ -505,7 +284,3 @@ impl Handler { Ok(()) } } - -fn docs_disabled() -> RpcError { - RpcError::new(&*anyhow!("docs are disabled")) -} diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs deleted file mode 100644 index e8411aeb17..0000000000 --- a/iroh/src/node/rpc/docs.rs +++ /dev/null @@ -1,473 +0,0 @@ -//! This module contains an impl block on [`Handler`] to handle docs related requests. - -use anyhow::anyhow; -use futures_lite::{Stream, StreamExt}; -use iroh_blobs::{store::Store as BaoStore, BlobFormat}; -use iroh_docs::{Author, DocTicket, NamespaceSecret}; - -use super::{Handler, RpcError, RpcResult}; -use crate::{ - client::docs::ShareMode, - rpc_protocol::{ - authors::{ - CreateRequest, CreateResponse, DeleteRequest, DeleteResponse, ExportRequest, - ExportResponse, GetDefaultRequest, GetDefaultResponse, ImportRequest, ImportResponse, - ListRequest as AuthorListRequest, ListResponse as AuthorListResponse, - SetDefaultRequest, SetDefaultResponse, - }, - docs::{ - CloseRequest, CloseResponse, CreateRequest as DocCreateRequest, - CreateResponse as DocCreateResponse, DelRequest, DelResponse, DocListRequest, - DocSubscribeRequest, DocSubscribeResponse, DropRequest, DropResponse, - GetDownloadPolicyRequest, GetDownloadPolicyResponse, GetExactRequest, GetExactResponse, - GetManyRequest, GetManyResponse, GetSyncPeersRequest, GetSyncPeersResponse, - ImportRequest as DocImportRequest, ImportResponse as DocImportResponse, LeaveRequest, - LeaveResponse, ListResponse as DocListResponse, OpenRequest, OpenResponse, - SetDownloadPolicyRequest, SetDownloadPolicyResponse, SetHashRequest, SetHashResponse, - SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, - StartSyncResponse, StatusRequest, StatusResponse, - }, - }, -}; - -/// Capacity for the flume channels to forward sync store iterators to async RPC streams. -const ITER_CHANNEL_CAP: usize = 64; - -impl Handler { - pub(super) async fn author_create(self, _req: CreateRequest) -> RpcResult { - self.with_docs(|docs| async move { - // TODO: pass rng - let author = Author::new(&mut rand::rngs::OsRng {}); - docs.sync - .import_author(author.clone()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(CreateResponse { - author_id: author.id(), - }) - }) - .await - } - - pub(super) async fn author_default( - self, - _req: GetDefaultRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let author_id = docs.default_author.get(); - Ok(GetDefaultResponse { author_id }) - }) - .await - } - - pub(super) async fn author_set_default( - self, - req: SetDefaultRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - docs.default_author - .set(req.author_id, &docs.sync) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(SetDefaultResponse) - }) - .await - } - - pub(super) fn author_list( - self, - _req: AuthorListRequest, - ) -> impl Stream> + Unpin { - self.with_docs_stream(|docs| { - let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); - let sync = docs.sync.clone(); - // we need to spawn a task to send our request to the sync handle, because the method - // itself must be sync. - tokio::task::spawn(async move { - let tx2 = tx.clone(); - if let Err(err) = sync.list_authors(tx).await { - tx2.send(Err(err)).await.ok(); - } - }); - rx.boxed().map(|r| { - r.map(|author_id| AuthorListResponse { author_id }) - .map_err(|e| RpcError::new(&*e)) - }) - }) - } - - pub(super) async fn author_import(self, req: ImportRequest) -> RpcResult { - self.with_docs(|docs| async move { - let author_id = docs - .sync - .import_author(req.author) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(ImportResponse { author_id }) - }) - .await - } - - pub(super) async fn author_export(self, req: ExportRequest) -> RpcResult { - self.with_docs(|docs| async move { - let author = docs - .sync - .export_author(req.author) - .await - .map_err(|e| RpcError::new(&*e))?; - - Ok(ExportResponse { author }) - }) - .await - } - - pub(super) async fn author_delete(self, req: DeleteRequest) -> RpcResult { - self.with_docs(|docs| async move { - if req.author == docs.default_author.get() { - return Err(RpcError::new(&*anyhow!( - "Deleting the default author is not supported" - ))); - } - docs.sync - .delete_author(req.author) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DeleteResponse) - }) - .await - } - - pub(super) async fn doc_create(self, _req: DocCreateRequest) -> RpcResult { - self.with_docs(|docs| async move { - let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {}); - let id = namespace.id(); - docs.sync - .import_namespace(namespace.into()) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .open(id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DocCreateResponse { id }) - }) - .await - } - - pub(super) async fn doc_drop(self, req: DropRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DropRequest { doc_id } = req; - docs.leave(doc_id, true) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .drop_replica(doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DropResponse {}) - }) - .await - } - - pub(super) fn doc_list( - self, - _req: DocListRequest, - ) -> impl Stream> + Unpin { - self.with_docs_stream(|docs| { - let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); - let sync = docs.sync.clone(); - // we need to spawn a task to send our request to the sync handle, because the method - // itself must be sync. - tokio::task::spawn(async move { - let tx2 = tx.clone(); - if let Err(err) = sync.list_replicas(tx).await { - tx2.send(Err(err)).await.ok(); - } - }); - rx.boxed().map(|r| { - r.map(|(id, capability)| DocListResponse { id, capability }) - .map_err(|e| RpcError::new(&*e)) - }) - }) - } - - pub(super) async fn doc_open(self, req: OpenRequest) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .open(req.doc_id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(OpenResponse {}) - }) - .await - } - - pub(super) async fn doc_close(self, req: CloseRequest) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .close(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(CloseResponse {}) - }) - .await - } - - pub(super) async fn doc_status(self, req: StatusRequest) -> RpcResult { - self.with_docs(|docs| async move { - let status = docs - .sync - .get_state(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(StatusResponse { status }) - }) - .await - } - - pub(super) async fn doc_share(self, req: ShareRequest) -> RpcResult { - self.with_docs(|docs| async move { - let ShareRequest { - doc_id, - mode, - addr_options, - } = req; - let mut me = docs - .endpoint - .node_addr() - .await - .map_err(|e| RpcError::new(&*e))?; - me.apply_options(addr_options); - - let capability = match mode { - ShareMode::Read => iroh_docs::Capability::Read(doc_id), - ShareMode::Write => { - let secret = docs - .sync - .export_secret_key(doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - iroh_docs::Capability::Write(secret) - } - }; - docs.start_sync(doc_id, vec![]) - .await - .map_err(|e| RpcError::new(&*e))?; - - Ok(ShareResponse(DocTicket { - capability, - nodes: vec![me], - })) - }) - .await - } - - pub(super) async fn doc_subscribe( - self, - req: DocSubscribeRequest, - ) -> RpcResult>> { - self.with_docs(|docs| async move { - let stream = docs - .subscribe(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - - Ok(stream.map(|el| { - el.map(|event| DocSubscribeResponse { event }) - .map_err(|e| RpcError::new(&*e)) - })) - }) - .await - } - - pub(super) async fn doc_import(self, req: DocImportRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DocImportRequest { capability } = req; - let doc_id = docs - .sync - .import_namespace(capability) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .open(doc_id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DocImportResponse { doc_id }) - }) - .await - } - - pub(super) async fn doc_start_sync( - self, - req: StartSyncRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let StartSyncRequest { doc_id, peers } = req; - docs.start_sync(doc_id, peers) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(StartSyncResponse {}) - }) - .await - } - - pub(super) async fn doc_leave(self, req: LeaveRequest) -> RpcResult { - self.with_docs(|docs| async move { - let LeaveRequest { doc_id } = req; - docs.leave(doc_id, false) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(LeaveResponse {}) - }) - .await - } - - pub(super) async fn doc_set(self, req: SetRequest) -> RpcResult { - let blobs_store = self.blobs_store(); - self.with_docs(|docs| async move { - let SetRequest { - doc_id, - author_id, - key, - value, - } = req; - let len = value.len(); - let tag = blobs_store - .import_bytes(value, BlobFormat::Raw) - .await - .map_err(|e| RpcError::new(&e))?; - docs.sync - .insert_local(doc_id, author_id, key.clone(), *tag.hash(), len as u64) - .await - .map_err(|e| RpcError::new(&*e))?; - let entry = docs - .sync - .get_exact(doc_id, author_id, key, false) - .await - .map_err(|e| RpcError::new(&*e))? - .ok_or_else(|| RpcError::new(&*anyhow!("failed to get entry after insertion")))?; - Ok(SetResponse { entry }) - }) - .await - } - - pub(super) async fn doc_del(self, req: DelRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DelRequest { - doc_id, - author_id, - prefix, - } = req; - let removed = docs - .sync - .delete_prefix(doc_id, author_id, prefix) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DelResponse { removed }) - }) - .await - } - - pub(super) async fn doc_set_hash(self, req: SetHashRequest) -> RpcResult { - self.with_docs(|docs| async move { - let SetHashRequest { - doc_id, - author_id, - key, - hash, - size, - } = req; - docs.sync - .insert_local(doc_id, author_id, key.clone(), hash, size) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(SetHashResponse {}) - }) - .await - } - - pub(super) fn doc_get_many( - self, - req: GetManyRequest, - ) -> impl Stream> + Unpin { - let GetManyRequest { doc_id, query } = req; - self.with_docs_stream(move |docs| { - let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); - let sync = docs.sync.clone(); - // we need to spawn a task to send our request to the sync handle, because the method - // itself must be sync. - tokio::task::spawn(async move { - let tx2 = tx.clone(); - if let Err(err) = sync.get_many(doc_id, query, tx).await { - tx2.send(Err(err)).await.ok(); - } - }); - rx.boxed().map(|r| { - r.map(|entry| GetManyResponse { entry }) - .map_err(|e| RpcError::new(&*e)) - }) - }) - } - - pub(super) async fn doc_get_exact(self, req: GetExactRequest) -> RpcResult { - self.with_docs(|docs| async move { - let GetExactRequest { - doc_id, - author, - key, - include_empty, - } = req; - let entry = docs - .sync - .get_exact(doc_id, author, key, include_empty) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetExactResponse { entry }) - }) - .await - } - - pub(super) async fn doc_set_download_policy( - self, - req: SetDownloadPolicyRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .set_download_policy(req.doc_id, req.policy) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(SetDownloadPolicyResponse {}) - }) - .await - } - - pub(super) async fn doc_get_download_policy( - self, - req: GetDownloadPolicyRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let policy = docs - .sync - .get_download_policy(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetDownloadPolicyResponse { policy }) - }) - .await - } - - pub(super) async fn doc_get_sync_peers( - self, - req: GetSyncPeersRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let peers = docs - .sync - .get_sync_peers(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetSyncPeersResponse { peers }) - }) - .await - } -} diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 88b2b520a7..1f11690457 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -17,8 +17,6 @@ //! macro. use serde::{Deserialize, Serialize}; -pub mod authors; -pub mod docs; pub mod net; pub mod node; @@ -33,9 +31,8 @@ pub struct RpcService; pub enum Request { Node(node::Request), Net(net::Request), - Docs(docs::Request), - Authors(authors::Request), Gossip(iroh_gossip::RpcRequest), + Docs(iroh_docs::rpc::proto::Request), BlobsAndTags(iroh_blobs::rpc::proto::Request), } @@ -46,9 +43,8 @@ pub enum Request { pub enum Response { Node(node::Response), Net(net::Response), - Docs(docs::Response), - Authors(authors::Response), Gossip(iroh_gossip::RpcResponse), + Docs(iroh_docs::rpc::proto::Response), BlobsAndTags(iroh_blobs::rpc::proto::Response), } diff --git a/iroh/src/rpc_protocol/authors.rs b/iroh/src/rpc_protocol/authors.rs deleted file mode 100644 index 2afb1d1f91..0000000000 --- a/iroh/src/rpc_protocol/authors.rs +++ /dev/null @@ -1,123 +0,0 @@ -use iroh_docs::{Author, AuthorId}; -use nested_enum_utils::enum_conversions; -use quic_rpc_derive::rpc_requests; -use serde::{Deserialize, Serialize}; - -use super::RpcService; -use crate::node::RpcResult; - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Request)] -#[rpc_requests(RpcService)] -pub enum Request { - #[server_streaming(response = RpcResult)] - List(ListRequest), - #[rpc(response = RpcResult)] - Create(CreateRequest), - #[rpc(response = RpcResult)] - GetDefault(GetDefaultRequest), - #[rpc(response = RpcResult)] - SetDefault(SetDefaultRequest), - #[rpc(response = RpcResult)] - Import(ImportRequest), - #[rpc(response = RpcResult)] - Export(ExportRequest), - #[rpc(response = RpcResult)] - Delete(DeleteRequest), -} - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Response)] -pub enum Response { - List(RpcResult), - Create(RpcResult), - GetDefault(RpcResult), - SetDefault(RpcResult), - Import(RpcResult), - Export(RpcResult), - Delete(RpcResult), -} - -/// List document authors for which we have a secret key. -#[derive(Serialize, Deserialize, Debug)] -pub struct ListRequest {} - -/// Response for [`ListRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ListResponse { - /// The author id - pub author_id: AuthorId, -} - -/// Create a new document author. -#[derive(Serialize, Deserialize, Debug)] -pub struct CreateRequest; - -/// Response for [`CreateRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct CreateResponse { - /// The id of the created author - pub author_id: AuthorId, -} - -/// Get the default author. -#[derive(Serialize, Deserialize, Debug)] -pub struct GetDefaultRequest; - -/// Response for [`GetDefaultRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct GetDefaultResponse { - /// The id of the author - pub author_id: AuthorId, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct SetDefaultRequest { - /// The id of the author - pub author_id: AuthorId, -} - -/// Response for [`GetDefaultRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct SetDefaultResponse; - -/// Delete an author -#[derive(Serialize, Deserialize, Debug)] -pub struct DeleteRequest { - /// The id of the author to delete - pub author: AuthorId, -} - -/// Response for [`DeleteRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct DeleteResponse; - -/// Exports an author -#[derive(Serialize, Deserialize, Debug)] -pub struct ExportRequest { - /// The id of the author to delete - pub author: AuthorId, -} - -/// Response for [`ExportRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ExportResponse { - /// The author - pub author: Option, -} - -/// Import author from secret key -#[derive(Serialize, Deserialize, Debug)] -pub struct ImportRequest { - /// The author to import - pub author: Author, -} - -/// Response to [`ImportRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ImportResponse { - /// The author id of the imported author - pub author_id: AuthorId, -} diff --git a/iroh/src/rpc_protocol/docs.rs b/iroh/src/rpc_protocol/docs.rs deleted file mode 100644 index 45d2c8aacf..0000000000 --- a/iroh/src/rpc_protocol/docs.rs +++ /dev/null @@ -1,425 +0,0 @@ -use std::path::PathBuf; - -use bytes::Bytes; -use iroh_base::node_addr::AddrInfoOptions; -use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; -use iroh_docs::{ - actor::OpenState, - engine::LiveEvent, - store::{DownloadPolicy, Query}, - AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes, SignedEntry, -}; -use iroh_net::NodeAddr; -use nested_enum_utils::enum_conversions; -use quic_rpc::pattern::try_server_streaming::StreamCreated; -use quic_rpc_derive::rpc_requests; -use serde::{Deserialize, Serialize}; - -use super::RpcService; -use crate::{ - client::docs::{ImportProgress, ShareMode}, - node::{RpcError, RpcResult}, -}; - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Request)] -#[rpc_requests(RpcService)] -pub enum Request { - #[rpc(response = RpcResult)] - Open(OpenRequest), - #[rpc(response = RpcResult)] - Close(CloseRequest), - #[rpc(response = RpcResult)] - Status(StatusRequest), - #[server_streaming(response = RpcResult)] - List(DocListRequest), - #[rpc(response = RpcResult)] - Create(CreateRequest), - #[rpc(response = RpcResult)] - Drop(DropRequest), - #[rpc(response = RpcResult)] - Import(ImportRequest), - #[rpc(response = RpcResult)] - Set(SetRequest), - #[rpc(response = RpcResult)] - SetHash(SetHashRequest), - #[server_streaming(response = RpcResult)] - Get(GetManyRequest), - #[rpc(response = RpcResult)] - GetExact(GetExactRequest), - #[server_streaming(response = ImportFileResponse)] - ImportFile(ImportFileRequest), - #[server_streaming(response = ExportFileResponse)] - ExportFile(ExportFileRequest), - #[rpc(response = RpcResult)] - Del(DelRequest), - #[rpc(response = RpcResult)] - StartSync(StartSyncRequest), - #[rpc(response = RpcResult)] - Leave(LeaveRequest), - #[rpc(response = RpcResult)] - Share(ShareRequest), - #[try_server_streaming(create_error = RpcError, item_error = RpcError, item = DocSubscribeResponse)] - Subscribe(DocSubscribeRequest), - #[rpc(response = RpcResult)] - GetDownloadPolicy(GetDownloadPolicyRequest), - #[rpc(response = RpcResult)] - SetDownloadPolicy(SetDownloadPolicyRequest), - #[rpc(response = RpcResult)] - GetSyncPeers(GetSyncPeersRequest), -} - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Response)] -pub enum Response { - Open(RpcResult), - Close(RpcResult), - Status(RpcResult), - List(RpcResult), - Create(RpcResult), - Drop(RpcResult), - Import(RpcResult), - Set(RpcResult), - SetHash(RpcResult), - Get(RpcResult), - GetExact(RpcResult), - ImportFile(ImportFileResponse), - ExportFile(ExportFileResponse), - Del(RpcResult), - Share(RpcResult), - StartSync(RpcResult), - Leave(RpcResult), - Subscribe(RpcResult), - GetDownloadPolicy(RpcResult), - SetDownloadPolicy(RpcResult), - GetSyncPeers(RpcResult), - StreamCreated(RpcResult), -} - -/// Subscribe to events for a document. -#[derive(Serialize, Deserialize, Debug)] -pub struct DocSubscribeRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`DocSubscribeRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct DocSubscribeResponse { - /// The event that occurred on the document - pub event: LiveEvent, -} - -/// List all documents -#[derive(Serialize, Deserialize, Debug)] -pub struct DocListRequest {} - -/// Response to [`DocListRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ListResponse { - /// The document id - pub id: NamespaceId, - /// The capability over the document. - pub capability: CapabilityKind, -} - -/// Create a new document -#[derive(Serialize, Deserialize, Debug)] -pub struct CreateRequest {} - -/// Response to [`CreateRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct CreateResponse { - /// The document id - pub id: NamespaceId, -} - -/// Import a document from a capability. -#[derive(Serialize, Deserialize, Debug)] -pub struct ImportRequest { - /// The namespace capability. - pub capability: Capability, -} - -/// Response to [`ImportRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ImportResponse { - /// the document id - pub doc_id: NamespaceId, -} - -/// Share a document with peers over a ticket. -#[derive(Serialize, Deserialize, Debug)] -pub struct ShareRequest { - /// The document id - pub doc_id: NamespaceId, - /// Whether to share read or write access to the document - pub mode: ShareMode, - /// Configuration of the addresses in the ticket. - pub addr_options: AddrInfoOptions, -} - -/// The response to [`ShareRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct ShareResponse(pub DocTicket); - -/// Get info on a document -#[derive(Serialize, Deserialize, Debug)] -pub struct StatusRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`StatusRequest`] -// TODO: actually provide info -#[derive(Serialize, Deserialize, Debug)] -pub struct StatusResponse { - /// Live sync status - pub status: OpenState, -} - -/// Open a document -#[derive(Serialize, Deserialize, Debug)] -pub struct OpenRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`OpenRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct OpenResponse {} - -/// Open a document -#[derive(Serialize, Deserialize, Debug)] -pub struct CloseRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`CloseRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct CloseResponse {} - -/// Start to sync a doc with peers. -#[derive(Serialize, Deserialize, Debug)] -pub struct StartSyncRequest { - /// The document id - pub doc_id: NamespaceId, - /// List of peers to join - pub peers: Vec, -} - -/// Response to [`StartSyncRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct StartSyncResponse {} - -/// Stop the live sync for a doc, and optionally delete the document. -#[derive(Serialize, Deserialize, Debug)] -pub struct LeaveRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`LeaveRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct LeaveResponse {} - -/// Stop the live sync for a doc, and optionally delete the document. -#[derive(Serialize, Deserialize, Debug)] -pub struct DropRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`DropRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct DropResponse {} - -/// Set an entry in a document -#[derive(Serialize, Deserialize, Debug)] -pub struct SetRequest { - /// The document id - pub doc_id: NamespaceId, - /// Author of this entry. - pub author_id: AuthorId, - /// Key of this entry. - pub key: Bytes, - /// Value of this entry. - // TODO: Allow to provide the hash directly - // TODO: Add a way to provide content as stream - pub value: Bytes, -} - -/// Response to [`SetRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct SetResponse { - /// The newly-created entry. - pub entry: SignedEntry, -} - -/// A request to the node to add the data at the given filepath as an entry to the document -/// -/// Will produce a stream of [`ImportProgress`] messages. -#[derive(Debug, Serialize, Deserialize)] -pub struct ImportFileRequest { - /// The document id - pub doc_id: NamespaceId, - /// Author of this entry. - pub author_id: AuthorId, - /// Key of this entry. - pub key: Bytes, - /// The filepath to the data - /// - /// This should be an absolute path valid for the file system on which - /// the node runs. Usually the cli will run on the same machine as the - /// node, so this should be an absolute path on the cli machine. - pub path: PathBuf, - /// True if the provider can assume that the data will not change, so it - /// can be shared in place. - pub in_place: bool, -} - -/// Wrapper around [`ImportProgress`]. -#[derive(Debug, Serialize, Deserialize, derive_more::Into)] -pub struct ImportFileResponse(pub ImportProgress); - -/// A request to the node to save the data of the entry to the given filepath -/// -/// Will produce a stream of [`ExportFileResponse`] messages. -#[derive(Debug, Serialize, Deserialize)] -pub struct ExportFileRequest { - /// The entry you want to export - pub entry: Entry, - /// The filepath to where the data should be saved - /// - /// This should be an absolute path valid for the file system on which - /// the node runs. Usually the cli will run on the same machine as the - /// node, so this should be an absolute path on the cli machine. - pub path: PathBuf, - /// The mode of exporting. Setting to `ExportMode::TryReference` means attempting - /// to use references for keeping file - pub mode: ExportMode, -} - -/// Progress messages for an doc export operation -/// -/// An export operation involves reading the entry from the database ans saving the entry to the -/// given `outpath` -#[derive(Debug, Serialize, Deserialize, derive_more::Into)] -pub struct ExportFileResponse(pub ExportProgress); - -/// Delete entries in a document -#[derive(Serialize, Deserialize, Debug)] -pub struct DelRequest { - /// The document id. - pub doc_id: NamespaceId, - /// Author of this entry. - pub author_id: AuthorId, - /// Prefix to delete. - pub prefix: Bytes, -} - -/// Response to [`DelRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct DelResponse { - /// The number of entries that were removed. - pub removed: usize, -} - -/// Set an entry in a document via its hash -#[derive(Serialize, Deserialize, Debug)] -pub struct SetHashRequest { - /// The document id - pub doc_id: NamespaceId, - /// Author of this entry. - pub author_id: AuthorId, - /// Key of this entry. - pub key: Bytes, - /// Hash of this entry. - pub hash: Hash, - /// Size of this entry. - pub size: u64, -} - -/// Response to [`SetHashRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct SetHashResponse {} - -/// Get entries from a document -#[derive(Serialize, Deserialize, Debug)] -pub struct GetManyRequest { - /// The document id - pub doc_id: NamespaceId, - /// Query to run - pub query: Query, -} - -/// Response to [`GetManyRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct GetManyResponse { - /// The document entry - pub entry: SignedEntry, -} - -/// Get entries from a document -#[derive(Serialize, Deserialize, Debug)] -pub struct GetExactRequest { - /// The document id - pub doc_id: NamespaceId, - /// Key matcher - pub key: Bytes, - /// Author matcher - pub author: AuthorId, - /// Whether to include empty entries (prefix deletion markers) - pub include_empty: bool, -} - -/// Response to [`GetExactRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct GetExactResponse { - /// The document entry - pub entry: Option, -} - -/// Set a download policy -#[derive(Serialize, Deserialize, Debug)] -pub struct SetDownloadPolicyRequest { - /// The document id - pub doc_id: NamespaceId, - /// Download policy - pub policy: DownloadPolicy, -} - -/// Response to [`SetDownloadPolicyRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct SetDownloadPolicyResponse {} - -/// Get a download policy -#[derive(Serialize, Deserialize, Debug)] -pub struct GetDownloadPolicyRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`GetDownloadPolicyRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct GetDownloadPolicyResponse { - /// The download policy - pub policy: DownloadPolicy, -} - -/// Get peers for document -#[derive(Serialize, Deserialize, Debug)] -pub struct GetSyncPeersRequest { - /// The document id - pub doc_id: NamespaceId, -} - -/// Response to [`GetSyncPeersRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub struct GetSyncPeersResponse { - /// List of peers ids - pub peers: Option>, -} diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index d194f2b779..857de7e19f 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -185,13 +185,11 @@ mod file { io::fsm::{BaoContentItem, ResponseDecoderNext}, BaoTree, }; - use futures_lite::StreamExt; use iroh_blobs::{ - store::{BaoBatchWriter, ConsistencyCheckProgress, Map, MapEntryMut, ReportLevel}, + store::{BaoBatchWriter, ConsistencyCheckProgress, MapEntryMut, ReportLevel}, util::progress::{AsyncChannelProgressSender, ProgressSender as _}, TempTag, }; - use iroh_io::AsyncSliceReaderExt; use testdir::testdir; use tokio::io::AsyncReadExt; @@ -228,45 +226,6 @@ mod file { Ok(max_level) } - #[tokio::test] - async fn redb_doc_import_stress() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - let bao_store = iroh_blobs::store::fs::Store::load(dir.join("store")).await?; - let (node, _) = wrap_in_node(bao_store.clone(), Duration::from_secs(10)).await; - let client = node.client(); - let doc = client.docs().create().await?; - let author = client.authors().create().await?; - let temp_path = dir.join("temp"); - tokio::fs::create_dir_all(&temp_path).await?; - let mut to_import = Vec::new(); - for i in 0..100 { - let data = create_test_data(16 * 1024 * 3 + 1); - let path = temp_path.join(format!("file{}", i)); - tokio::fs::write(&path, &data).await?; - let key = Bytes::from(format!("{}", path.display())); - to_import.push((key, path, data)); - } - for (key, path, _) in to_import.iter() { - let mut progress = doc.import_file(author, key.clone(), path, true).await?; - while let Some(msg) = progress.next().await { - tracing::info!("import progress {:?}", msg); - } - } - for (i, (key, _, expected)) in to_import.iter().enumerate() { - let Some(entry) = doc.get_exact(author, key.clone(), true).await? else { - anyhow::bail!("doc entry not found {}", i); - }; - let hash = entry.content_hash(); - let Some(content) = bao_store.get(&hash).await? else { - anyhow::bail!("content not found {} {}", i, &hash.to_hex()[..8]); - }; - let data = content.data_reader().read_to_end().await?; - assert_eq!(data, expected); - } - Ok(()) - } - /// Test gc for sequences of hashes that protect their children from deletion. #[tokio::test] async fn gc_file_basics() -> Result<()> { diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs deleted file mode 100644 index 1d92b60314..0000000000 --- a/iroh/tests/sync.rs +++ /dev/null @@ -1,1375 +0,0 @@ -use std::{ - collections::HashMap, - future::Future, - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::{anyhow, bail, Context, Result}; -use bytes::Bytes; -use futures_lite::Stream; -use futures_util::{FutureExt, StreamExt, TryStreamExt}; -use iroh::{ - base::node_addr::AddrInfoOptions, - client::{ - docs::{Entry, LiveEvent, ShareMode}, - Doc, - }, - net::key::{PublicKey, SecretKey}, - node::{Builder, Node}, -}; -use iroh_blobs::Hash; -use iroh_docs::{ - store::{DownloadPolicy, FilterKind, Query}, - AuthorId, ContentStatus, -}; -use iroh_net::RelayMode; -use rand::{CryptoRng, Rng, SeedableRng}; -use tracing::{debug, error_span, info, Instrument}; -use tracing_subscriber::{prelude::*, EnvFilter}; - -const TIMEOUT: Duration = Duration::from_secs(60); - -fn test_node(secret_key: SecretKey) -> Builder { - Node::memory() - .secret_key(secret_key) - .enable_docs() - .relay_mode(RelayMode::Disabled) -} - -// The function is not `async fn` so that we can take a `&mut` borrow on the `rng` without -// capturing that `&mut` lifetime in the returned future. This allows to call it in a loop while -// still collecting the futures before awaiting them altogether (see [`spawn_nodes`]) -fn spawn_node( - i: usize, - rng: &mut (impl CryptoRng + Rng), -) -> impl Future>> + 'static { - let secret_key = SecretKey::generate_with_rng(rng); - async move { - let node = test_node(secret_key); - let node = node.spawn().await?; - info!(?i, me = %node.node_id().fmt_short(), "node spawned"); - Ok(node) - } -} - -async fn spawn_nodes( - n: usize, - mut rng: &mut (impl CryptoRng + Rng), -) -> anyhow::Result>> { - let mut futs = vec![]; - for i in 0..n { - futs.push(spawn_node(i, &mut rng)); - } - futures_buffered::join_all(futs).await.into_iter().collect() -} - -pub fn test_rng(seed: &[u8]) -> rand_chacha::ChaCha12Rng { - rand_chacha::ChaCha12Rng::from_seed(*Hash::new(seed).as_bytes()) -} - -macro_rules! match_event { - ($pattern:pat $(if $guard:expr)? $(,)?) => { - Box::new(move |e| matches!(e, $pattern $(if $guard)?)) - }; -} - -/// This tests the simplest scenario: A node connects to another node, and performs sync. -#[tokio::test] -async fn sync_simple() -> Result<()> { - setup_logging(); - let mut rng = test_rng(b"sync_simple"); - let nodes = spawn_nodes(2, &mut rng).await?; - let clients = nodes.iter().map(|node| node.client()).collect::>(); - - // create doc on node0 - let peer0 = nodes[0].node_id(); - let author0 = clients[0].authors().create().await?; - let doc0 = clients[0].docs().create().await?; - let hash0 = doc0 - .set_bytes(author0, b"k1".to_vec(), b"v1".to_vec()) - .await?; - assert_latest(&doc0, b"k1", b"v1").await; - let ticket = doc0 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - - let mut events0 = doc0.subscribe().await?; - - info!("node1: join"); - let peer1 = nodes[1].node_id(); - let doc1 = clients[1].docs().import(ticket.clone()).await?; - let mut events1 = doc1.subscribe().await?; - info!("node1: assert 5 events"); - assert_next_unordered( - &mut events1, - TIMEOUT, - vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)), - Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )), - Box::new(move |e| match_sync_finished(e, peer0)), - Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - assert_latest(&doc1, b"k1", b"v1").await; - - info!("node0: assert 2 events"); - assert_next( - &mut events0, - TIMEOUT, - vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)), - Box::new(move |e| match_sync_finished(e, peer1)), - ], - ) - .await; - - for node in nodes { - node.shutdown().await?; - } - Ok(()) -} - -/// Test subscribing to replica events (without sync) -#[tokio::test] -async fn sync_subscribe_no_sync() -> Result<()> { - let mut rng = test_rng(b"sync_subscribe"); - setup_logging(); - let node = spawn_node(0, &mut rng).await?; - let client = node.client(); - let doc = client.docs().create().await?; - let mut sub = doc.subscribe().await?; - let author = client.authors().create().await?; - doc.set_bytes(author, b"k".to_vec(), b"v".to_vec()).await?; - let event = tokio::time::timeout(Duration::from_millis(100), sub.next()).await?; - assert!( - matches!(event, Some(Ok(LiveEvent::InsertLocal { .. }))), - "expected InsertLocal but got {event:?}" - ); - node.shutdown().await?; - Ok(()) -} - -#[tokio::test] -async fn sync_gossip_bulk() -> Result<()> { - let n_entries: usize = std::env::var("N_ENTRIES") - .map(|x| x.parse().expect("N_ENTRIES must be a number")) - .unwrap_or(100); - let mut rng = test_rng(b"sync_gossip_bulk"); - setup_logging(); - - let nodes = spawn_nodes(2, &mut rng).await?; - let clients = nodes.iter().map(|node| node.client()).collect::>(); - - let _peer0 = nodes[0].node_id(); - let author0 = clients[0].authors().create().await?; - let doc0 = clients[0].docs().create().await?; - let mut ticket = doc0 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - // unset peers to not yet start sync - let peers = ticket.nodes.clone(); - ticket.nodes = vec![]; - let doc1 = clients[1].docs().import(ticket).await?; - let mut events = doc1.subscribe().await?; - - // create entries for initial sync. - let now = Instant::now(); - let value = b"foo"; - for i in 0..n_entries { - let key = format!("init/{i}"); - doc0.set_bytes(author0, key.as_bytes().to_vec(), value.to_vec()) - .await?; - } - let elapsed = now.elapsed(); - info!( - "insert took {elapsed:?} for {n_entries} ({:?} per entry)", - elapsed / n_entries as u32 - ); - - let now = Instant::now(); - let mut count = 0; - doc0.start_sync(vec![]).await?; - doc1.start_sync(peers).await?; - while let Some(event) = events.next().await { - let event = event?; - if matches!(event, LiveEvent::InsertRemote { .. }) { - count += 1; - } - if count == n_entries { - break; - } - } - let elapsed = now.elapsed(); - info!( - "initial sync took {elapsed:?} for {n_entries} ({:?} per entry)", - elapsed / n_entries as u32 - ); - - // publish another 1000 entries - let mut count = 0; - let value = b"foo"; - let now = Instant::now(); - for i in 0..n_entries { - let key = format!("gossip/{i}"); - doc0.set_bytes(author0, key.as_bytes().to_vec(), value.to_vec()) - .await?; - } - let elapsed = now.elapsed(); - info!( - "insert took {elapsed:?} for {n_entries} ({:?} per entry)", - elapsed / n_entries as u32 - ); - - while let Some(event) = events.next().await { - let event = event?; - if matches!(event, LiveEvent::InsertRemote { .. }) { - count += 1; - } - if count == n_entries { - break; - } - } - let elapsed = now.elapsed(); - info!( - "gossip recv took {elapsed:?} for {n_entries} ({:?} per entry)", - elapsed / n_entries as u32 - ); - - Ok(()) -} - -/// This tests basic sync and gossip with 3 peers. -#[tokio::test] -#[ignore = "flaky"] -async fn sync_full_basic() -> Result<()> { - let mut rng = test_rng(b"sync_full_basic"); - setup_logging(); - let mut nodes = spawn_nodes(2, &mut rng).await?; - let mut clients = nodes - .iter() - .map(|node| node.client().clone()) - .collect::>(); - - // peer0: create doc and ticket - let peer0 = nodes[0].node_id(); - let author0 = clients[0].authors().create().await?; - let doc0 = clients[0].docs().create().await?; - let mut events0 = doc0.subscribe().await?; - let key0 = b"k1"; - let value0 = b"v1"; - let hash0 = doc0 - .set_bytes(author0, key0.to_vec(), value0.to_vec()) - .await?; - - info!("peer0: wait for 1 event (local insert)"); - let e = next(&mut events0).await; - assert!( - matches!(&e, LiveEvent::InsertLocal { entry } if entry.content_hash() == hash0), - "expected LiveEvent::InsertLocal but got {e:?}", - ); - assert_latest(&doc0, key0, value0).await; - let ticket = doc0 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - - info!("peer1: spawn"); - let peer1 = nodes[1].node_id(); - let author1 = clients[1].authors().create().await?; - info!("peer1: join doc"); - let doc1 = clients[1].docs().import(ticket.clone()).await?; - - info!("peer1: wait for 4 events (for sync and join with peer0)"); - let mut events1 = doc1.subscribe().await?; - assert_next_unordered( - &mut events1, - TIMEOUT, - vec![ - match_event!(LiveEvent::NeighborUp(peer) if *peer == peer0), - match_event!(LiveEvent::InsertRemote { from, .. } if *from == peer0 ), - Box::new(move |e| match_sync_finished(e, peer0)), - match_event!(LiveEvent::ContentReady { hash } if *hash == hash0), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - - info!("peer0: wait for 2 events (join & accept sync finished from peer1)"); - assert_next( - &mut events0, - TIMEOUT, - vec![ - match_event!(LiveEvent::NeighborUp(peer) if *peer == peer1), - Box::new(move |e| match_sync_finished(e, peer1)), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - - info!("peer1: insert entry"); - let key1 = b"k2"; - let value1 = b"v2"; - let hash1 = doc1 - .set_bytes(author1, key1.to_vec(), value1.to_vec()) - .await?; - assert_latest(&doc1, key1, value1).await; - info!("peer1: wait for 1 event (local insert, and pendingcontentready)"); - assert_next( - &mut events1, - TIMEOUT, - vec![match_event!(LiveEvent::InsertLocal { entry} if entry.content_hash() == hash1)], - ) - .await; - - // peer0: assert events for entry received via gossip - info!("peer0: wait for 2 events (gossip'ed entry from peer1)"); - assert_next( - &mut events0, - TIMEOUT, - vec![ - Box::new( - move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == peer1), - ), - Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)), - ], - ).await; - assert_latest(&doc0, key1, value1).await; - - // Note: If we could check gossip messages directly here (we can't easily), we would notice - // that peer1 will receive a `Op::ContentReady` gossip message, broadcast - // by peer0 with neighbor scope. This message is superfluous, and peer0 could know that, however - // our gossip implementation does not allow us to filter message receivers this way. - - info!("peer2: spawn"); - nodes.push(spawn_node(nodes.len(), &mut rng).await?); - clients.push(nodes.last().unwrap().client().clone()); - let doc2 = clients[2].docs().import(ticket).await?; - let peer2 = nodes[2].node_id(); - let mut events2 = doc2.subscribe().await?; - - info!("peer2: wait for 9 events (from sync with peers)"); - assert_next_unordered_with_optionals( - &mut events2, - TIMEOUT, - // required events - vec![ - // 2 NeighborUp events - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)), - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)), - // 2 SyncFinished events - Box::new(move |e| match_sync_finished(e, peer0)), - Box::new(move |e| match_sync_finished(e, peer1)), - // 2 InsertRemote events - Box::new( - move |e| matches!(e, LiveEvent::InsertRemote { entry, content_status: ContentStatus::Missing, .. } if entry.content_hash() == hash0), - ), - Box::new( - move |e| matches!(e, LiveEvent::InsertRemote { entry, content_status: ContentStatus::Missing, .. } if entry.content_hash() == hash1), - ), - // 2 ContentReady events - Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), - Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)), - // at least 1 PendingContentReady - match_event!(LiveEvent::PendingContentReady), - ], - // optional events - // it may happen that we run sync two times against our two peers: - // if the first sync (as a result of us joining the peer manually through the ticket) completes - // before the peer shows up as a neighbor, we run sync again for the NeighborUp event. - vec![ - // 2 SyncFinished events - Box::new(move |e| match_sync_finished(e, peer0)), - Box::new(move |e| match_sync_finished(e, peer1)), - match_event!(LiveEvent::PendingContentReady), - match_event!(LiveEvent::PendingContentReady), - ] - ).await; - assert_latest(&doc2, b"k1", b"v1").await; - assert_latest(&doc2, b"k2", b"v2").await; - - info!("peer0: wait for 2 events (join & accept sync finished from peer2)"); - assert_next( - &mut events0, - TIMEOUT, - vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)), - Box::new(move |e| match_sync_finished(e, peer2)), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - - info!("peer1: wait for 2 events (join & accept sync finished from peer2)"); - assert_next( - &mut events1, - TIMEOUT, - vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)), - Box::new(move |e| match_sync_finished(e, peer2)), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - - info!("shutdown"); - for node in nodes { - node.shutdown().await?; - } - - Ok(()) -} - -#[tokio::test] -async fn sync_open_close() -> Result<()> { - let mut rng = test_rng(b"sync_subscribe_stop_close"); - setup_logging(); - let node = spawn_node(0, &mut rng).await?; - let client = node.client(); - - let doc = client.docs().create().await?; - let status = doc.status().await?; - assert_eq!(status.handles, 1); - - let doc2 = client.docs().open(doc.id()).await?.unwrap(); - let status = doc2.status().await?; - assert_eq!(status.handles, 2); - - doc.close().await?; - assert!(doc.status().await.is_err()); - - let status = doc2.status().await?; - assert_eq!(status.handles, 1); - - Ok(()) -} - -#[tokio::test] -async fn sync_subscribe_stop_close() -> Result<()> { - let mut rng = test_rng(b"sync_subscribe_stop_close"); - setup_logging(); - let node = spawn_node(0, &mut rng).await?; - let client = node.client(); - - let doc = client.docs().create().await?; - let author = client.authors().create().await?; - - let status = doc.status().await?; - assert_eq!(status.subscribers, 0); - assert_eq!(status.handles, 1); - assert!(!status.sync); - - doc.start_sync(vec![]).await?; - let status = doc.status().await?; - assert!(status.sync); - assert_eq!(status.handles, 2); - assert_eq!(status.subscribers, 1); - - let sub = doc.subscribe().await?; - let status = doc.status().await?; - assert_eq!(status.subscribers, 2); - drop(sub); - // trigger an event that makes the actor check if the event channels are still connected - doc.set_bytes(author, b"x".to_vec(), b"x".to_vec()).await?; - let status = doc.status().await?; - assert_eq!(status.subscribers, 1); - - doc.leave().await?; - let status = doc.status().await?; - assert_eq!(status.subscribers, 0); - assert_eq!(status.handles, 1); - assert!(!status.sync); - - Ok(()) -} - -#[tokio::test] -#[cfg(feature = "test-utils")] -async fn test_sync_via_relay() -> Result<()> { - let _guard = iroh_test::logging::setup(); - let (relay_map, _relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?; - - let node1 = Node::memory() - .relay_mode(RelayMode::Custom(relay_map.clone())) - .insecure_skip_relay_cert_verify(true) - .enable_docs() - .spawn() - .await?; - let node1_id = node1.node_id(); - let node2 = Node::memory() - .bind_random_port() - .relay_mode(RelayMode::Custom(relay_map.clone())) - .insecure_skip_relay_cert_verify(true) - .enable_docs() - .spawn() - .await?; - - let doc1 = node1.docs().create().await?; - let author1 = node1.authors().create().await?; - let inserted_hash = doc1 - .set_bytes(author1, b"foo".to_vec(), b"bar".to_vec()) - .await?; - let mut ticket = doc1 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - - // remove direct addrs to force connect via relay - ticket.nodes[0].info.direct_addresses = Default::default(); - - // join - let doc2 = node2.docs().import(ticket).await?; - let mut events = doc2.subscribe().await?; - - assert_next_unordered_with_optionals( - &mut events, - Duration::from_secs(2), - vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(n) if *n== node1_id)), - Box::new(move |e| match_sync_finished(e, node1_id)), - Box::new( - move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == node1_id), - ), - Box::new( - move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == inserted_hash), - ), - match_event!(LiveEvent::PendingContentReady), - ], - vec![Box::new(move |e| match_sync_finished(e, node1_id))], - ).await; - let actual = doc2 - .get_exact(author1, b"foo", false) - .await? - .expect("entry to exist") - .content_bytes(&doc2) - .await?; - assert_eq!(actual.as_ref(), b"bar"); - - // update - let updated_hash = doc1 - .set_bytes(author1, b"foo".to_vec(), b"update".to_vec()) - .await?; - assert_next_unordered_with_optionals( - &mut events, - Duration::from_secs(2), - vec![ - Box::new( - move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == node1_id), - ), - Box::new( - move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == updated_hash), - ), - ], - vec![ - Box::new(move |e| match_sync_finished(e, node1_id)), - Box::new(move |e| matches!(e, LiveEvent::PendingContentReady)), - ], - ).await; - let actual = doc2 - .get_exact(author1, b"foo", false) - .await? - .expect("entry to exist") - .content_bytes(&doc2) - .await?; - assert_eq!(actual.as_ref(), b"update"); - Ok(()) -} - -#[tokio::test] -#[cfg(feature = "test-utils")] -#[ignore = "flaky"] -async fn sync_restart_node() -> Result<()> { - let mut rng = test_rng(b"sync_restart_node"); - setup_logging(); - let (relay_map, _relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?; - - let discovery_server = iroh_net::test_utils::DnsPkarrServer::run().await?; - - let node1_dir = tempfile::TempDir::with_prefix("test-sync_restart_node-node1")?; - let secret_key_1 = SecretKey::generate_with_rng(&mut rng); - - let node1 = Node::persistent(&node1_dir) - .await? - .secret_key(secret_key_1.clone()) - .insecure_skip_relay_cert_verify(true) - .relay_mode(RelayMode::Custom(relay_map.clone())) - .dns_resolver(discovery_server.dns_resolver()) - .node_discovery(discovery_server.discovery(secret_key_1.clone()).into()) - .enable_docs() - .spawn() - .await?; - let id1 = node1.node_id(); - - // create doc & ticket on node1 - let doc1 = node1.docs().create().await?; - let mut events1 = doc1.subscribe().await?; - let ticket = doc1 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - - // create node2 - let secret_key_2 = SecretKey::generate_with_rng(&mut rng); - let node2 = Node::memory() - .secret_key(secret_key_2.clone()) - .relay_mode(RelayMode::Custom(relay_map.clone())) - .insecure_skip_relay_cert_verify(true) - .dns_resolver(discovery_server.dns_resolver()) - .node_discovery(discovery_server.discovery(secret_key_2.clone()).into()) - .enable_docs() - .spawn() - .await?; - let id2 = node2.node_id(); - let author2 = node2.authors().create().await?; - let doc2 = node2.docs().import(ticket.clone()).await?; - - info!("node2 set a"); - let hash_a = doc2.set_bytes(author2, "n2/a", "a").await?; - assert_latest(&doc2, b"n2/a", b"a").await; - - assert_next_unordered_with_optionals( - &mut events1, - Duration::from_secs(10), - vec![ - match_event!(LiveEvent::NeighborUp(n) if *n == id2), - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2), - match_event!(LiveEvent::ContentReady { hash } if *hash == hash_a), - match_event!(LiveEvent::PendingContentReady), - ], - vec![ - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::PendingContentReady), - ], - ) - .await; - assert_latest(&doc1, b"n2/a", b"a").await; - - info!(me = id1.fmt_short(), "node1 start shutdown"); - node1.shutdown().await?; - info!(me = id1.fmt_short(), "node1 down"); - - info!(me = id1.fmt_short(), "sleep 1s"); - tokio::time::sleep(Duration::from_secs(1)).await; - - info!(me = id2.fmt_short(), "node2 set b"); - let hash_b = doc2.set_bytes(author2, "n2/b", "b").await?; - - info!(me = id1.fmt_short(), "node1 respawn"); - let node1 = Node::persistent(&node1_dir) - .await? - .secret_key(secret_key_1.clone()) - .insecure_skip_relay_cert_verify(true) - .relay_mode(RelayMode::Custom(relay_map.clone())) - .dns_resolver(discovery_server.dns_resolver()) - .node_discovery(discovery_server.discovery(secret_key_1.clone()).into()) - .enable_docs() - .spawn() - .await?; - assert_eq!(id1, node1.node_id()); - - let doc1 = node1.docs().open(doc1.id()).await?.expect("doc to exist"); - let mut events1 = doc1.subscribe().await?; - assert_latest(&doc1, b"n2/a", b"a").await; - - // check that initial resync is working - doc1.start_sync(vec![]).await?; - assert_next_unordered_with_optionals( - &mut events1, - Duration::from_secs(10), - vec![ - match_event!(LiveEvent::NeighborUp(n) if *n== id2), - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2), - match_event!(LiveEvent::ContentReady { hash } if *hash == hash_b), - ], - vec![ - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::PendingContentReady), - ] - ).await; - assert_latest(&doc1, b"n2/b", b"b").await; - - // check that live conn is working - info!(me = id2.fmt_short(), "node2 set c"); - let hash_c = doc2.set_bytes(author2, "n2/c", "c").await?; - assert_next_unordered_with_optionals( - &mut events1, - Duration::from_secs(10), - vec![ - match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2), - match_event!(LiveEvent::ContentReady { hash } if *hash == hash_c), - ], - vec![ - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::PendingContentReady), - match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), - match_event!(LiveEvent::PendingContentReady), - ] - ).await; - - assert_latest(&doc1, b"n2/c", b"c").await; - - Ok(()) -} - -/// Joins two nodes that write to the same document but have differing download policies and tests -/// that they both synced the key info but not the content. -#[tokio::test] -async fn test_download_policies() -> Result<()> { - // keys node a has - let star_wars_movies = &[ - "star_wars/prequel/the_phantom_menace", - "star_wars/prequel/attack_of_the_clones", - "star_wars/prequel/revenge_of_the_sith", - "star_wars/og/a_new_hope", - "star_wars/og/the_empire_strikes_back", - "star_wars/og/return_of_the_jedi", - ]; - // keys node b has - let lotr_movies = &[ - "lotr/fellowship_of_the_ring", - "lotr/the_two_towers", - "lotr/return_of_the_king", - ]; - - // content policy for what b wants - let policy_b = - DownloadPolicy::EverythingExcept(vec![FilterKind::Prefix("star_wars/og".into())]); - // content policy for what a wants - let policy_a = DownloadPolicy::NothingExcept(vec![FilterKind::Exact( - "lotr/fellowship_of_the_ring".into(), - )]); - - // a will sync all lotr keys but download a single key - const EXPECTED_A_SYNCED: usize = 3; - const EXPECTED_A_DOWNLOADED: usize = 1; - - // b will sync all star wars content but download only the prequel keys - const EXPECTED_B_SYNCED: usize = 6; - const EXPECTED_B_DOWNLOADED: usize = 3; - - let mut rng = test_rng(b"sync_download_policies"); - setup_logging(); - let nodes = spawn_nodes(2, &mut rng).await?; - let clients = nodes.iter().map(|node| node.client()).collect::>(); - - let doc_a = clients[0].docs().create().await?; - let author_a = clients[0].authors().create().await?; - let ticket = doc_a - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - - let doc_b = clients[1].docs().import(ticket).await?; - let author_b = clients[1].authors().create().await?; - - doc_a.set_download_policy(policy_a).await?; - doc_b.set_download_policy(policy_b).await?; - - let mut events_a = doc_a.subscribe().await?; - let mut events_b = doc_b.subscribe().await?; - - let mut key_hashes: HashMap = HashMap::default(); - - // set content in a - for k in star_wars_movies.iter() { - let hash = doc_a - .set_bytes(author_a, k.to_owned(), k.to_owned()) - .await?; - key_hashes.insert(hash, k); - } - - // set content in b - for k in lotr_movies.iter() { - let hash = doc_b - .set_bytes(author_b, k.to_owned(), k.to_owned()) - .await?; - key_hashes.insert(hash, k); - } - - assert_eq!(key_hashes.len(), star_wars_movies.len() + lotr_movies.len()); - - let fut = async { - use LiveEvent::*; - let mut downloaded_a: Vec<&'static str> = Vec::new(); - let mut downloaded_b: Vec<&'static str> = Vec::new(); - let mut synced_a = 0usize; - let mut synced_b = 0usize; - loop { - tokio::select! { - Some(Ok(ev)) = events_a.next() => { - match ev { - InsertRemote { content_status, entry, .. } => { - synced_a += 1; - if let ContentStatus::Complete = content_status { - downloaded_a.push(key_hashes.get(&entry.content_hash()).unwrap()) - } - }, - ContentReady { hash } => { - downloaded_a.push(key_hashes.get(&hash).unwrap()); - }, - _ => {} - } - } - Some(Ok(ev)) = events_b.next() => { - match ev { - InsertRemote { content_status, entry, .. } => { - synced_b += 1; - if let ContentStatus::Complete = content_status { - downloaded_b.push(key_hashes.get(&entry.content_hash()).unwrap()) - } - }, - ContentReady { hash } => { - downloaded_b.push(key_hashes.get(&hash).unwrap()); - }, - _ => {} - } - } - } - - if synced_a == EXPECTED_A_SYNCED - && downloaded_a.len() == EXPECTED_A_DOWNLOADED - && synced_b == EXPECTED_B_SYNCED - && downloaded_b.len() == EXPECTED_B_DOWNLOADED - { - break; - } - } - (downloaded_a, downloaded_b) - }; - - let (downloaded_a, mut downloaded_b) = tokio::time::timeout(TIMEOUT, fut) - .await - .context("timeout elapsed")?; - - downloaded_b.sort(); - assert_eq!(downloaded_a, vec!["lotr/fellowship_of_the_ring"]); - assert_eq!( - downloaded_b, - vec![ - "star_wars/prequel/attack_of_the_clones", - "star_wars/prequel/revenge_of_the_sith", - "star_wars/prequel/the_phantom_menace", - ] - ); - - Ok(()) -} - -/// Test sync between many nodes with propagation through sync reports. -#[tokio::test(flavor = "multi_thread")] -#[ignore = "flaky"] -async fn sync_big() -> Result<()> { - setup_logging(); - let mut rng = test_rng(b"sync_big"); - let n_nodes = std::env::var("NODES") - .map(|v| v.parse().expect("NODES must be a number")) - .unwrap_or(10); - let n_entries_init = 1; - - tokio::task::spawn(async move { - for i in 0.. { - tokio::time::sleep(Duration::from_secs(1)).await; - info!("tick {i}"); - } - }); - - let nodes = spawn_nodes(n_nodes, &mut rng).await?; - let node_ids = nodes.iter().map(|node| node.node_id()).collect::>(); - let clients = nodes.iter().map(|node| node.client()).collect::>(); - let authors = collect_futures(clients.iter().map(|c| c.authors().create())).await?; - - let doc0 = clients[0].docs().create().await?; - let mut ticket = doc0 - .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) - .await?; - // do not join for now, just import without any peer info - let peer0 = ticket.nodes[0].clone(); - ticket.nodes = vec![]; - - let mut docs = vec![]; - docs.push(doc0); - docs.extend_from_slice( - &collect_futures( - clients - .iter() - .skip(1) - .map(|c| c.docs().import(ticket.clone())), - ) - .await?, - ); - - let mut expected = vec![]; - - // create initial data on each node - publish(&docs, &mut expected, n_entries_init, |i, j| { - ( - authors[i], - format!("init/{}/{j}", node_ids[i].fmt_short()), - format!("init:{i}:{j}"), - ) - }) - .await?; - - // assert initial data - for (i, doc) in docs.iter().enumerate() { - let entries = get_all_with_content(doc).await?; - let mut expected = expected - .iter() - .filter(|e| e.author == authors[i]) - .cloned() - .collect::>(); - expected.sort(); - assert_eq!(entries, expected, "phase1 pre-sync correct"); - } - - // setup event streams - let events = collect_futures(docs.iter().map(|d| d.subscribe())).await?; - - // join nodes together - for (i, doc) in docs.iter().enumerate().skip(1) { - info!(me = %node_ids[i].fmt_short(), peer = %peer0.node_id.fmt_short(), "join"); - doc.start_sync(vec![peer0.clone()]).await?; - } - - // wait for InsertRemote events stuff to happen - info!("wait for all peers to receive insert events"); - let expected_inserts = (n_nodes - 1) * n_entries_init; - let mut tasks = tokio::task::JoinSet::default(); - for (i, events) in events.into_iter().enumerate() { - let doc = docs[i].clone(); - let me = doc.id().fmt_short(); - let expected = expected.clone(); - let fut = async move { - wait_for_events(events, expected_inserts, TIMEOUT, |e| { - matches!(e, LiveEvent::InsertRemote { .. }) - }) - .await?; - let entries = get_all(&doc).await?; - if entries != expected { - Err(anyhow!( - "node {i} failed (has {} entries but expected to have {})", - entries.len(), - expected.len() - )) - } else { - info!( - "received and checked all {} expected entries", - expected.len() - ); - Ok(()) - } - } - .instrument(error_span!("sync-test", %me)); - let fut = fut.map(move |r| r.with_context(move || format!("node {i} ({me})"))); - tasks.spawn(fut); - } - - while let Some(res) = tasks.join_next().await { - res??; - } - - assert_all_docs(&docs, &node_ids, &expected, "after initial sync").await; - - info!("shutdown"); - for node in nodes { - node.shutdown().await?; - } - - Ok(()) -} - -#[tokio::test] -#[cfg(feature = "test-utils")] -async fn test_list_docs_stream() -> Result<()> { - let node = Node::memory() - .node_discovery(iroh::node::DiscoveryConfig::None) - .relay_mode(iroh::net::RelayMode::Disabled) - .enable_docs() - .spawn() - .await?; - let count = 200; - - // create docs - for _i in 0..count { - let doc = node.docs().create().await?; - doc.close().await?; - } - - // create doc stream - let mut stream = node.docs().list().await?; - - // process each doc and call into the docs actor. - // this makes sure that we don't deadlock the docs actor. - let mut i = 0; - let fut = async { - while let Some((id, _)) = stream.try_next().await.unwrap() { - let _doc = node.docs().open(id).await.unwrap().unwrap(); - i += 1; - } - }; - - tokio::time::timeout(Duration::from_secs(2), fut) - .await - .expect("not to timeout"); - - assert_eq!(i, count); - - Ok(()) -} - -/// Get all entries of a document. -async fn get_all(doc: &Doc) -> anyhow::Result> { - let entries = doc.get_many(Query::all()).await?; - let entries = entries.collect::>().await; - entries.into_iter().collect() -} - -/// Get all entries of a document with the blob content. -async fn get_all_with_content(doc: &Doc) -> anyhow::Result> { - let entries = doc.get_many(Query::all()).await?; - let entries = entries.and_then(|entry| async { - let content = entry.content_bytes(doc).await; - content.map(|c| (entry, c)) - }); - let entries = entries.collect::>().await; - let entries = entries.into_iter().collect::>>()?; - Ok(entries) -} - -async fn publish( - docs: &[Doc], - expected: &mut Vec, - n: usize, - cb: impl Fn(usize, usize) -> (AuthorId, String, String), -) -> anyhow::Result<()> { - for (i, doc) in docs.iter().enumerate() { - for j in 0..n { - let (author, key, value) = cb(i, j); - doc.set_bytes(author, key.as_bytes().to_vec(), value.as_bytes().to_vec()) - .await?; - expected.push(ExpectedEntry { author, key, value }); - } - } - expected.sort(); - Ok(()) -} - -/// Collect an iterator into futures by joining them all and failing if any future failed. -async fn collect_futures( - futs: impl IntoIterator>>, -) -> anyhow::Result> { - futures_buffered::join_all(futs) - .await - .into_iter() - .collect::>>() -} - -/// Collect `count` events from the `events` stream, only collecting events for which `matcher` -/// returns true. -async fn wait_for_events( - mut events: impl Stream> + Send + Unpin + 'static, - count: usize, - timeout: Duration, - matcher: impl Fn(&LiveEvent) -> bool, -) -> anyhow::Result> { - let mut res = Vec::with_capacity(count); - let sleep = tokio::time::sleep(timeout); - tokio::pin!(sleep); - while res.len() < count { - tokio::select! { - () = &mut sleep => { - bail!("Failed to collect {count} elements in {timeout:?} (collected only {})", res.len()); - }, - event = events.try_next() => { - let event = event?; - match event { - None => bail!("stream ended after {} items, but expected {count}", res.len()), - Some(event) => if matcher(&event) { - res.push(event); - debug!("recv event {} of {count}", res.len()); - } - } - } - } - } - Ok(res) -} - -async fn assert_all_docs( - docs: &[Doc], - node_ids: &[PublicKey], - expected: &Vec, - label: &str, -) { - info!("validate all peers: {label}"); - for (i, doc) in docs.iter().enumerate() { - let entries = get_all(doc).await.unwrap_or_else(|err| { - panic!("failed to get entries for peer {:?}: {err:?}", node_ids[i]) - }); - assert_eq!( - &entries, - expected, - "{label}: peer {i} {:?} failed (have {} but expected {})", - node_ids[i], - entries.len(), - expected.len() - ); - } -} - -#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Clone)] -struct ExpectedEntry { - author: AuthorId, - key: String, - value: String, -} - -impl PartialEq for ExpectedEntry { - fn eq(&self, other: &Entry) -> bool { - self.key.as_bytes() == other.key() - && Hash::new(&self.value) == other.content_hash() - && self.author == other.author() - } -} -impl PartialEq<(Entry, Bytes)> for ExpectedEntry { - fn eq(&self, (entry, content): &(Entry, Bytes)) -> bool { - self.key.as_bytes() == entry.key() - && Hash::new(&self.value) == entry.content_hash() - && self.author == entry.author() - && self.value.as_bytes() == content.as_ref() - } -} -impl PartialEq for Entry { - fn eq(&self, other: &ExpectedEntry) -> bool { - other.eq(self) - } -} -impl PartialEq for (Entry, Bytes) { - fn eq(&self, other: &ExpectedEntry) -> bool { - other.eq(self) - } -} - -#[tokio::test] -async fn doc_delete() -> Result<()> { - let node = Node::memory() - .gc_policy(iroh::node::GcPolicy::Interval(Duration::from_millis(100))) - .enable_docs() - .spawn() - .await?; - let client = node.client(); - let doc = client.docs().create().await?; - let author = client.authors().create().await?; - let hash = doc - .set_bytes(author, b"foo".to_vec(), b"hi".to_vec()) - .await?; - assert_latest(&doc, b"foo", b"hi").await; - let deleted = doc.del(author, b"foo".to_vec()).await?; - assert_eq!(deleted, 1); - - let entry = doc.get_exact(author, b"foo".to_vec(), false).await?; - assert!(entry.is_none()); - - // wait for gc - // TODO: allow to manually trigger gc - tokio::time::sleep(Duration::from_millis(200)).await; - let bytes = client.blobs().read_to_bytes(hash).await; - assert!(bytes.is_err()); - node.shutdown().await?; - Ok(()) -} - -#[tokio::test] -async fn sync_drop_doc() -> Result<()> { - let mut rng = test_rng(b"sync_drop_doc"); - setup_logging(); - let node = spawn_node(0, &mut rng).await?; - let client = node.client(); - - let doc = client.docs().create().await?; - let author = client.authors().create().await?; - - let mut sub = doc.subscribe().await?; - doc.set_bytes(author, b"foo".to_vec(), b"bar".to_vec()) - .await?; - let ev = sub.next().await; - assert!(matches!(ev, Some(Ok(LiveEvent::InsertLocal { .. })))); - - client.docs().drop_doc(doc.id()).await?; - let res = doc.get_exact(author, b"foo".to_vec(), true).await; - assert!(res.is_err()); - let res = doc - .set_bytes(author, b"foo".to_vec(), b"bar".to_vec()) - .await; - assert!(res.is_err()); - let res = client.docs().open(doc.id()).await; - assert!(res.is_err()); - let ev = sub.next().await; - assert!(ev.is_none()); - - Ok(()) -} - -async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { - let content = get_latest(doc, key).await.unwrap(); - assert_eq!(content, value.to_vec()); -} - -async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { - let query = Query::single_latest_per_key().key_exact(key); - let entry = doc - .get_many(query) - .await? - .next() - .await - .ok_or_else(|| anyhow!("entry not found"))??; - let content = entry.content_bytes(doc).await?; - Ok(content.to_vec()) -} - -fn setup_logging() { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); -} - -async fn next(mut stream: impl Stream> + Unpin) -> T { - let event = stream - .next() - .await - .expect("stream ended") - .expect("stream produced error"); - debug!("Event: {event:?}"); - event -} - -#[allow(clippy::type_complexity)] -fn apply_matchers(item: &T, matchers: &mut Vec bool + Send>>) -> bool { - for i in 0..matchers.len() { - if matchers[i](item) { - let _ = matchers.remove(i); - return true; - } - } - false -} - -/// Receive the next `matchers.len()` elements from a stream and matches them against the functions -/// in `matchers`, in order. -/// -/// Returns all received events. -#[allow(clippy::type_complexity)] -async fn assert_next( - mut stream: impl Stream> + Unpin + Send, - timeout: Duration, - matchers: Vec bool + Send>>, -) -> Vec { - let fut = async { - let mut items = vec![]; - for (i, f) in matchers.iter().enumerate() { - let item = stream - .next() - .await - .expect("event stream ended prematurely") - .expect("event stream errored"); - if !(f)(&item) { - panic!("assertion failed for event {i} {item:?}"); - } - items.push(item); - } - items - }; - let res = tokio::time::timeout(timeout, fut).await; - res.expect("timeout reached") -} - -/// Receive `matchers.len()` elements from a stream and assert that each element matches one of the -/// functions in `matchers`. -/// -/// Order of the matchers is not relevant. -/// -/// Returns all received events. -#[allow(clippy::type_complexity)] -async fn assert_next_unordered( - stream: impl Stream> + Unpin + Send, - timeout: Duration, - matchers: Vec bool + Send>>, -) -> Vec { - assert_next_unordered_with_optionals(stream, timeout, matchers, vec![]).await -} - -/// Receive between `min` and `max` elements from the stream and assert that each element matches -/// either one of the matchers in `required_matchers` or in `optional_matchers`. -/// -/// Order of the matchers is not relevant. -/// -/// Will return an error if: -/// * Any element fails to match one of the required or optional matchers -/// * More than `max` elements were received, but not all required matchers were used yet -/// * The timeout completes before all required matchers were used -/// -/// Returns all received events. -#[allow(clippy::type_complexity)] -async fn assert_next_unordered_with_optionals( - mut stream: impl Stream> + Unpin + Send, - timeout: Duration, - mut required_matchers: Vec bool + Send>>, - mut optional_matchers: Vec bool + Send>>, -) -> Vec { - let max = required_matchers.len() + optional_matchers.len(); - let required = required_matchers.len(); - // we have to use a mutex because rustc is not intelligent enough to realize - // that the mutable borrow terminates when the future completes - let events = Arc::new(parking_lot::Mutex::new(vec![])); - let fut = async { - while let Some(event) = stream.next().await { - let event = event.context("failed to read from stream")?; - let len = { - let mut events = events.lock(); - events.push(event.clone()); - events.len() - }; - if !apply_matchers(&event, &mut required_matchers) - && !apply_matchers(&event, &mut optional_matchers) - { - bail!("Event didn't match any matcher: {event:?}"); - } - if required_matchers.is_empty() || len == max { - break; - } - } - if !required_matchers.is_empty() { - bail!( - "Matched only {} of {required} required matchers", - required - required_matchers.len() - ); - } - Ok(()) - }; - tokio::pin!(fut); - let res = tokio::time::timeout(timeout, fut) - .await - .map_err(|_| anyhow!("Timeout reached ({timeout:?})")) - .and_then(|res| res); - let events = events.lock().clone(); - if let Err(err) = &res { - println!("Received events: {events:#?}"); - println!( - "Received {} events, expected between {required} and {max}", - events.len() - ); - panic!("Failed to receive or match all events: {err:?}"); - } - events -} - -/// Asserts that the event is a [`LiveEvent::SyncFinished`] and that the contained [`SyncEvent`] -/// has no error and matches `peer` and `namespace`. -fn match_sync_finished(event: &LiveEvent, peer: PublicKey) -> bool { - let LiveEvent::SyncFinished(e) = event else { - return false; - }; - e.peer == peer && e.result.is_ok() -}