Skip to content

Commit

Permalink
minimal test working
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Oct 31, 2024
1 parent a761c37 commit 70c565f
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 144 deletions.
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" }
iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" }
356 changes: 214 additions & 142 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,24 +501,6 @@ impl Entry {
pub fn timestamp(&self) -> u64 {
self.0.timestamp()
}

// TODO: depends on an rpc client in iroh-blobs
// /// Reads the content of an [`Entry`] as a streaming [`blobs::Reader`].
// ///
// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`.
// pub async fn content_reader(&self, client: impl Into<&Client>) -> Result<blobs::Reader> {
// blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await
// }

// /// Reads all content of an [`Entry`] into a buffer.
// ///
// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`.
// pub async fn content_bytes(&self, client: impl Into<&Client>) -> Result<Bytes> {
// blobs::Reader::from_rpc_read(client.into(), self.content_hash())
// .await?
// .read_to_bytes()
// .await
// }
}

/// Progress messages for an doc import operation
Expand Down Expand Up @@ -792,127 +774,217 @@ where
})
}

// TODO
// #[cfg(test)]
// mod tests {
// use rand::RngCore;
// use tokio::io::AsyncWriteExt;

// use super::*;

// #[tokio::test]
// async fn test_drop_doc_client_sync() -> Result<()> {
// let _guard = iroh_test::logging::setup();

// let node = crate::node::Node::memory().enable_docs().spawn().await?;

// let client = node.client();
// let doc = client.docs().create().await?;

// let res = std::thread::spawn(move || {
// drop(doc);
// drop(node);
// });

// tokio::task::spawn_blocking(move || res.join().map_err(|e| anyhow::anyhow!("{:?}", e)))
// .await??;

// Ok(())
// }

// /// Test that closing a doc does not close other instances.
// #[tokio::test]
// async fn test_doc_close() -> Result<()> {
// let _guard = iroh_test::logging::setup();

// let node = crate::node::Node::memory().enable_docs().spawn().await?;
// let author = node.authors().default().await?;
// // open doc two times
// let doc1 = node.docs().create().await?;
// let doc2 = node.docs().open(doc1.id()).await?.expect("doc to exist");
// // close doc1 instance
// doc1.close().await?;
// // operations on doc1 now fail.
// assert!(doc1.set_bytes(author, "foo", "bar").await.is_err());
// // dropping doc1 will close the doc if not already closed
// // wait a bit because the close-on-drop spawns a task for which we cannot track completion.
// drop(doc1);
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// // operations on doc2 still succeed
// doc2.set_bytes(author, "foo", "bar").await?;
// Ok(())
// }

// #[tokio::test]
// async fn test_doc_import_export() -> Result<()> {
// let _guard = iroh_test::logging::setup();

// let node = crate::node::Node::memory().enable_docs().spawn().await?;

// // create temp file
// let temp_dir = tempfile::tempdir().context("tempdir")?;

// let in_root = temp_dir.path().join("in");
// tokio::fs::create_dir_all(in_root.clone())
// .await
// .context("create dir all")?;
// let out_root = temp_dir.path().join("out");

// let path = in_root.join("test");

// let size = 100;
// let mut buf = vec![0u8; size];
// rand::thread_rng().fill_bytes(&mut buf);
// let mut file = tokio::fs::File::create(path.clone())
// .await
// .context("create file")?;
// file.write_all(&buf.clone()).await.context("write_all")?;
// file.flush().await.context("flush")?;

// // create doc & author
// let client = node.client();
// let doc = client.docs().create().await.context("doc create")?;
// let author = client.authors().create().await.context("author create")?;

// // import file
// let import_outcome = doc
// .import_file(
// author,
// crate::util::fs::path_to_key(path.clone(), None, Some(in_root))?,
// path,
// true,
// )
// .await
// .context("import file")?
// .finish()
// .await
// .context("import finish")?;

// // export file
// let entry = doc
// .get_one(Query::author(author).key_exact(import_outcome.key))
// .await
// .context("get one")?
// .unwrap();
// let key = entry.key().to_vec();
// let export_outcome = doc
// .export_file(
// entry,
// crate::util::fs::key_to_path(key, None, Some(out_root))?,
// ExportMode::Copy,
// )
// .await
// .context("export file")?
// .finish()
// .await
// .context("export finish")?;

// let got_bytes = tokio::fs::read(export_outcome.path)
// .await
// .context("tokio read")?;
// assert_eq!(buf, got_bytes);

