Skip to content

Commit

Permalink
Don't auto-dangle CIDs.
Browse files Browse the repository at this point in the history
  • Loading branch information
John-LittleBearLabs committed Aug 11, 2023
1 parent b5c4a3a commit ca3d461
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 34 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/docker-test.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: CI Unit Testing
on: [pull_request]
name: Docker Testing
on: [ pull_request ]
jobs:
test_docker_build:
name: Test Docker build
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ target/
build/
storage.db
sat/
gnd/
gnd/
log
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
]

[workspace.package]
version = "0.6.5"
version = "0.6.6"
edition = "2021"
license = "Apache-2.0/MIT"
rust-version = "1.68.1"
Expand Down
1 change: 1 addition & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
clap.workspace = true
config.workspace = true
env_logger.workspace = true
log.workspace = true
messages.workspace = true
Expand Down
11 changes: 9 additions & 2 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use transports::{Transport, UdpTransport, MAX_MTU};
#[clap(about = "Control a Myceli instance")]
pub struct Cli {
#[arg(help = "The network address that a myceli instance is listening on")]
instance_addr: String,
instance_addr: Option<String>,
#[arg(
short,
long,
Expand Down Expand Up @@ -45,7 +45,14 @@ impl Cli {
let cmd_str = serde_json::to_string(&command)?;
info!("Transmitting: {}", &cmd_str);

transport.send(command, &self.instance_addr)?;
let instance_addr = if let Some(addr) = &self.instance_addr {
addr.clone()
} else {
let cfg = config::Config::parse(None)
.expect("Please specify instance addr, as I can't read myceli.toml");
cfg.listen_address
};
transport.send(command, &instance_addr)?;
if self.listen_mode {
match transport.receive() {
Ok((msg, _)) => {
Expand Down
48 changes: 36 additions & 12 deletions local-storage/src/file_provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{block::StoredBlock, provider::StorageProvider};
use anyhow::Result;
use cid::{multibase, Cid};
use log::{debug, error, info};
use log::{debug, error, info, trace};
use std::{
cmp::Ordering,
fmt::Debug,
Expand All @@ -27,11 +27,11 @@ impl FileStorageProvider {
old_blocks: vec![],
high: high_usage,
};
create_dir_all(&me.blocks())?;
create_dir_all(me.blocks())?;
me.dir = canonicalize(storage_folder)?;
debug!("FileStorageProvider({:?})", &me.dir);
create_dir_all(&me.cids())?;
create_dir_all(&me.names())?;
create_dir_all(me.cids())?;
create_dir_all(me.names())?;
me.count_blocks();
Ok(me)
}
Expand Down Expand Up @@ -98,8 +98,10 @@ impl FileStorageProvider {
Ok((to_skip, to_fetch))
}

fn entry_to_cid_str(&self, r: std::io::Result<DirEntry>) -> Option<String> {
let e = r.ok()?;
fn rentry_to_cid_str(&self, r: std::io::Result<DirEntry>) -> Option<String> {
self.entry_to_cid_str(r.ok()?)
}
fn entry_to_cid_str(&self, e: DirEntry) -> Option<String> {
if e.metadata().ok()?.is_file() {
let cid_str = e.file_name().to_str()?.to_owned();
let cid = Cid::try_from(cid_str.as_str()).ok()?;
Expand All @@ -114,7 +116,7 @@ impl FileStorageProvider {
}
}
fn count_blocks(&mut self) {
if let Ok(rd) = fs::read_dir(&self.blocks()) {
if let Ok(rd) = fs::read_dir(self.blocks()) {
self.old_blocks = rd
.flat_map(|r| r.ok())
.flat_map(OnDiskBlock::from)
Expand All @@ -123,6 +125,27 @@ impl FileStorageProvider {
self.usage = self.old_blocks.iter().map(|b| b.size).sum();
}
}
fn drop_cids_with_block_path(&self, block_path: &std::path::Path) -> Result<()> {
trace!("drop_cids_with_block_path({block_path:?})");
for e in read_dir(self.cids())?.flat_map(|r| r.ok()) {
let cid_path = e.path();
trace!("Checking CID path {cid_path:?}");
if let Some(Some(cid_str)) = cid_path.file_name().map(|f| f.to_str()) {
if let Ok(cid) = Cid::try_from(cid_str) {
if self.block_path(&cid) == block_path {
match fs::remove_file(&cid_path) {
Ok(_) => {
info!("Removed {cid_path:?} because its block {block_path:?} is gone.");
fs::remove_file(self.names().join(cid_str)).ok();//It's totally normal to not exist
},
Err(e) => error!("Error removing dangling CID {cid_path:?} (corresponding to {block_path:?}): {e}"),
}
}
}
}
}
Ok(())
}
}

impl StorageProvider for FileStorageProvider {
Expand All @@ -148,7 +171,7 @@ impl StorageProvider for FileStorageProvider {

fn get_available_cids(&self) -> anyhow::Result<Vec<String>> {
let mut result: Vec<String> = read_dir(self.cids())?
.filter_map(|f| self.entry_to_cid_str(f))
.filter_map(|f| self.rentry_to_cid_str(f))
.collect();
result.sort();
Ok(result)
Expand Down Expand Up @@ -182,7 +205,7 @@ impl StorageProvider for FileStorageProvider {
.get_available_cids()?
.into_iter()
.map(|c| {
let n = self.get_name(c.as_str()).ok().unwrap_or_else(String::new);
let n = self.get_name(c.as_str()).ok().unwrap_or_default();
(c, n)
})
.collect())
Expand Down Expand Up @@ -242,6 +265,9 @@ impl StorageProvider for FileStorageProvider {
&odb.path, self.usage, self.high
);
self.usage -= odb.size;
if let Err(e) = self.drop_cids_with_block_path(&odb.path) {
error!("Trouble dropping CIDs for block: {e}");
}
}
Err(e) => {
error!("Error removing old block {odb:?} to free up space! {e:?}");
Expand Down Expand Up @@ -276,9 +302,7 @@ impl OnDiskBlock {
impl Ord for OnDiskBlock {
fn cmp(&self, other: &Self) -> Ordering {
if self.modt != other.modt {
let result = self.modt.cmp(&other.modt);
// println!("{:?}.cmp({:?})={:?}", &self.modt, &other.modt, &result);
result
self.modt.cmp(&other.modt)
} else if self.size != other.size {
other.size.cmp(&self.size)
} else {
Expand Down
13 changes: 13 additions & 0 deletions local-storage/src/sql_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@ use crate::{block::StoredBlock, error::StorageError, provider::StorageProvider};
use anyhow::{bail, Result};
use log::trace;
use rusqlite::{params_from_iter, Connection};
use std::{path::PathBuf, str::FromStr};

pub struct SqliteStorageProvider {
conn: Box<Connection>,
}

impl SqliteStorageProvider {
pub fn new(db_path: &str) -> Result<Self> {
let mut db_path = PathBuf::from_str(db_path)?;
loop {
if db_path.is_dir() {
db_path = db_path.join("storage.db");
} else if db_path.exists()
|| db_path.extension().unwrap_or_default().to_str() == Some("db")
{
break;
} else {
db_path = db_path.join("storage.db");
}
}
let result = SqliteStorageProvider {
conn: Box::new(Connection::open(db_path)?),
};
Expand Down
2 changes: 1 addition & 1 deletion myceli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() -> Result<()> {

std::fs::create_dir_all(&cfg.storage_path).expect("Failed to create storage dir");

let db_path = format!("{}/storage.db", cfg.storage_path);
let db_path = cfg.storage_path.clone();
let disk_bytes = cfg.disk_usage * 1024;

let mut udp_transport =
Expand Down
3 changes: 2 additions & 1 deletion myceli/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ impl TestController {
pub fn new() -> Self {
let mut transport = UdpTransport::new("127.0.0.1:0", 60, None).unwrap();
transport
.set_read_timeout(Some(Duration::from_secs(5)))
.set_read_timeout(Some(Duration::from_secs(9)))
.unwrap();
transport.set_max_read_attempts(Some(1));
TestController { transport }
}

pub fn send_and_recv(&mut self, target_addr: &str, message: Message) -> Message {
self.send_msg(message, target_addr);
std::thread::sleep(Duration::from_millis(500));
self.recv_msg().unwrap()
}

Expand Down
8 changes: 4 additions & 4 deletions testing/local_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ else
fi
rm -rv sat || true
rm -rv gnd || true
mkdir -p sat/storage.db
mkdir -p sat/
mkdir gnd
cat > sat/config.toml <<SATCFG
listen_address = "0.0.0.0:8764"
Expand Down Expand Up @@ -153,9 +153,9 @@ controller() {
set +x
}
cid_present() {
if [ -d ${1}/storage.db ]
if [ -f ${1}/cids/${2} ]
then
test -f ${1}/storage.db/cids/${2}
true
else
sqlite3 ${1}/storage.db "select * from blocks where cid = '${2}';" | grep '[a-z]'
fi
Expand Down Expand Up @@ -243,7 +243,7 @@ echo 'Shutdown the myceli ground instance'
kill_myceli gnd

echo ', delete the storage database'
rm -rv gnd/storage.db
rm -v gnd/storage.db

echo ', and start the myceli ground instance again.'
start_myceli gnd
Expand Down
2 changes: 1 addition & 1 deletion transports/src/udp_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl SimpleChunker {
let mut databuf = &data[..data.len()];
let chunk = SimpleChunk::decode(&mut databuf)?;
self.recv_chunk(chunk)?;
Ok(self.attempt_msg_assembly()?)
self.attempt_msg_assembly()
}
}

Expand Down
2 changes: 1 addition & 1 deletion transports/src/udp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Transport for UdpTransport {
debug!("Received: no msg ready for assembly yet");
}
Err(err) => {
return Err(err.into());
return Err(err);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ fn watched_dir(cfg: &config::Config) -> PathBuf {
result.push(
cfg.clone()
.watched_directory
.clone()
.expect("Must configure watched_directory before running watcher."),
);
result
Expand Down

0 comments on commit ca3d461

Please sign in to comment.