diff --git a/Cargo.lock b/Cargo.lock
index f216320..0956510 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,20 +2,6 @@
 # It is not intended for manual editing.
 version = 3
 
-[[package]]
-name = "acto"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31c372578ce4215ccf94ec3f3585fbb6a902e47d07b064ff8a55d850ffb5025e"
-dependencies = [
- "parking_lot",
- "pin-project-lite",
- "rustc_version",
- "smol_str",
- "tokio",
- "tracing",
-]
-
 [[package]]
 name = "addr2line"
 version = "0.24.2"
@@ -419,9 +405,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
-version = "1.9.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
+checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
 dependencies = [
  "serde",
 ]
@@ -437,9 +423,9 @@ dependencies = [
 
 [[package]]
 name = "cargo-platform"
-version = "0.1.9"
+version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e35af189006b9c0f00a064685c727031e3ed2d8020f7ba284d78cc2671bd36ea"
+checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc"
 dependencies = [
  "serde",
 ]
@@ -459,9 +445,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.2"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
+checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47"
 dependencies = [
  "shlex",
 ]
@@ -941,6 +927,18 @@ version = "1.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653"
 
+[[package]]
+name = "duct"
+version = "0.13.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4ab5718d1224b63252cd0c6f74f6480f9ffeb117438a2e0f5cf6d9a4798929c"
+dependencies = [
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "shared_child",
+]
+
 [[package]]
 name = "dyn-clone"
 version = "1.0.17"
@@ -1104,12 +1102,12 @@ checksum = "a02a5d186d7bf1cb21f1f95e1a9cfa5c1f2dcd803a47aad454423ceec13525c5"
 
 [[package]]
 name = "errno"
-version = "0.3.10"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
+checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
 dependencies = [
  "libc",
- "windows-sys 0.59.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -1537,30 +1535,6 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
-[[package]]
-name = "hickory-proto"
-version = "0.24.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512"
-dependencies = [
- "async-trait",
- "cfg-if",
- "data-encoding",
- "enum-as-inner",
- "futures-channel",
- "futures-io",
- "futures-util",
- "idna 0.4.0",
- "ipnet",
- "once_cell",
- "rand",
- "thiserror 1.0.69",
- "tinyvec",
- "tokio",
- "tracing",
- "url",
-]
-
 [[package]]
 name = "hickory-proto"
 version = "0.25.0-alpha.2"
@@ -1595,7 +1569,7 @@ checksum = "46c110355b5703070d9e29c344d79818a7cde3de9c27fc35750defea6074b0ad"
 dependencies = [
  "cfg-if",
  "futures-util",
- "hickory-proto 0.25.0-alpha.2",
+ "hickory-proto",
"ipconfig", "lru-cache", "once_cell", @@ -1917,16 +1891,6 @@ dependencies = [ "syn 2.0.89", ] -[[package]] -name = "idna" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.5.0" @@ -2051,7 +2015,7 @@ checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "iroh" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "axum", @@ -2068,7 +2032,7 @@ dependencies = [ "genawaiter", "governor", "hex", - "hickory-proto 0.25.0-alpha.2", + "hickory-proto", "hickory-resolver", "hostname 0.4.0", "http 1.1.0", @@ -2111,7 +2075,6 @@ dependencies = [ "strum", "stun-rs", "surge-ping", - "swarm-discovery", "thiserror 2.0.3", "time", "tokio", @@ -2133,7 +2096,7 @@ dependencies = [ [[package]] name = "iroh-base" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "aead", "anyhow", @@ -2173,15 +2136,13 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#d87f67115b3b818a96ace2732aac835dec1ada65" +source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#e8783f9065c2ca0ead0223c7397e47a804f6d137" dependencies = [ "anyhow", "async-channel", "bao-tree", "bytes", "chrono", - "clap", - "console", "derive_more", "futures-buffered", "futures-lite 2.5.0", @@ -2189,7 +2150,6 @@ dependencies = [ "genawaiter", "hashlink", "hex", - "indicatif", "iroh", "iroh-base", "iroh-io", @@ -2283,7 +2243,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#89e91a34bd046fb7fbd504b2b8d0849e2865d410" +source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#50caf1b6976859efedbc6fd24b65afc7df60c13d" dependencies = [ "anyhow", "async-channel", @@ -2328,7 +2288,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "erased_set", @@ -2348,7 +2308,7 @@ dependencies = [ [[package]] name = "iroh-net-report" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "bytes", @@ -2424,20 +2384,21 @@ dependencies = [ [[package]] name = "iroh-relay" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "base64", "bytes", "clap", "derive_more", + "duct", "futures-buffered", "futures-lite 2.5.0", "futures-sink", "futures-util", "governor", "hex", - "hickory-proto 
0.25.0-alpha.2", + "hickory-proto", "hickory-resolver", "hostname 0.4.0", "http 1.1.0", @@ -2852,7 +2813,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "atomic-waker", @@ -3098,6 +3059,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "os_pipe" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "overload" version = "0.1.1" @@ -3388,7 +3359,7 @@ checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" [[package]] name = "portmapper" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#43d0ea45b950a69aa3a3340b60275f53aa18b254" +source = "git+https://github.com/n0-computer/iroh?branch=main#f7764ef130a0f2fd4938339178372a7176bd5def" dependencies = [ "anyhow", "base64", @@ -4431,6 +4402,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared_child" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09fa9338aed9a1df411814a5b2252f7cd206c55ae9bf2fa763f8de84603aa60c" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "shell-words" version = "1.1.0" @@ -4498,12 +4479,6 @@ dependencies = [ "serde", ] -[[package]] -name = "smol_str" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad6c857cbab2627dcf01ec85a623ca4e7dcb5691cbaa3d7fb7653671f0d09c9" - [[package]] name = "socket2" version = "0.5.8" @@ -4716,21 +4691,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "swarm-discovery" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39769914108ae68e261d85ceac7bce7095947130f79c29d4535e9b31fc702a40" -dependencies = [ - "acto", - "anyhow", - "hickory-proto 0.24.1", - "rand", - "socket2", - "tokio", - "tracing", -] - [[package]] name = "syn" version = "1.0.109" @@ -4852,9 +4812,9 @@ dependencies = [ [[package]] name = "testdir" -version = "0.9.1" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee79e927b64d193f5abb60d20a0eb56be0ee5a242fdeb8ce3bf054177006de52" +checksum = "a45fc921e7c4ad1aedb3484811514f3e5cd187886e0bbf1302c175f7578ef552" dependencies = [ "anyhow", "backtrace", diff --git a/Cargo.toml b/Cargo.toml index 58cdc1c..f6514bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ parking_lot = "0.12.3" testresult = "0.4.1" nested_enum_utils = "0.1.0" iroh-io = "0.6.1" -testdir = "0.9.1" +testdir = "0.7" data-encoding = "2.6.0" [features] diff --git a/tests/gc.rs b/tests/gc.rs index 09de6c3..aca7716 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -8,13 +8,14 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; +use futures_lite::StreamExt; use iroh_blobs::{ - hashseq::HashSeq, - store::{bao_tree, EntryStatus, MapMut, Store}, - util::Tag, - BlobFormat, HashAndFormat, IROH_BLOCK_SIZE, + store::{bao_tree, Map}, + IROH_BLOCK_SIZE, }; +use iroh_io::AsyncSliceReaderExt; use rand::RngCore; +use 
testdir::testdir; use util::Node; mod util; @@ -39,25 +40,6 @@ pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { (hash, Cursor::new(encoded.into())) } -/// Wrap a bao store in a node that has gc enabled. -async fn mem_node( - gc_period: Duration, -) -> ( - Node, - async_channel::Receiver<()>, -) { - let (gc_send, gc_recv) = async_channel::unbounded(); - let node = Node::memory() - .gc_interval(Some(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send_blocking(()).ok(); - })) - .spawn() - .await - .unwrap(); - (node, gc_recv) -} - /// Wrap a bao store in a node that has gc enabled. async fn persistent_node( path: PathBuf, @@ -78,444 +60,41 @@ async fn persistent_node( (node, gc_recv) } -async fn gc_test_node() -> ( - Node, - iroh_blobs::store::mem::Store, - async_channel::Receiver<()>, -) { - let (node, gc_recv) = mem_node(Duration::from_millis(500)).await; - let store = node.blob_store().clone(); - (node, store, gc_recv) -} - -async fn step(evs: &async_channel::Receiver<()>) { - // drain the event queue, we want a new GC - while evs.try_recv().is_ok() {} - // wait for several GC cycles - for _ in 0..3 { - evs.recv().await.unwrap(); - } -} - -/// Test the absolute basics of gc, temp tags and tags for blobs. -#[tokio::test] -async fn gc_basics() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let (node, bao_store, evs) = gc_test_node().await; - let data1 = create_test_data(1234); - let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?; - let data2 = create_test_data(5678); - let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?; - let h1 = *tt1.hash(); - let h2 = *tt2.hash(); - // temp tags are still there, so the entries should be there - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // drop the first tag, the entry should be gone after some time - drop(tt1); - step(&evs).await; - assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there. - let tag = Tag::from("test"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(h2))) - .await?; - drop(tt2); - tracing::info!("dropped tt2"); - step(&evs).await; - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); - - // delete the explicit tag, entry should be gone - bao_store.set_tag(tag, None).await?; - step(&evs).await; - assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); - - node.shutdown().await?; - Ok(()) -} - -/// Test gc for sequences of hashes that protect their children from deletion. 
 #[tokio::test]
-async fn gc_hashseq_impl() -> Result<()> {
+async fn redb_doc_import_stress() -> Result<()> {
     let _ = tracing_subscriber::fmt::try_init();
-    let (node, bao_store, evs) = gc_test_node().await;
-    let data1 = create_test_data(1234);
-    let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?;
-    let data2 = create_test_data(5678);
-    let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?;
-    let seq = vec![*tt1.hash(), *tt2.hash()]
-        .into_iter()
-        .collect::<HashSeq>();
-    let ttr = bao_store
-        .import_bytes(seq.into_inner(), BlobFormat::HashSeq)
-        .await?;
-    let h1 = *tt1.hash();
-    let h2 = *tt2.hash();
-    let hr = *ttr.hash();
-    drop(tt1);
-    drop(tt2);
-
-    // there is a temp tag for the link seq, so it and its entries should be there
-    step(&evs).await;
-    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete);
-    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
-    assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete);
-
-    // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there.
-    let tag = Tag::from("test");
-    bao_store
-        .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr)))
-        .await?;
-    drop(ttr);
-    step(&evs).await;
-    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete);
-    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
-    assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete);
-
-    // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries.
-    bao_store
-        .set_tag(tag.clone(), Some(HashAndFormat::raw(hr)))
-        .await?;
-    step(&evs).await;
-    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound);
-    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
-    assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete);
-
-    // delete the permanent tag, everything should be gone
-    bao_store.set_tag(tag, None).await?;
-    step(&evs).await;
-    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound);
-    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
-    assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::NotFound);
-
-    node.shutdown().await?;
-    Ok(())
-}
-
-mod file {
-    use std::{io, path::PathBuf};
-
-    use bao_tree::{
-        io::fsm::{BaoContentItem, ResponseDecoderNext},
-        BaoTree,
-    };
-    use futures_lite::StreamExt;
-    use iroh_blobs::{
-        store::{BaoBatchWriter, ConsistencyCheckProgress, Map, MapEntryMut, ReportLevel},
-        util::progress::{AsyncChannelProgressSender, ProgressSender as _},
-        TempTag,
-    };
-    use iroh_io::AsyncSliceReaderExt;
-    use testdir::testdir;
-    use tokio::io::AsyncReadExt;
-
-    use super::*;
-
-    fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_blobs::Hash) -> PathBuf {
-        move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix))
-    }
-
-    fn data_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf {
-        // this assumes knowledge of the internal directory structure of the flat store
-        path(root.join("data"), "data")
-    }
-
-    fn outboard_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf {
-        // this assumes knowledge of the internal directory structure of the flat store
-        path(root.join("data"), "obao4")
-    }
-
-    async fn check_consistency(store: &impl Store) -> anyhow::Result<ReportLevel> {
-        let mut max_level = ReportLevel::Trace;
-        let (tx, rx) = async_channel::bounded(1);
-        let task = tokio::task::spawn(async move {
-            while let Ok(ev) = rx.recv().await {
-                if let ConsistencyCheckProgress::Update { level, .. } = &ev {
-                    max_level = max_level.max(*level);
-                }
-            }
-        });
-        store
-            .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed())
-            .await?;
-        task.await?;
-        Ok(max_level)
-    }
-
-    #[tokio::test]
-    async fn redb_doc_import_stress() -> Result<()> {
-        let _ = tracing_subscriber::fmt::try_init();
-        let dir = testdir!();
-        let (node, _) = persistent_node(dir.join("store"), Duration::from_secs(10)).await;
-        let bao_store = node.blob_store().clone();
-        let client = node.client();
-        let doc = client.docs().create().await?;
-        let author = client.authors().create().await?;
-        let temp_path = dir.join("temp");
-        tokio::fs::create_dir_all(&temp_path).await?;
-        let mut to_import = Vec::new();
-        for i in 0..100 {
-            let data = create_test_data(16 * 1024 * 3 + 1);
-            let path = temp_path.join(format!("file{}", i));
-            tokio::fs::write(&path, &data).await?;
-            let key = Bytes::from(format!("{}", path.display()));
-            to_import.push((key, path, data));
-        }
-        for (key, path, _) in to_import.iter() {
-            let mut progress = doc.import_file(author, key.clone(), path, true).await?;
-            while let Some(msg) = progress.next().await {
-                tracing::info!("import progress {:?}", msg);
-            }
-        }
-        for (i, (key, _, expected)) in to_import.iter().enumerate() {
-            let Some(entry) = doc.get_exact(author, key.clone(), true).await? else {
-                anyhow::bail!("doc entry not found {}", i);
-            };
-            let hash = entry.content_hash();
-            let Some(content) = bao_store.get(&hash).await? else {
-                anyhow::bail!("content not found {} {}", i, &hash.to_hex()[..8]);
-            };
-            let data = content.data_reader().read_to_end().await?;
-            assert_eq!(data, expected);
-        }
-        Ok(())
-    }
-
-    /// Test gc for sequences of hashes that protect their children from deletion.
-    #[tokio::test]
-    async fn gc_file_basics() -> Result<()> {
-        let _ = tracing_subscriber::fmt::try_init();
-        let dir = testdir!();
-        let path = data_path(dir.clone());
-        let outboard_path = outboard_path(dir.clone());
-        let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(100)).await;
-        let bao_store = node.blob_store().clone();
-        let data1 = create_test_data(10000000);
-        let tt1 = bao_store
-            .import_bytes(data1.clone(), BlobFormat::Raw)
-            .await?;
-        let data2 = create_test_data(1000000);
-        let tt2 = bao_store
-            .import_bytes(data2.clone(), BlobFormat::Raw)
-            .await?;
-        let seq = vec![*tt1.hash(), *tt2.hash()]
-            .into_iter()
-            .collect::<HashSeq>();
-        let ttr = bao_store
-            .import_bytes(seq.into_inner(), BlobFormat::HashSeq)
-            .await?;
-
-        let h1 = *tt1.hash();
-        let h2 = *tt2.hash();
-        let hr = *ttr.hash();
-
-        // data is protected by the temp tag
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        // h1 is for a giant file, so we will have both data and outboard files
-        assert!(path(&h1).exists());
-        assert!(outboard_path(&h1).exists());
-        // h2 is for a mid sized file, so we will have just the data file
-        assert!(path(&h2).exists());
-        assert!(!outboard_path(&h2).exists());
-        // hr so small that data will be inlined and outboard will not exist at all
-        assert!(!path(&hr).exists());
-        assert!(!outboard_path(&hr).exists());
-
-        drop(tt1);
-        drop(tt2);
-        let tag = Tag::from("test");
-        bao_store
-            .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash())))
-            .await?;
-        drop(ttr);
-
-        // data is now protected by a normal tag, nothing should be gone
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        // h1 is for a giant file, so we will have both data and outboard files
-        assert!(path(&h1).exists());
-        assert!(outboard_path(&h1).exists());
-        // h2 is for a mid sized file, so we will have just the data file
-        assert!(path(&h2).exists());
-        assert!(!outboard_path(&h2).exists());
-        // hr so small that data will be inlined and outboard will not exist at all
-        assert!(!path(&hr).exists());
-        assert!(!outboard_path(&hr).exists());
-
-        tracing::info!("changing tag from hashseq to raw, this should orphan the children");
-        bao_store
-            .set_tag(tag.clone(), Some(HashAndFormat::raw(hr)))
-            .await?;
-
-        // now only hr itself should be protected, but not its children
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        // h1 should be gone
-        assert!(!path(&h1).exists());
-        assert!(!outboard_path(&h1).exists());
-        // h2 should still not be there
-        assert!(!path(&h2).exists());
-        assert!(!outboard_path(&h2).exists());
-        // hr should still not be there
-        assert!(!path(&hr).exists());
-        assert!(!outboard_path(&hr).exists());
-
-        bao_store.set_tag(tag, None).await?;
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        // h1 should be gone
-        assert!(!path(&h1).exists());
-        assert!(!outboard_path(&h1).exists());
-        // h2 should still not be there
-        assert!(!path(&h2).exists());
-        assert!(!outboard_path(&h2).exists());
-        // hr should still not be there
-        assert!(!path(&hr).exists());
-        assert!(!outboard_path(&hr).exists());
-
-        node.shutdown().await?;
-
-        Ok(())
+    let dir = testdir!();
+    let (node, _) = persistent_node(dir.join("store"), Duration::from_secs(10)).await;
+    let bao_store = node.blob_store().clone();
+    let client = node.client();
+    let doc = client.docs().create().await?;
+    let author = client.authors().create().await?;
+    let temp_path = dir.join("temp");
+    tokio::fs::create_dir_all(&temp_path).await?;
+    let mut to_import = Vec::new();
+    for i in 0..100 {
+        let data = create_test_data(16 * 1024 * 3 + 1);
+        let path = temp_path.join(format!("file{}", i));
+        tokio::fs::write(&path, &data).await?;
+        let key = Bytes::from(format!("{}", path.display()));
+        to_import.push((key, path, data));
     }
