From ca3d46182fe05f30760d963c1581982b26adaee8 Mon Sep 17 00:00:00 2001 From: John Turpish Date: Fri, 11 Aug 2023 10:24:27 -0400 Subject: [PATCH] Don't auto-dangle CIDs. --- .github/workflows/docker-test.yml | 4 +-- .gitignore | 3 +- Cargo.lock | 15 +++++----- Cargo.toml | 2 +- controller/Cargo.toml | 1 + controller/src/main.rs | 11 +++++-- local-storage/src/file_provider.rs | 48 ++++++++++++++++++++++-------- local-storage/src/sql_provider.rs | 13 ++++++++ myceli/src/main.rs | 2 +- myceli/tests/utils/mod.rs | 3 +- testing/local_test.sh | 8 ++--- transports/src/udp_chunking.rs | 2 +- transports/src/udp_transport.rs | 2 +- watcher/src/main.rs | 1 - 14 files changed, 81 insertions(+), 34 deletions(-) diff --git a/.github/workflows/docker-test.yml b/.github/workflows/docker-test.yml index 4d79fea..78207b5 100644 --- a/.github/workflows/docker-test.yml +++ b/.github/workflows/docker-test.yml @@ -1,5 +1,5 @@ -name: CI Unit Testing -on: [pull_request] +name: Docker Testing +on: [ pull_request ] jobs: test_docker_build: name: Test Docker build diff --git a/.gitignore b/.gitignore index 7abd971..9872f74 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ target/ build/ storage.db sat/ -gnd/ \ No newline at end of file +gnd/ +log diff --git a/Cargo.lock b/Cargo.lock index fdbceed..ee1fc6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,10 +486,11 @@ checksum = "21a53c0a4d288377e7415b53dcfc3c04da5cdc2cc95c8d5ac178b58f0b861ad6" [[package]] name = "controller" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "clap", + "config", "env_logger", "log", "messages", @@ -1072,7 +1073,7 @@ dependencies = [ [[package]] name = "hyphae" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "bytes", @@ -1204,7 +1205,7 @@ dependencies = [ [[package]] name = "ipfs-unixfs" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "async-recursion", @@ -1406,7 +1407,7 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "local-storage" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "assert_fs", @@ -1474,7 +1475,7 @@ dependencies = [ [[package]] name = "messages" -version = "0.6.5" +version = "0.6.6" dependencies = [ "cid", "clap", @@ -1601,7 +1602,7 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "myceli" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "assert_fs", @@ -2602,7 +2603,7 @@ dependencies = [ [[package]] name = "transports" -version = "0.6.5" +version = "0.6.6" dependencies = [ "cid", "derive-error", diff --git a/Cargo.toml b/Cargo.toml index 884abb1..5269320 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ members = [ ] [workspace.package] -version = "0.6.5" +version = "0.6.6" edition = "2021" license = "Apache-2.0/MIT" rust-version = "1.68.1" diff --git a/controller/Cargo.toml b/controller/Cargo.toml index fb8bad2..71d614e 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -9,6 +9,7 @@ rust-version.workspace = true [dependencies] anyhow.workspace = true clap.workspace = true +config.workspace = true env_logger.workspace = true log.workspace = true messages.workspace = true diff --git a/controller/src/main.rs b/controller/src/main.rs index 5fecc67..eb5b0e8 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -9,7 +9,7 @@ use transports::{Transport, UdpTransport, MAX_MTU}; #[clap(about = "Control a Myceli instance")] pub struct Cli { #[arg(help = "The network address that a myceli instance is listening on")] - instance_addr: String, + instance_addr: Option, #[arg( short, long, @@ -45,7 +45,14 @@ impl Cli { let cmd_str = serde_json::to_string(&command)?; info!("Transmitting: {}", &cmd_str); - transport.send(command, &self.instance_addr)?; + let instance_addr = if let Some(addr) = &self.instance_addr { + addr.clone() + } else { + let cfg = config::Config::parse(None) + .expect("Please specify instance addr, as I can't read myceli.toml"); + cfg.listen_address + }; + transport.send(command, &instance_addr)?; if self.listen_mode { match transport.receive() { Ok((msg, _)) => { diff --git a/local-storage/src/file_provider.rs b/local-storage/src/file_provider.rs index 2d25f8f..edd43a3 100644 --- a/local-storage/src/file_provider.rs +++ b/local-storage/src/file_provider.rs @@ -1,7 +1,7 @@ use crate::{block::StoredBlock, provider::StorageProvider}; use anyhow::Result; use cid::{multibase, Cid}; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use std::{ cmp::Ordering, fmt::Debug, @@ -27,11 +27,11 @@ impl FileStorageProvider { old_blocks: vec![], high: high_usage, }; - create_dir_all(&me.blocks())?; + create_dir_all(me.blocks())?; me.dir = canonicalize(storage_folder)?; debug!("FileStorageProvider({:?})", &me.dir); - create_dir_all(&me.cids())?; - create_dir_all(&me.names())?; + create_dir_all(me.cids())?; + create_dir_all(me.names())?; me.count_blocks(); Ok(me) } @@ -98,8 +98,10 @@ impl FileStorageProvider { Ok((to_skip, to_fetch)) } - fn entry_to_cid_str(&self, r: std::io::Result) -> Option { - let e = r.ok()?; + fn rentry_to_cid_str(&self, r: std::io::Result) -> Option { + self.entry_to_cid_str(r.ok()?) + } + fn entry_to_cid_str(&self, e: DirEntry) -> Option { if e.metadata().ok()?.is_file() { let cid_str = e.file_name().to_str()?.to_owned(); let cid = Cid::try_from(cid_str.as_str()).ok()?; @@ -114,7 +116,7 @@ impl FileStorageProvider { } } fn count_blocks(&mut self) { - if let Ok(rd) = fs::read_dir(&self.blocks()) { + if let Ok(rd) = fs::read_dir(self.blocks()) { self.old_blocks = rd .flat_map(|r| r.ok()) .flat_map(OnDiskBlock::from) @@ -123,6 +125,27 @@ impl FileStorageProvider { self.usage = self.old_blocks.iter().map(|b| b.size).sum(); } } + fn drop_cids_with_block_path(&self, block_path: &std::path::Path) -> Result<()> { + trace!("drop_cids_with_block_path({block_path:?})"); + for e in read_dir(self.cids())?.flat_map(|r| r.ok()) { + let cid_path = e.path(); + trace!("Checking CID path {cid_path:?}"); + if let Some(Some(cid_str)) = cid_path.file_name().map(|f| f.to_str()) { + if let Ok(cid) = Cid::try_from(cid_str) { + if self.block_path(&cid) == block_path { + match fs::remove_file(&cid_path) { + Ok(_) => { + info!("Removed {cid_path:?} because its block {block_path:?} is gone."); + fs::remove_file(self.names().join(cid_str)).ok();//It's totally normal to not exist + }, + Err(e) => error!("Error removing dangling CID {cid_path:?} (corresponding to {block_path:?}): {e}"), + } + } + } + } + } + Ok(()) + } } impl StorageProvider for FileStorageProvider { @@ -148,7 +171,7 @@ impl StorageProvider for FileStorageProvider { fn get_available_cids(&self) -> anyhow::Result> { let mut result: Vec = read_dir(self.cids())? - .filter_map(|f| self.entry_to_cid_str(f)) + .filter_map(|f| self.rentry_to_cid_str(f)) .collect(); result.sort(); Ok(result) @@ -182,7 +205,7 @@ impl StorageProvider for FileStorageProvider { .get_available_cids()? .into_iter() .map(|c| { - let n = self.get_name(c.as_str()).ok().unwrap_or_else(String::new); + let n = self.get_name(c.as_str()).ok().unwrap_or_default(); (c, n) }) .collect()) @@ -242,6 +265,9 @@ impl StorageProvider for FileStorageProvider { &odb.path, self.usage, self.high ); self.usage -= odb.size; + if let Err(e) = self.drop_cids_with_block_path(&odb.path) { + error!("Trouble dropping CIDs for block: {e}"); + } } Err(e) => { error!("Error removing old block {odb:?} to free up space! {e:?}"); @@ -276,9 +302,7 @@ impl OnDiskBlock { impl Ord for OnDiskBlock { fn cmp(&self, other: &Self) -> Ordering { if self.modt != other.modt { - let result = self.modt.cmp(&other.modt); - // println!("{:?}.cmp({:?})={:?}", &self.modt, &other.modt, &result); - result + self.modt.cmp(&other.modt) } else if self.size != other.size { other.size.cmp(&self.size) } else { diff --git a/local-storage/src/sql_provider.rs b/local-storage/src/sql_provider.rs index 011019d..f765bfe 100644 --- a/local-storage/src/sql_provider.rs +++ b/local-storage/src/sql_provider.rs @@ -2,6 +2,7 @@ use crate::{block::StoredBlock, error::StorageError, provider::StorageProvider}; use anyhow::{bail, Result}; use log::trace; use rusqlite::{params_from_iter, Connection}; +use std::{path::PathBuf, str::FromStr}; pub struct SqliteStorageProvider { conn: Box, @@ -9,6 +10,18 @@ pub struct SqliteStorageProvider { impl SqliteStorageProvider { pub fn new(db_path: &str) -> Result { + let mut db_path = PathBuf::from_str(db_path)?; + loop { + if db_path.is_dir() { + db_path = db_path.join("storage.db"); + } else if db_path.exists() + || db_path.extension().unwrap_or_default().to_str() == Some("db") + { + break; + } else { + db_path = db_path.join("storage.db"); + } + } let result = SqliteStorageProvider { conn: Box::new(Connection::open(db_path)?), }; diff --git a/myceli/src/main.rs b/myceli/src/main.rs index 2b67a74..66b78a9 100644 --- a/myceli/src/main.rs +++ b/myceli/src/main.rs @@ -28,7 +28,7 @@ fn main() -> Result<()> { std::fs::create_dir_all(&cfg.storage_path).expect("Failed to create storage dir"); - let db_path = format!("{}/storage.db", cfg.storage_path); + let db_path = cfg.storage_path.clone(); let disk_bytes = cfg.disk_usage * 1024; let mut udp_transport = diff --git a/myceli/tests/utils/mod.rs b/myceli/tests/utils/mod.rs index b94fa97..57ecbe9 100644 --- a/myceli/tests/utils/mod.rs +++ b/myceli/tests/utils/mod.rs @@ -111,7 +111,7 @@ impl TestController { pub fn new() -> Self { let mut transport = UdpTransport::new("127.0.0.1:0", 60, None).unwrap(); transport - .set_read_timeout(Some(Duration::from_secs(5))) + .set_read_timeout(Some(Duration::from_secs(9))) .unwrap(); transport.set_max_read_attempts(Some(1)); TestController { transport } @@ -119,6 +119,7 @@ impl TestController { pub fn send_and_recv(&mut self, target_addr: &str, message: Message) -> Message { self.send_msg(message, target_addr); + std::thread::sleep(Duration::from_millis(500)); self.recv_msg().unwrap() } diff --git a/testing/local_test.sh b/testing/local_test.sh index 60554a0..3c0e84f 100755 --- a/testing/local_test.sh +++ b/testing/local_test.sh @@ -99,7 +99,7 @@ else fi rm -rv sat || true rm -rv gnd || true -mkdir -p sat/storage.db +mkdir -p sat/ mkdir gnd cat > sat/config.toml < { - return Err(err.into()); + return Err(err); } } } diff --git a/watcher/src/main.rs b/watcher/src/main.rs index 70f5734..5f88f22 100644 --- a/watcher/src/main.rs +++ b/watcher/src/main.rs @@ -11,7 +11,6 @@ fn watched_dir(cfg: &config::Config) -> PathBuf { result.push( cfg.clone() .watched_directory - .clone() .expect("Must configure watched_directory before running watcher."), ); result