diff --git a/config/src/config.rs b/config/src/config.rs index 0a1e3d5..83b5ee1 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -30,6 +30,9 @@ pub struct Config { // A path to a directory which where files that appear should be auto-imported. // Absence implies no such directory exists pub watched_directory: Option, + + //How much storage space should Local Storage use? Measured in kiB. Default is 1 GiB + pub disk_usage: u64, } impl Default for Config { @@ -53,11 +56,12 @@ impl Default for Config { // Default to no set radio address radio_address: None, watched_directory: None, + disk_usage: 1024 * 1024, } } } fn default_storage_path() -> String { - dirs::cache_dir() + dirs::data_local_dir() .and_then(|d: PathBuf| { d.join("myceli") .into_os_string() diff --git a/hyphae/src/myceli_api.rs b/hyphae/src/myceli_api.rs index f213131..2dc3bc0 100644 --- a/hyphae/src/myceli_api.rs +++ b/hyphae/src/myceli_api.rs @@ -35,7 +35,8 @@ impl MyceliApi { } fn send_msg(&self, msg: Message) -> Result<()> { - self.transport.send(msg, &self.address) + self.transport.send(msg, &self.address)?; + Ok(()) } fn recv_msg(&self) -> Result { diff --git a/local-storage/src/file_provider.rs b/local-storage/src/file_provider.rs index 32084eb..2d25f8f 100644 --- a/local-storage/src/file_provider.rs +++ b/local-storage/src/file_provider.rs @@ -1,72 +1,38 @@ use crate::{block::StoredBlock, provider::StorageProvider}; -use anyhow::{anyhow, bail, Result}; -use cid::multihash::MultihashDigest; -use cid::{ - multibase, - multihash::{Code, Multihash}, - Cid, -}; -use ipfs_unixfs::codecs::Codec; -use ipfs_unixfs::unixfs::UnixfsNode; -use log::{debug, error, info, trace, warn}; -use std::cmp::Ordering; +use anyhow::Result; +use cid::{multibase, Cid}; +use log::{debug, error, info}; use std::{ - collections::{HashMap, HashSet}, - fmt::{Debug, Formatter}, + cmp::Ordering, + fmt::Debug, fs, - fs::{canonicalize, create_dir_all, read_dir, DirEntry, File, ReadDir}, + fs::{canonicalize, create_dir_all, read_dir, DirEntry, File}, io::{Read, Write}, - path::{Path, PathBuf}, + path::PathBuf, time::SystemTime, }; -//TODO - should be configurable -const MAX_DISK_USAGE: u64 = 1024 * 1024 * 64; - pub(crate) struct FileStorageProvider { dir: PathBuf, - janitor: JanitorStage, -} - -#[derive(Debug)] -enum JanitorStage { - Startup, - CheckBlocks(BlockCleanup), - CheckCids(CidCleanup), + usage: u64, + old_blocks: Vec, + high: u64, } -struct BlockCleanup { - listing: ReadDir, - total_size: u64, - existing: HashMap, -} -struct CidCleanup { - listing: ReadDir, - unref_block: HashMap, - refed_block: HashMap, - linked: HashSet, - existing: HashSet, - disk_usage: u64, -} -#[derive(PartialEq, Eq, Clone, Debug)] -struct OnDiskBlock { - mh_s: String, - path: PathBuf, - size: u64, - modt: SystemTime, -} - impl FileStorageProvider { #[allow(dead_code)] - pub fn new(storage_folder: &str) -> Result { + pub fn new(storage_folder: &str, high_usage: u64) -> Result { let mut me = Self { dir: storage_folder.into(), - janitor: JanitorStage::Startup, + usage: 0, + old_blocks: vec![], + high: high_usage, }; create_dir_all(&me.blocks())?; me.dir = canonicalize(storage_folder)?; debug!("FileStorageProvider({:?})", &me.dir); create_dir_all(&me.cids())?; create_dir_all(&me.names())?; + me.count_blocks(); Ok(me) } fn blocks(&self) -> PathBuf { @@ -147,43 +113,28 @@ impl FileStorageProvider { None } } - fn check_block_dirent(de: &DirEntry) -> Result { - let path = de.path(); - let md = de.metadata()?; - if !md.is_file() { - bail!("Ignore directories, symlinks, whatever: {path:?}"); - } - let mh_s = path - .file_name() - .ok_or(anyhow!("Can't get file's name"))? - .to_str() - .ok_or(anyhow!("File's name not stringable"))? - .to_string(); - let (_, mh_bytes) = multibase::decode(&mh_s)?; - let mh = Multihash::from_bytes(&mh_bytes)?; - let bytes = fs::read(&path)?; - let algo: Code = mh.code().try_into()?; - let as_stored = algo.digest(&bytes); - if mh == as_stored { - let size = md.len(); - let modt = md.modified()?; - Ok(OnDiskBlock { - path, - size, - modt, - mh_s, - }) - } else { - fs::remove_file(&path)?; - Err(anyhow!("Block file {:?} should have a multihash of {:?} but it actually hashes out to {:?}", &path, &mh, &as_stored)) + fn count_blocks(&mut self) { + if let Ok(rd) = fs::read_dir(&self.blocks()) { + self.old_blocks = rd + .flat_map(|r| r.ok()) + .flat_map(OnDiskBlock::from) + .collect(); + self.old_blocks.sort_by(|a, b| b.cmp(a)); + self.usage = self.old_blocks.iter().map(|b| b.size).sum(); } } } impl StorageProvider for FileStorageProvider { - fn import_block(&self, block: &StoredBlock) -> anyhow::Result<()> { + fn import_block(&mut self, block: &StoredBlock) -> anyhow::Result<()> { let cid = Cid::try_from(block.cid.as_str())?; let block_path = self.block_path(&cid); + + if !block_path.is_file() { + self.usage += u64::try_from(block.data.len()).ok().unwrap_or(0); + } else if let Some(idx) = self.old_blocks.iter().position(|o| o.path == block_path) { + self.old_blocks.remove(idx); + } File::create(&block_path)?.write_all(block.data.as_slice())?; let mut f = File::create(self.cids().join(&block.cid))?; for l in &block.links { @@ -281,207 +232,63 @@ impl StorageProvider for FileStorageProvider { } fn incremental_gc(&mut self) { - let block_dir = self.blocks(); - let cids_dir = self.cids(); - info!( - "incremental_gc({:?},{block_dir:?},{cids_dir:?})", - &self.janitor - ); - match &mut self.janitor { - JanitorStage::Startup => { - if let Ok(rd) = read_dir(self.blocks()) { - self.janitor = JanitorStage::CheckBlocks(BlockCleanup { - listing: rd, - total_size: 0, - existing: HashMap::new(), - }); + if self.usage < self.high { + debug!("No need to GC: usage={} < high={}", &self.usage, self.high); + } else if let Some(odb) = self.old_blocks.pop() { + match fs::remove_file(&odb.path) { + Ok(_) => { + info!( + "Removed {:?} as usage ({}) > max ({})", + &odb.path, self.usage, self.high + ); + self.usage -= odb.size; } - } - JanitorStage::CheckBlocks(bc) => match bc.listing.next() { - Some(Ok(de)) => match Self::check_block_dirent(&de) { - Ok(odb) => { - bc.total_size += odb.size; - bc.existing.insert(odb.mh_s.clone(), odb); - } - Err(e) => info!("Issue validating {:?}: {:?}", de, &e), - }, - Some(Err(e)) => error!("Error reading blocks dir: {:?}", &e), - None => { - if let Ok(rd) = read_dir(cids_dir) { - self.janitor = JanitorStage::CheckCids(CidCleanup { - listing: rd, - unref_block: std::mem::take(&mut bc.existing), - disk_usage: bc.total_size, - refed_block: HashMap::new(), - existing: HashSet::new(), - linked: HashSet::new(), - }); - } + Err(e) => { + error!("Error removing old block {odb:?} to free up space! {e:?}"); } - }, - JanitorStage::CheckCids(cc) => match cc.listing.next() { - Some(Ok(de)) => { - if let Err(e) = cc.check(&de, &block_dir) { - error!("CID validation problem {:?}", &e); - } - } - Some(Err(e)) => error!("Error reading cids dir: {:?}", &e), - None => { - while cc.disk_usage > MAX_DISK_USAGE { - let key_opt = { cc.unref_block.keys().cloned().next() }; - if let Some(key) = key_opt { - let odb = cc.unref_block.remove(&key).unwrap(); - match fs::remove_file(&odb.path) { - Ok(_) => { - cc.disk_usage -= odb.size; - info!( - "Removed unreferenced block file to free up space {:?}", - &odb.path - ); - } - Err(e) => { - error!( - "Error removing unreferenced block file {:?} {:?}", - &e, &odb.path - ); - } - } - } else { - let mut blocks: Vec<_> = cc.refed_block.values().cloned().collect(); - blocks.sort(); - for b in &blocks { - if fs::remove_file(&b.path).is_ok() { - warn!("Removing {:?} to free up space.", &b.path); - cc.disk_usage -= b.size; - cc.refed_block.remove(&b.mh_s); - if cc.disk_usage < MAX_DISK_USAGE { - break; - } - } - } - } - } - for l in &cc.linked { - println!("Missing linked-to CID {}", &l); - } - self.janitor = JanitorStage::Startup; - info!("Storage cleanup pass completed."); - } - }, + } + } else { + self.count_blocks(); + debug!("There are {} files in blocks/", &self.old_blocks.len()); } } } -impl CidCleanup { - fn check(&mut self, de: &DirEntry, block_dir: &Path) -> Result<()> { - let path = de.path(); - let md = de.metadata()?; - if !md.is_file() { - bail!("Ignore directories, symlinks, whatever: {path:?}"); - } - let cid_str = path - .file_name() - .ok_or(anyhow!("Bad CID filename"))? - .to_str() - .ok_or(anyhow!("Non-string CID filename"))? - .to_string(); - let cid = Cid::try_from(cid_str.as_str())?; - let mh = cid.hash(); - let mh_s = multibase::encode(multibase::Base::Base36Lower, mh.to_bytes()); - let modt = md.modified()?; - let bp = block_dir.join(&mh_s); - if let Some(mut un) = self.unref_block.remove(&mh_s) { - if modt > un.modt { - un.modt = modt; - } - self.refed_block.insert(mh_s.clone(), un); - } else if let Some(rf) = self.refed_block.get_mut(&mh_s) { - if modt > rf.modt { - rf.modt = modt; - } - trace!("Block referenced multiply: {} by {}", &mh_s, &cid_str); - } else if bp.is_file() { - debug!("New block: {mh_s}"); - } else { - fs::remove_file(path)?; - bail!( - "Orphaned CID {} (has no block {}). Removed. A block I do have: {:?}", - &cid_str, - &mh_s, - self.unref_block.iter().next(), - ); - } - let links: Vec<_> = fs::read_to_string(&path)? - .lines() - .map(String::from) - .collect(); - if Codec::DagPb == cid.codec().try_into()? { - let bytes = fs::read(&bp)?; - let node = UnixfsNode::decode(&cid, bytes.into())?; - let parsed_links: Vec<_> = node - .links() - .filter_map(|r| r.map(|l| l.cid.to_string()).ok()) - .collect(); - if !links.eq(&parsed_links) { - warn!("The recorded links do not match those parsed out of the node itself. Re-writing recorded CIDs. CID={} recorded={:?} parsed={:?}", &cid_str, &links, &parsed_links); - //The eq consumed it - let mut f = File::open(&path)?; - for l in &parsed_links { - writeln!(&mut f, "{}", &l)?; - } - } - for l in parsed_links { - if !self.existing.contains(&l) { - self.linked.insert(l); - } - } - } else if !links.is_empty() { - warn!( - "There are links recorded for a CID {} of codec {} : {:?}", - &cid_str, - cid.codec(), - links - ); +#[derive(Eq, PartialEq, Debug)] +struct OnDiskBlock { + modt: SystemTime, + size: u64, + path: PathBuf, +} +impl OnDiskBlock { + fn from(e: DirEntry) -> Option { + let m = e.metadata().ok()?; + if !m.is_file() { + return None; } - self.linked.remove(&cid_str); - self.existing.insert(cid_str); - Ok(()) + Some(OnDiskBlock { + modt: m.modified().ok()?, + size: m.len(), + path: e.path(), + }) } } - impl Ord for OnDiskBlock { fn cmp(&self, other: &Self) -> Ordering { - for o in &[ - self.modt.cmp(&other.modt), - other.size.cmp(&self.size), - self.mh_s.cmp(&other.mh_s), - ] { - if *o != Ordering::Equal { - return *o; - } + if self.modt != other.modt { + let result = self.modt.cmp(&other.modt); + // println!("{:?}.cmp({:?})={:?}", &self.modt, &other.modt, &result); + result + } else if self.size != other.size { + other.size.cmp(&self.size) + } else { + self.path.cmp(&other.path) } - Ordering::Equal } } impl PartialOrd for OnDiskBlock { fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(&other)) - } -} -impl Debug for BlockCleanup { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "listing={:?},total_size={},existing(count={})", - &self.listing, - self.total_size, - self.existing.len(), - ) - } -} -impl Debug for CidCleanup { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str("CidCleanup{...}") + Some(self.cmp(other)) } } @@ -500,7 +307,7 @@ pub mod tests { pub fn new() -> TestHarness { let td = TempDir::new().unwrap().canonicalize().unwrap(); let dir = td.as_os_str().to_str().unwrap(); - let provider = FileStorageProvider::new(dir).unwrap(); + let provider = FileStorageProvider::new(dir, 9).unwrap(); Self { provider } } pub fn import_hi(&mut self, deep: bool) -> StoredBlock { @@ -555,7 +362,7 @@ pub mod tests { } #[test] pub fn test_import_one_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); let cid_str = cid.to_string(); @@ -580,7 +387,7 @@ pub mod tests { use std::collections::HashSet; use std::iter::FromIterator; - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let seeds = vec![b"00", b"11", b"22"]; let cids: Vec = seeds @@ -609,7 +416,7 @@ pub mod tests { #[test] pub fn test_import_then_get_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); @@ -674,7 +481,7 @@ pub mod tests { #[test] pub fn test_sha2_512_roundtrip() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = "bafkrgqg5v422de3bpk5myqltjgxcaqjrcltputujvf7kecu653tewvottiqzfgjke5h4dkbwxi6chi765o6uktkeensdz2aofknmst5fjssj6".to_string(); let block = StoredBlock { cid: cid.clone(), @@ -688,4 +495,27 @@ pub mod tests { let expected = vec![block]; assert_eq!(actual, expected); } + + #[test] + pub fn test_oldestfilesortsfirst() { + let a = assert_fs::NamedTempFile::new("yo").unwrap(); + let m = File::create(a.path()).unwrap().metadata().unwrap(); + let x = OnDiskBlock { + modt: m.modified().unwrap(), + size: m.len(), + path: PathBuf::from(a.path()), + }; + std::thread::sleep(std::time::Duration::from_secs(1)); + let b = assert_fs::NamedTempFile::new("yo").unwrap(); + let m = File::create(b.path()).unwrap().metadata().unwrap(); + let y = OnDiskBlock { + modt: m.modified().unwrap(), + size: m.len(), + path: PathBuf::from(b.path()), + }; + let mut v = vec![y, x]; + v.sort(); + assert_eq!(v[0].path, a.path()); + assert_eq!(v[1].path, b.path()); + } } diff --git a/local-storage/src/null_provider.rs b/local-storage/src/null_provider.rs index 0ff9cb4..2bce7eb 100644 --- a/local-storage/src/null_provider.rs +++ b/local-storage/src/null_provider.rs @@ -6,7 +6,7 @@ use anyhow::bail; pub(crate) struct NullStorageProvider {} impl StorageProvider for NullStorageProvider { - fn import_block(&self, _block: &StoredBlock) -> anyhow::Result<()> { + fn import_block(&mut self, _block: &StoredBlock) -> anyhow::Result<()> { bail!("NullStorageProvider does not implement anything") } diff --git a/local-storage/src/provider.rs b/local-storage/src/provider.rs index a3762f5..84988ba 100644 --- a/local-storage/src/provider.rs +++ b/local-storage/src/provider.rs @@ -11,7 +11,7 @@ use crate::file_provider::FileStorageProvider; pub trait StorageProvider { // Import a stored block - fn import_block(&self, block: &StoredBlock) -> Result<()>; + fn import_block(&mut self, block: &StoredBlock) -> Result<()>; // Requests a list of CIDs currently available in storage fn get_available_cids(&self) -> Result>; // Requests the block associated with the given CID @@ -38,11 +38,14 @@ pub trait StorageProvider { fn incremental_gc(&mut self); } -pub fn default_storage_provider(_storage_path: &str) -> Result> { +pub fn default_storage_provider( + _storage_path: &str, + _high_disk_usage: u64, +) -> Result> { #[cfg(all(not(feature = "files"), not(feature = "sqlite")))] let provider = NullStorageProvider::default(); #[cfg(all(feature = "files", not(feature = "sqlite")))] - let provider = FileStorageProvider::new(_storage_path)?; + let provider = FileStorageProvider::new(_storage_path, _high_disk_usage)?; #[cfg(feature = "sqlite")] let provider = SqliteStorageProvider::new(_storage_path)?; Ok(Box::new(provider)) diff --git a/local-storage/src/sql_provider.rs b/local-storage/src/sql_provider.rs index f8e2291..011019d 100644 --- a/local-storage/src/sql_provider.rs +++ b/local-storage/src/sql_provider.rs @@ -106,7 +106,7 @@ impl SqliteStorageProvider { } impl StorageProvider for SqliteStorageProvider { - fn import_block(&self, block: &StoredBlock) -> Result<()> { + fn import_block(&mut self, block: &StoredBlock) -> Result<()> { self.conn.execute( "INSERT OR IGNORE INTO blocks (cid, data, filename) VALUES (?1, ?2, ?3)", (&block.cid, &block.data, &block.filename), @@ -354,7 +354,7 @@ pub mod tests { #[test] pub fn test_import_one_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); let cid_str = cid.to_string(); @@ -376,7 +376,7 @@ pub mod tests { use std::collections::HashSet; use std::iter::FromIterator; - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let seeds = vec![b"00", b"11", b"22"]; let cids: Vec = seeds @@ -405,7 +405,7 @@ pub mod tests { #[test] pub fn test_import_then_get_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); @@ -426,7 +426,7 @@ pub mod tests { #[test] pub fn test_import_then_get_root_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x70, cid::multihash::Code::Sha2_256.digest(b"00")); let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11")); @@ -448,7 +448,7 @@ pub mod tests { #[test] pub fn test_verify_detect_missing_blocks() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11")); @@ -479,7 +479,7 @@ pub mod tests { #[test] pub fn test_verify_detect_no_missing_blocks() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); let cid_str = cid.to_string(); diff --git a/local-storage/src/storage.rs b/local-storage/src/storage.rs index 74850ee..7d24b86 100644 --- a/local-storage/src/storage.rs +++ b/local-storage/src/storage.rs @@ -27,7 +27,7 @@ impl Storage { } } - pub fn import_path(&self, path: &Path) -> Result { + pub fn import_path(&mut self, path: &Path) -> Result { debug!("import_path({:?})", &path); let rt = tokio::runtime::Runtime::new()?; let blocks: Result> = rt.block_on(async { @@ -138,7 +138,7 @@ impl Storage { self.provider.get_all_dag_blocks(cid) } - pub fn import_block(&self, block: &StoredBlock) -> Result<()> { + pub fn import_block(&mut self, block: &StoredBlock) -> Result<()> { info!("Importing block {:?}", block); self.provider.import_block(block) } @@ -194,7 +194,7 @@ pub mod tests { #[test] pub fn test_import_path_to_storage_single_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -217,7 +217,7 @@ pub mod tests { #[test] pub fn test_import_path_to_storage_multi_block() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -240,7 +240,7 @@ pub mod tests { #[test] pub fn export_path_from_storage() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -267,7 +267,7 @@ pub mod tests { #[test] pub fn export_from_storage_various_file_sizes_binary_data() { for size in [100, 200, 300, 500, 1_000] { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -294,7 +294,7 @@ pub mod tests { #[test] pub fn test_get_dag_blocks_by_window() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -320,7 +320,7 @@ pub mod tests { #[test] pub fn compare_get_blocks_to_get_cids() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); let test_file = temp_dir.child("data.txt"); @@ -342,7 +342,7 @@ pub mod tests { // #[test] // pub fn export_from_storage_various_file_sizes_duplicated_data() { // for size in [100, 200, 300, 500, 1000] { - // let harness = TestHarness::new(); + // let mut harness = TestHarness::new(); // let temp_dir = assert_fs::TempDir::new().unwrap(); // let test_file = temp_dir.child("data.txt"); // test_file diff --git a/myceli/Cargo.toml b/myceli/Cargo.toml index fc7bab6..f2c1527 100644 --- a/myceli/Cargo.toml +++ b/myceli/Cargo.toml @@ -21,10 +21,12 @@ tokio.workspace = true transports.workspace = true [features] -big = ["local-storage/big", "good_log"] -small = ["local-storage/small", "small_log"] -good_log = ["dep:env_logger"] -small_log = ["dep:smalog"] +big = ["sqlite", "good_log"] +small = ["files", "small_log"] +good_log = ["dep:env_logger", "local-storage/good_log"] +small_log = ["dep:smalog", "local-storage/small_log"] +sqlite = ["local-storage/sqlite"] +files = ["local-storage/files"] [dev-dependencies] assert_fs.workspace = true diff --git a/myceli/src/handlers.rs b/myceli/src/handlers.rs index 9f5fde7..33b2724 100644 --- a/myceli/src/handlers.rs +++ b/myceli/src/handlers.rs @@ -1,15 +1,19 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use local_storage::storage::Storage; use messages::{ApplicationAPI, DagInfo, DataProtocol, Message}; use std::path::PathBuf; use std::rc::Rc; -pub fn import_file(path: &str, storage: Rc) -> Result { - let root_cid = storage.import_path(&PathBuf::from(path.to_owned()))?; - Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { - path: path.to_string(), - cid: root_cid, - })) +pub fn import_file(path: &str, storage: &mut Rc) -> Result { + if let Some(storage) = Rc::get_mut(storage) { + let root_cid = storage.import_path(&PathBuf::from(path.to_owned()))?; + Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { + path: path.to_string(), + cid: root_cid, + })) + } else { + Err(anyhow!("Unable to lock storage in order to import_file")) + } } pub fn validate_dag(cid: &str, storage: Rc) -> Result { @@ -164,11 +168,11 @@ pub mod tests { #[test] pub fn test_import_file_validate_dag() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let test_file_path = harness.generate_file().unwrap(); - let imported_file_cid = match import_file(&test_file_path, harness.storage.clone()) { + let imported_file_cid = match import_file(&test_file_path, &mut harness.storage) { Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. })) => cid, other => panic!("ImportFile returned wrong response {other:?}"), }; @@ -186,11 +190,11 @@ pub mod tests { #[test] pub fn test_import_file_validate_blocks() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let test_file_path = harness.generate_file().unwrap(); - let imported_file_cid = match import_file(&test_file_path, harness.storage.clone()) { + let imported_file_cid = match import_file(&test_file_path, &mut harness.storage) { Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. })) => cid, other => panic!("ImportFile returned wrong response {other:?}"), }; @@ -215,7 +219,7 @@ pub mod tests { #[test] pub fn test_available_blocks() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let available_blocks = match request_available_blocks(harness.storage.clone()) { Ok(Message::ApplicationAPI(ApplicationAPI::AvailableBlocks { cids })) => cids, @@ -224,7 +228,7 @@ pub mod tests { assert!(available_blocks.is_empty()); let test_file_path = harness.generate_file().unwrap(); - import_file(&test_file_path, harness.storage.clone()).unwrap(); + import_file(&test_file_path, &mut harness.storage).unwrap(); let available_blocks = match request_available_blocks(harness.storage.clone()) { Ok(Message::ApplicationAPI(ApplicationAPI::AvailableBlocks { cids })) => cids, @@ -236,11 +240,11 @@ pub mod tests { #[test] pub fn test_get_missing_blocks_none_missing() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let test_file_path = harness.generate_file().unwrap(); - let imported_file_cid = match import_file(&test_file_path, harness.storage.clone()) { + let imported_file_cid = match import_file(&test_file_path, &mut harness.storage) { Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. })) => cid, other => panic!("ImportFile returned wrong response {other:?}"), }; @@ -255,7 +259,7 @@ pub mod tests { #[test] pub fn test_get_missing_blocks_one_missing() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); let test_file_path = harness.generate_file().unwrap(); let mut file_blocks = file_to_blocks(&test_file_path).unwrap(); @@ -263,7 +267,10 @@ pub mod tests { let root_cid = file_blocks.last().unwrap().cid.to_owned(); for block in file_blocks { - harness.storage.import_block(&block).unwrap(); + Rc::get_mut(&mut harness.storage) + .unwrap() + .import_block(&block) + .unwrap(); } let missing_blocks = match get_missing_dag_blocks(&root_cid, harness.storage) { @@ -276,12 +283,12 @@ pub mod tests { #[test] pub fn test_import_zero_file() { - let harness = TestHarness::new(); + let mut harness = TestHarness::new(); // let test_file_path = harness.generate_file().unwrap(); let test_file_path = harness.zero_file().unwrap(); - let imported_file_cid = match import_file(&test_file_path, harness.storage.clone()) { + let imported_file_cid = match import_file(&test_file_path, &mut harness.storage) { Ok(Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. })) => cid, other => panic!("ImportFile returned wrong response {other:?}"), }; diff --git a/myceli/src/listener.rs b/myceli/src/listener.rs index 5194834..8214f1e 100644 --- a/myceli/src/listener.rs +++ b/myceli/src/listener.rs @@ -2,6 +2,7 @@ use crate::handlers; use crate::shipper::Shipper; use anyhow::Result; use local_storage::{provider::default_storage_provider, storage::Storage}; +use log::{error, info, trace}; use messages::{ApplicationAPI, DataProtocol, Message}; use std::{ net::SocketAddr, @@ -13,14 +14,13 @@ use std::{ }; use transports::{Transport, TransportError}; -use log::{error, info}; - pub struct Listener { storage_path: String, storage: Rc, transport: Arc, connected: Arc>, radio_address: Option, + high_disk_usage: u64, } impl Listener { @@ -30,9 +30,10 @@ impl Listener { transport: Arc, block_size: u32, radio_address: Option, + high_disk_usage: u64, ) -> Result> { let storage = Rc::new(Storage::new( - default_storage_provider(storage_path)?, + default_storage_provider(storage_path, high_disk_usage)?, block_size, )); @@ -43,6 +44,7 @@ impl Listener { transport, connected: Arc::new(Mutex::new(true)), radio_address, + high_disk_usage, }) } @@ -59,6 +61,7 @@ impl Listener { let shipper_transport = Arc::clone(&self.transport); let initial_connected = Arc::clone(&self.connected); let shipper_radio = self.radio_address.clone(); + let high_disk_usage = self.high_disk_usage; spawn(move || { let mut shipper = Shipper::new( &shipper_storage_path, @@ -70,6 +73,7 @@ impl Listener { initial_connected, block_size, shipper_radio, + high_disk_usage, ) .expect("Shipper creation failed"); shipper.receive_msg_loop(); @@ -118,7 +122,7 @@ impl Listener { sender_addr: &str, shipper_sender: Sender<(DataProtocol, String)>, ) -> Result> { - println!("Handling {message:?}"); + trace!("Handling {message:?}"); let resp = match message { Message::ApplicationAPI(ApplicationAPI::TransmitDag { cid, @@ -144,7 +148,7 @@ impl Listener { None } Message::ApplicationAPI(ApplicationAPI::ImportFile { path }) => { - Some(handlers::import_file(&path, self.storage.clone())?) + Some(handlers::import_file(&path, &mut self.storage)?) } Message::ApplicationAPI(ApplicationAPI::ExportDag { cid, path }) => { match self.storage.export_cid(&cid, &PathBuf::from(path.clone())) { diff --git a/myceli/src/main.rs b/myceli/src/main.rs index bb30a8d..2b67a74 100644 --- a/myceli/src/main.rs +++ b/myceli/src/main.rs @@ -13,8 +13,7 @@ fn main() -> Result<()> { #[cfg(feature = "good_log")] env_logger::init(); #[cfg(feature = "small_log")] - smalog::set_level(log::LevelFilter::Debug); - // smalog::init(); + smalog::init(); let config_path = std::env::args().nth(1); let cfg = Config::parse(config_path).expect("Failed to parse config"); @@ -30,6 +29,7 @@ fn main() -> Result<()> { std::fs::create_dir_all(&cfg.storage_path).expect("Failed to create storage dir"); let db_path = format!("{}/storage.db", cfg.storage_path); + let disk_bytes = cfg.disk_usage * 1024; let mut udp_transport = UdpTransport::new(&cfg.listen_address, cfg.mtu, cfg.chunk_transmit_throttle) @@ -45,6 +45,7 @@ fn main() -> Result<()> { Arc::new(udp_transport), cfg.block_size, cfg.radio_address, + disk_bytes, ) .expect("Listener creation failed"); println!("pid={}", std::process::id()); diff --git a/myceli/src/shipper.rs b/myceli/src/shipper.rs index e581e07..2d45596 100644 --- a/myceli/src/shipper.rs +++ b/myceli/src/shipper.rs @@ -68,9 +68,10 @@ impl Shipper { connected: Arc>, block_size: u32, radio_address: Option, + high_disk_usage: u64, ) -> Result> { let storage = Rc::new(Storage::new( - default_storage_provider(storage_path)?, + default_storage_provider(storage_path, high_disk_usage)?, block_size, )); Ok(Shipper { @@ -445,7 +446,11 @@ impl Shipper { filename: block.filename, }; stored_block.validate()?; - self.storage.import_block(&stored_block) + if let Some(store) = Rc::get_mut(&mut self.storage) { + store.import_block(&stored_block) + } else { + Err(anyhow!("Unable to lock storage")) + } } } @@ -521,6 +526,7 @@ mod tests { Arc::new(Mutex::new(true)), BLOCK_SIZE, None, + u64::MAX, ) .unwrap(); TestShipper { @@ -600,12 +606,9 @@ mod tests { // Generate file for test let test_file_path = transmitter.generate_file().unwrap(); - + let store = Rc::get_mut(&mut transmitter._storage).unwrap(); // Import test file into transmitter storage - let test_file_cid = transmitter - ._storage - .import_path(&PathBuf::from(test_file_path)) - .unwrap(); + let test_file_cid = store.import_path(&PathBuf::from(test_file_path)).unwrap(); transmitter .shipper diff --git a/myceli/tests/utils/mod.rs b/myceli/tests/utils/mod.rs index 2efdef8..b94fa97 100644 --- a/myceli/tests/utils/mod.rs +++ b/myceli/tests/utils/mod.rs @@ -92,11 +92,12 @@ fn start_listener_thread(listen_addr: SocketAddr, db_path: ChildPath) { let listen_addr_str = listen_addr.to_string(); let mut transport = UdpTransport::new(&listen_addr_str, 60, None).unwrap(); transport - .set_read_timeout(Some(Duration::from_millis(10))) + .set_read_timeout(Some(Duration::from_secs(10))) .unwrap(); transport.set_max_read_attempts(Some(1)); let transport = Arc::new(transport); - let mut listener = Listener::new(&listen_addr, db_path, transport, BLOCK_SIZE, None).unwrap(); + let mut listener = + Listener::new(&listen_addr, db_path, transport, BLOCK_SIZE, None, 9).unwrap(); listener .start(10, 2, BLOCK_SIZE) .expect("Error encountered in listener"); @@ -110,7 +111,7 @@ impl TestController { pub fn new() -> Self { let mut transport = UdpTransport::new("127.0.0.1:0", 60, None).unwrap(); transport - .set_read_timeout(Some(Duration::from_millis(50))) + .set_read_timeout(Some(Duration::from_secs(5))) .unwrap(); transport.set_max_read_attempts(Some(1)); TestController { transport } diff --git a/smalog/src/lib.rs b/smalog/src/lib.rs index 575533e..210d447 100644 --- a/smalog/src/lib.rs +++ b/smalog/src/lib.rs @@ -1,4 +1,5 @@ -use log::{Metadata, Record}; +use log::{Level, Metadata, Record}; +use std::env; struct Smalog { lev: log::LevelFilter, @@ -8,7 +9,11 @@ static mut LOGGER: Smalog = Smalog { lev: log::LevelFilter::Info, }; pub fn init() { - set_level(log::LevelFilter::Info); + let lev = match env::var("RUST_LOG") { + Ok(lev_s) => level_from_str(&lev_s), + Err(_) => Level::Info, + }; + set_level(lev.to_level_filter()); } pub fn set_level(lev: log::LevelFilter) { unsafe { @@ -31,3 +36,12 @@ impl log::Log for Smalog { fn flush(&self) {} } + +fn level_from_str(s: &str) -> Level { + use std::str::FromStr; + if let Ok(l) = Level::from_str(s) { + return l; + } + println!("ERROR! RUST_LOG set to {s} which is not recognized by smalog which only accepts a simple level name, i.e. one of: OFF; ERROR; WARN; INFO; DEBUG; TRACE. Will use INFO instead."); + Level::Info +} diff --git a/transports/src/udp_transport.rs b/transports/src/udp_transport.rs index 637762f..a5bec46 100644 --- a/transports/src/udp_transport.rs +++ b/transports/src/udp_transport.rs @@ -1,4 +1,4 @@ -use crate::error::{adhoc_err, TransportError}; +use crate::error::TransportError; use crate::{ error::{adhoc, Result}, udp_chunking::SimpleChunker, @@ -19,6 +19,7 @@ pub struct UdpTransport { chunker: Arc>, max_read_attempts: Option, chunk_transmit_throttle: Option, + timeout: Option, } impl UdpTransport { @@ -30,11 +31,13 @@ impl UdpTransport { chunker: Arc::new(Mutex::new(SimpleChunker::new(mtu))), max_read_attempts: None, chunk_transmit_throttle, + timeout: None, }) } pub fn set_read_timeout(&mut self, dur: Option) -> Result<()> { - Ok(self.socket.set_read_timeout(dur)?) + self.timeout = dur; + Ok(self.socket.set_read_timeout(dur.map(|d| d / 10))?) } pub fn set_max_read_attempts(&mut self, attempts: Option) { @@ -46,12 +49,11 @@ impl Transport for UdpTransport { fn receive(&self) -> Result<(Message, String)> { let mut buf = vec![0; usize::from(MAX_MTU)]; let mut sender_addr; - let mut read_attempts = 0; + let mut read_errors = 0; let mut read_len; let mut timeouts = 0; loop { loop { - read_attempts += 1; match self.socket.recv_from(&mut buf) { Ok((len, sender)) => { if len > 0 { @@ -63,21 +65,21 @@ impl Transport for UdpTransport { Err(e) => match e.kind() { io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => { trace!("Receive timed out. May be normal depending on usage."); + if timeouts >= 10 { + return Err(TransportError::TimedOut); + } timeouts += 1; } _ => { error!("Recv failed {e}"); + if self.max_read_attempts.unwrap_or(u16::MAX) <= read_errors { + return Err(e.into()); + } + read_errors += 1; } }, } - if let Some(max_attempts) = self.max_read_attempts { - if read_attempts > max_attempts { - adhoc_err("Exceeded number of read attempts")?; - } else if timeouts * 2 > read_attempts { - return Err(TransportError::TimedOut); - } - } - sleep(Duration::from_millis(10)); + sleep(Duration::from_millis(1)); } debug!("Received possible chunk of {} bytes", read_len); @@ -108,7 +110,7 @@ impl Transport for UdpTransport { } fn send(&self, msg: Message, addr: &str) -> Result<()> { - debug!("Transmitting msg: {msg:?}"); + debug!("UDP: Transmitting msg: {msg:?}"); let addr = addr .to_socket_addrs()? .next()