-
-    /// Add a file to the store in the same way a download works.
-    ///
-    /// we know the hash in advance, create a partial entry, write the data to it and
-    /// the outboard file, then commit it to a complete entry.
-    ///
-    /// During this time, the partial entry is protected by a temp tag.
-    async fn simulate_download_partial<S: Store>(
-        bao_store: &S,
-        data: Bytes,
-    ) -> io::Result<(S::EntryMut, TempTag)> {
-        // simulate the remote side.
-        let (hash, mut response) = simulate_remote(data.as_ref());
-        // simulate the local side.
-        // we got a hash and a response from the remote side.
-        let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into()));
-        // get the size
-        let size = response.read_u64_le().await?;
-        // start reading the response
-        let mut reading = bao_tree::io::fsm::ResponseDecoder::new(
-            hash,
-            ChunkRanges::all(),
-            BaoTree::new(size, IROH_BLOCK_SIZE),
-            response,
-        );
-        // create the partial entry
-        let entry = bao_store.get_or_create(hash.into(), size).await?;
-        // create the
-        let mut bw = entry.batch_writer().await?;
-        let mut buf = Vec::new();
-        while let ResponseDecoderNext::More((next, res)) = reading.next().await {
-            let item = res?;
-            match &item {
-                BaoContentItem::Parent(_) => {
-                    buf.push(item);
-                }
-                BaoContentItem::Leaf(_) => {
-                    buf.push(item);
-                    let batch = std::mem::take(&mut buf);
-                    bw.write_batch(size, batch).await?;
-                }
-            }
-            reading = next;
+    for (key, path, _) in to_import.iter() {
+        let mut progress = doc.import_file(author, key.clone(), path, true).await?;
+        while let Some(msg) = progress.next().await {
+            tracing::info!("import progress {:?}", msg);
         }
-        bw.sync().await?;
-        drop(bw);
-        Ok((entry, tt))
     }
-
-    async fn simulate_download_complete<S: Store>(
-        bao_store: &S,
-        data: Bytes,
-    ) -> io::Result<TempTag> {
-        let (entry, tt) = simulate_download_partial(bao_store, data).await?;
-        // commit the entry
-        bao_store.insert_complete(entry).await?;
-        Ok(tt)
-    }
-
-    /// Test that partial files are deleted.
-    #[tokio::test]
-    async fn gc_file_partial() -> Result<()> {
-        let _ = tracing_subscriber::fmt::try_init();
-        let dir = testdir!();
-        let path = data_path(dir.clone());
-        let outboard_path = outboard_path(dir.clone());
-
-        let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(10)).await;
-        let bao_store = node.blob_store().clone();
-
-        let data1: Bytes = create_test_data(10000000);
-        let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?;
-        drop(_entry);
-        let h1 = *tt1.hash();
-        // partial data and outboard files should be there
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        assert!(path(&h1).exists());
-        assert!(outboard_path(&h1).exists());
-
-        drop(tt1);
-        // partial data and outboard files should be gone
-        step(&evs).await;
-        bao_store.sync().await?;
-        assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
-        assert!(!path(&h1).exists());
-        assert!(!outboard_path(&h1).exists());
-
-        node.shutdown().await?;
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn gc_file_stress() -> Result<()> {
-        let _ = tracing_subscriber::fmt::try_init();
-        let dir = testdir!();
-
-        let (node, evs) = persistent_node(dir.clone(), Duration::from_secs(1)).await;
-        let bao_store = node.blob_store().clone();
-
-        let mut deleted = Vec::new();
-        let mut live = Vec::new();
-        // download
-        for i in 0..100 {
-            let data: Bytes = create_test_data(16 * 1024 * 3 + 1);
-            let tt = simulate_download_complete(&bao_store, data).await.unwrap();
-            if i % 100 == 0 {
-                let tag = Tag::from(format!("test{}", i));
-                bao_store
-                    .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash())))
-                    .await?;
-                live.push(*tt.hash());
-            } else {
-                deleted.push(*tt.hash());
-            }
-        }
-        step(&evs).await;
-
-        for h in deleted.iter() {
-            assert_eq!(bao_store.entry_status(h).await?, EntryStatus::NotFound);
-            assert!(!dir.join(format!("data/{}.data", h.to_hex())).exists());
-        }
-
-        for h in live.iter() {
-            assert_eq!(bao_store.entry_status(h).await?, EntryStatus::Complete);
-            assert!(dir.join(format!("data/{}.data", h.to_hex())).exists());
-        }
-
-        node.shutdown().await?;
-        Ok(())
+    for (i, (key, _, expected)) in to_import.iter().enumerate() {
+        let Some(entry) = doc.get_exact(author, key.clone(), true).await? else {
+            anyhow::bail!("doc entry not found {}", i);
+        };
+        let hash = entry.content_hash();
+        let Some(content) = bao_store.get(&hash).await? else {
+            anyhow::bail!("content not found {} {}", i, &hash.to_hex()[..8]);
+        };
+        let data = content.data_reader().read_to_end().await?;
+        assert_eq!(data, expected);
     }
+    Ok(())
 }
diff --git a/tests/util.rs b/tests/util.rs
index 9dc47ba..228146d 100644
--- a/tests/util.rs
+++ b/tests/util.rs
@@ -185,7 +185,7 @@ impl Builder {
         };
         router = router.accept(iroh_blobs::ALPN, blobs.clone());
         router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone()));
-        router = router.accept(iroh_gossip::net::GOSSIP_ALPN, Arc::new(gossip.clone()));
+        router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone()));
 
         // Build the router
         let router = router.spawn().await?;