// Ok(())
// }
// }
#[cfg(test)]
mod tests {
use iroh_blobs::util::local_pool::LocalPool;
use iroh_gossip::net::GOSSIP_ALPN;
use rand::RngCore;
use tokio::io::AsyncWriteExt;
use tracing::warn;

use crate::{
engine::{DefaultAuthorStorage, Engine},
net::DOCS_ALPN,
};

use super::*;

async fn setup_router() -> Result<(Client, iroh_router::Router, tokio::task::JoinHandle<anyhow::Result<()>>)>
{
let endpoint = iroh_net::Endpoint::builder().bind().await?;
let local_pool = LocalPool::single();
let mut router = iroh_router::Router::builder(endpoint.clone());

// Setup gossip
let my_addr = endpoint.node_addr().await?;
let gossip = iroh_gossip::net::Gossip::from_endpoint(
endpoint.clone(),
Default::default(),
&my_addr.info,
);
router = router.accept(GOSSIP_ALPN.to_vec(), Arc::new(gossip.clone()));

// Setup blobs

let bao_store = iroh_blobs::store::mem::Store::new();
let downloader = iroh_blobs::downloader::Downloader::new(
bao_store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = iroh_blobs::net_protocol::Blobs::new_with_events(
bao_store.clone(),
local_pool.handle().clone(),
Default::default(),
downloader.clone(),
);
router = router.accept(iroh_blobs::protocol::ALPN.to_vec(), Arc::new(blobs));

// Setup docs

let replica_store = crate::store::Store::memory();
let engine = Engine::spawn(
endpoint.clone(),
gossip,
replica_store,
bao_store,
downloader,
DefaultAuthorStorage::Mem,
local_pool.handle().clone(),
)
.await?;

router = router.accept(DOCS_ALPN.to_vec(), Arc::new(engine.clone()));

// Build the router

let router = router.spawn().await?;

// Setup RPC
let (internal_rpc, controller) =
quic_rpc::transport::flume::service_connection::<RpcService>(32);
let controller = quic_rpc::transport::boxed::Connection::new(controller);
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc);

let rpc_server_task = tokio::task::spawn(async move {
loop {
let request = internal_rpc.accept().await;
match request {
Ok(accepting) => {
let engine = engine.clone();
tokio::task::spawn(async move {
let (msg, chan) = accepting.read_first().await.unwrap();
engine.handle_rpc_request(msg, chan).await.unwrap();
});
}
Err(err) => {
warn!("rpc error: {:?}", err);
}
}
}
});

let docs_client = Client::new(quic_rpc::RpcClient::new(controller.clone()));

Ok((docs_client, router, rpc_server_task))
}

#[tokio::test]
async fn test_drop_doc_client_sync() -> Result<()> {
let _guard = iroh_test::logging::setup();

let (docs_client, router, rpc_server_task) = setup_router().await?;

let doc = docs_client.create().await?;

let res = std::thread::spawn(move || {
drop(doc);
});

tokio::task::spawn_blocking(move || res.join().map_err(|e| anyhow::anyhow!("{:?}", e)))
.await??;

rpc_server_task.abort();
router.shutdown().await?;

Ok(())
}

// /// Test that closing a doc does not close other instances.
// #[tokio::test]
// async fn test_doc_close() -> Result<()> {
// let _guard = iroh_test::logging::setup();

// let node = iroh::node::Node::memory().enable_docs().spawn().await?;
// let author = node.authors().default().await?;
// // open doc two times
// let doc1 = node.docs().create().await?;
// let doc2 = node.docs().open(doc1.id()).await?.expect("doc to exist");
// // close doc1 instance
// doc1.close().await?;
// // operations on doc1 now fail.
// assert!(doc1.set_bytes(author, "foo", "bar").await.is_err());
// // dropping doc1 will close the doc if not already closed
// // wait a bit because the close-on-drop spawns a task for which we cannot track completion.
// drop(doc1);
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// // operations on doc2 still succeed
// doc2.set_bytes(author, "foo", "bar").await?;
// Ok(())
// }

// #[tokio::test]
// async fn test_doc_import_export() -> Result<()> {
// let _guard = iroh_test::logging::setup();

// let node = iroh::node::Node::memory().enable_docs().spawn().await?;

// // create temp file
// let temp_dir = tempfile::tempdir().context("tempdir")?;

// let in_root = temp_dir.path().join("in");
// tokio::fs::create_dir_all(in_root.clone())
// .await
// .context("create dir all")?;
// let out_root = temp_dir.path().join("out");

// let path = in_root.join("test");

// let size = 100;
// let mut buf = vec![0u8; size];
// rand::thread_rng().fill_bytes(&mut buf);
// let mut file = tokio::fs::File::create(path.clone())
// .await
// .context("create file")?;
// file.write_all(&buf.clone()).await.context("write_all")?;
// file.flush().await.context("flush")?;

// // create doc & author
// let client = node.rpc_client().clone();
// let docs_client = Client::from_service(client);
// let doc = docs_client.create().await.context("doc create")?;
// let author = client.authors().create().await.context("author create")?;

// // import file
// let import_outcome = doc
// .import_file(
// author,
// iroh::util::fs::path_to_key(path.clone(), None, Some(in_root))?,
// path,
// true,
// )
// .await
// .context("import file")?
// .finish()
// .await
// .context("import finish")?;

// // export file
// let entry = doc
// .get_one(Query::author(author).key_exact(import_outcome.key))
// .await
// .context("get one")?
// .unwrap();
// let key = entry.key().to_vec();
// let export_outcome = doc
// .export_file(
// entry,
// iroh::util::fs::key_to_path(key, None, Some(out_root))?,
// ExportMode::Copy,
// )
// .await
// .context("export file")?
// .finish()
// .await
// .context("export finish")?;

// let got_bytes = tokio::fs::read(export_outcome.path)
// .await
// .context("tokio read")?;
// assert_eq!(buf, got_bytes);

// Ok(())
// }
}

0 comments on commit 70c565f

Please sign in to comment.