diff --git a/signer/src/kvv.rs b/signer/src/kvv.rs index fa05a0a..df960c8 100644 --- a/signer/src/kvv.rs +++ b/signer/src/kvv.rs @@ -1,16 +1,7 @@ -use fsdb::{AnyBucket, Bucket, Fsdb}; -use lightning_signer::persist::{Error, SignerId}; -use lightning_signer::SendSync; -use log::error; -use std::collections::BTreeMap; -use std::convert::TryInto; pub use vls_persist::kvv::{ cloud::CloudKVVStore, memory::MemoryKVVStore, KVVPersister, KVVStore, ValueFormat, KVV, }; -use vls_protocol_signer::lightning_signer; -extern crate alloc; -use std::cmp::Ordering; -use std::sync::Mutex; +pub use vls_protocol_signer::lightning_signer::{persist::Error, SendSync}; pub struct RmpFormat; @@ -25,219 +16,232 @@ impl ValueFormat for RmpFormat { } } -/// A key-version-value store backed by fsdb -pub struct FsKVVStore { - db: AnyBucket>, - meta: Bucket>, - signer_id: [u8; 16], - versions: Mutex>, -} - -/// An iterator over a KVVStore range -pub struct Iter(alloc::vec::IntoIter); +#[cfg(feature = "fspersist")] +pub mod fs { + extern crate alloc; + use super::{Error, KVVStore, SendSync, KVV}; + use fsdb::{AnyBucket, Bucket, Fsdb}; + use log::error; + use std::cmp::Ordering; + use std::collections::BTreeMap; + use std::convert::TryInto; + use std::sync::Mutex; + use vls_protocol_signer::lightning_signer::persist::SignerId; + + /// A key-version-value store backed by fsdb + pub struct FsKVVStore { + db: AnyBucket>, + meta: Bucket>, + signer_id: [u8; 16], + versions: Mutex>, + } + + /// An iterator over a KVVStore range + pub struct Iter(alloc::vec::IntoIter); + + impl Iterator for Iter { + type Item = KVV; + + fn next(&mut self) -> Option { + self.0.next() + } + } -impl Iterator for Iter { - type Item = KVV; + impl SendSync for FsKVVStore {} - fn next(&mut self) -> Option { - self.0.next() - } -} + impl FsKVVStore { + pub fn new(path: &str, signer_id: [u8; 16], maxsize: Option) -> Self { + let db = Fsdb::new(path).expect("could not create db"); + let bucket = db + .any_bucket::>(maxsize) + .expect("could not create bucket"); -impl SendSync for FsKVVStore {} - -impl FsKVVStore { - pub fn new(path: &str, signer_id: [u8; 16], maxsize: Option) -> Self { - let db = Fsdb::new(path).expect("could not create db"); - let bucket = db - .any_bucket::>(maxsize) - .expect("could not create bucket"); - - // seed the initial versions store - let mut versions = BTreeMap::new(); - let fulllist = bucket.list_all().expect("could not list bucket"); - for path in fulllist { - match bucket.get_raw(&path) { - Ok(item) => { - let (version, _) = Self::decode_vv(&item); - versions.insert(path, version); + // seed the initial versions store + let mut versions = BTreeMap::new(); + let fulllist = bucket.list_all().expect("could not list bucket"); + for path in fulllist { + match bucket.get_raw(&path) { + Ok(item) => { + let (version, _) = Self::decode_vv(&item); + versions.insert(path, version); + } + Err(e) => log::error!("failed to seed version {:?}", e), } - Err(e) => log::error!("failed to seed version {:?}", e), } - } - let meta = db - .bucket::>("meta", None) - .expect("could not create bucket"); - Self { - db: bucket, - meta, - signer_id, - versions: Mutex::new(versions), - } - } - fn decode_vv(vv: &[u8]) -> (u64, Vec) { - let version = u64::from_be_bytes(vv[..8].try_into().unwrap()); - let value = vv[8..].to_vec(); - (version, value) - } - fn encode_vv(version: u64, value: &[u8]) -> Vec { - let mut vv = Vec::with_capacity(value.len() + 8); - vv.extend_from_slice(&version.to_be_bytes()); - vv.extend_from_slice(value); - vv - } - fn check_version( - &self, - key: &str, - version: u64, - prev: u64, - value: &[u8], - ) -> Result, Error> { - let vv = Self::encode_vv(version, value); - match version.cmp(&prev) { - Ordering::Less => { - error!("version mismatch for {}: {} < {}", key, version, prev); - // version cannot go backwards - Err(Error::VersionMismatch) + let meta = db + .bucket::>("meta", None) + .expect("could not create bucket"); + Self { + db: bucket, + meta, + signer_id, + versions: Mutex::new(versions), } - Ordering::Equal => { - // if same version, value must not have changed - if let Ok(existing) = self.db.get_raw(key) { - if existing != vv { - error!("value mismatch for {}: {}", key, version); - return Err(Error::VersionMismatch); + } + fn decode_vv(vv: &[u8]) -> (u64, Vec) { + let version = u64::from_be_bytes(vv[..8].try_into().unwrap()); + let value = vv[8..].to_vec(); + (version, value) + } + fn encode_vv(version: u64, value: &[u8]) -> Vec { + let mut vv = Vec::with_capacity(value.len() + 8); + vv.extend_from_slice(&version.to_be_bytes()); + vv.extend_from_slice(value); + vv + } + fn check_version( + &self, + key: &str, + version: u64, + prev: u64, + value: &[u8], + ) -> Result, Error> { + let vv = Self::encode_vv(version, value); + match version.cmp(&prev) { + Ordering::Less => { + error!("version mismatch for {}: {} < {}", key, version, prev); + // version cannot go backwards + Err(Error::VersionMismatch) + } + Ordering::Equal => { + // if same version, value must not have changed + if let Ok(existing) = self.db.get_raw(key) { + if existing != vv { + error!("value mismatch for {}: {}", key, version); + return Err(Error::VersionMismatch); + } } + Ok(vv) } - Ok(vv) + Ordering::Greater => Ok(vv), } - Ordering::Greater => Ok(vv), } } -} -impl KVVStore for FsKVVStore { - type Iter = Iter; + impl KVVStore for FsKVVStore { + type Iter = Iter; - fn signer_id(&self) -> SignerId { - self.signer_id - } + fn signer_id(&self) -> SignerId { + self.signer_id + } - fn put(&self, key: &str, value: Vec) -> Result<(), Error> { - let v = self - .versions - .lock() - .unwrap() - .get(key) - .map(|v| v + 1) - .unwrap_or(0); - self.put_with_version(key, v, value) - } + fn put(&self, key: &str, value: Vec) -> Result<(), Error> { + let v = self + .versions + .lock() + .unwrap() + .get(key) + .map(|v| v + 1) + .unwrap_or(0); + self.put_with_version(key, v, value) + } - fn put_with_version(&self, key: &str, version: u64, value: Vec) -> Result<(), Error> { - let mut vers = self.versions.lock().unwrap(); - let vv = match vers.get(key) { - Some(prev) => self.check_version(key, version, *prev, &value)?, - None => Self::encode_vv(version, &value), - }; - vers.insert(key.to_string(), version); - self.db - .put_raw(key, &vv) - .map_err(|_| Error::Internal("could not put".to_string()))?; - Ok(()) - } - fn put_batch(&self, kvvs: Vec) -> Result<(), Error> { - let mut found_version_mismatch = false; - let mut staged_vvs: Vec<(String, u64, Vec)> = Vec::new(); - - let mut vers = self.versions.lock().unwrap(); - for kvv in kvvs.into_iter() { - let key = kvv.0.clone(); - let ver = kvv.1 .0; - let val = &kvv.1 .1; - match vers.get(&key) { - Some(prev) => match self.check_version(&key, ver, *prev, val) { - Ok(vv) => staged_vvs.push((key.clone(), ver, vv)), - Err(_) => found_version_mismatch = true, - }, - None => { - let vv = Self::encode_vv(ver, val); - staged_vvs.push((key.clone(), ver, vv)); + fn put_with_version(&self, key: &str, version: u64, value: Vec) -> Result<(), Error> { + let mut vers = self.versions.lock().unwrap(); + let vv = match vers.get(key) { + Some(prev) => self.check_version(key, version, *prev, &value)?, + None => Self::encode_vv(version, &value), + }; + vers.insert(key.to_string(), version); + self.db + .put_raw(key, &vv) + .map_err(|_| Error::Internal("could not put".to_string()))?; + Ok(()) + } + fn put_batch(&self, kvvs: Vec) -> Result<(), Error> { + let mut found_version_mismatch = false; + let mut staged_vvs: Vec<(String, u64, Vec)> = Vec::new(); + + let mut vers = self.versions.lock().unwrap(); + for kvv in kvvs.into_iter() { + let key = kvv.0.clone(); + let ver = kvv.1 .0; + let val = &kvv.1 .1; + match vers.get(&key) { + Some(prev) => match self.check_version(&key, ver, *prev, val) { + Ok(vv) => staged_vvs.push((key.clone(), ver, vv)), + Err(_) => found_version_mismatch = true, + }, + None => { + let vv = Self::encode_vv(ver, val); + staged_vvs.push((key.clone(), ver, vv)); + } + } + } + if found_version_mismatch { + // abort the transaction + return Err(Error::VersionMismatch); + } else { + for vv in staged_vvs { + self.db + .put_raw(&vv.0, &vv.2) + .map_err(|_| Error::Internal("could not put".to_string()))?; + vers.insert(vv.0, vv.1); } } + Ok(()) } - if found_version_mismatch { - // abort the transaction - return Err(Error::VersionMismatch); - } else { - for vv in staged_vvs { - self.db - .put_raw(&vv.0, &vv.2) - .map_err(|_| Error::Internal("could not put".to_string()))?; - vers.insert(vv.0, vv.1); + fn get(&self, key: &str) -> Result)>, Error> { + if let Ok(vv) = self.db.get_raw(key) { + let (version, value) = Self::decode_vv(&vv); + Ok(Some((version, value))) + } else { + Ok(None) } } - Ok(()) - } - fn get(&self, key: &str) -> Result)>, Error> { - if let Ok(vv) = self.db.get_raw(key) { - let (version, value) = Self::decode_vv(&vv); - Ok(Some((version, value))) - } else { - Ok(None) + fn get_version(&self, key: &str) -> Result, Error> { + Ok(self.versions.lock().unwrap().get(key).copied()) } - } - fn get_version(&self, key: &str) -> Result, Error> { - Ok(self.versions.lock().unwrap().get(key).copied()) - } - fn get_prefix(&self, prefix: &str) -> Result { - let items = self - .db - .list(prefix) - .map_err(|_| Error::Internal("could not list".to_string()))?; - let mut result = Vec::new(); - for item in items { - let sep = if prefix.ends_with('/') { - "".to_string() - } else { - "/".to_string() - }; - let key = format!("{}{}{}", prefix, sep, item); - let vv = self + fn get_prefix(&self, prefix: &str) -> Result { + let items = self .db - .get_raw(&key) - .map_err(|_| Error::Internal("could not get".to_string()))?; - let (version, value) = Self::decode_vv(&vv); - result.push(KVV(key, (version, value))); + .list(prefix) + .map_err(|_| Error::Internal("could not list".to_string()))?; + let mut result = Vec::new(); + for item in items { + let sep = if prefix.ends_with('/') { + "".to_string() + } else { + "/".to_string() + }; + let key = format!("{}{}{}", prefix, sep, item); + let vv = self + .db + .get_raw(&key) + .map_err(|_| Error::Internal("could not get".to_string()))?; + let (version, value) = Self::decode_vv(&vv); + result.push(KVV(key, (version, value))); + } + Ok(Iter(result.into_iter())) + } + fn delete(&self, key: &str) -> Result<(), Error> { + self.db + .remove(key) + .map_err(|_| Error::Internal("could not remove".to_string())) + } + fn clear_database(&self) -> Result<(), Error> { + self.db + .clear() + .map_err(|_| Error::Internal("could not clear".to_string())) } - Ok(Iter(result.into_iter())) - } - fn delete(&self, key: &str) -> Result<(), Error> { - self.db - .remove(key) - .map_err(|_| Error::Internal("could not remove".to_string())) - } - fn clear_database(&self) -> Result<(), Error> { - self.db - .clear() - .map_err(|_| Error::Internal("could not clear".to_string())) } -} -impl FsKVVStore { - pub fn get_raw(&self, key: &str) -> Result, Error> { - self.meta - .get_raw(key) - .map_err(|_| Error::Internal("failed get_raw".to_string())) - } - pub fn set_raw(&self, key: &str, data: &[u8]) -> Result<(), Error> { - self.meta - .put_raw(key, data) - .map_err(|_| Error::Internal("failed put_raw".to_string())) - } - pub fn delete_raw(&self, key: &str) -> Result<(), Error> { - self.meta - .remove(key) - .map_err(|_| Error::Internal("failed remove".to_string())) + impl FsKVVStore { + pub fn get_raw(&self, key: &str) -> Result, Error> { + self.meta + .get_raw(key) + .map_err(|_| Error::Internal("failed get_raw".to_string())) + } + pub fn set_raw(&self, key: &str, data: &[u8]) -> Result<(), Error> { + self.meta + .put_raw(key, data) + .map_err(|_| Error::Internal("failed put_raw".to_string())) + } + pub fn delete_raw(&self, key: &str) -> Result<(), Error> { + self.meta + .remove(key) + .map_err(|_| Error::Internal("failed remove".to_string())) + } } } diff --git a/vls-mqtt/src/main.rs b/vls-mqtt/src/main.rs index ccab205..dd7bfbd 100644 --- a/vls-mqtt/src/main.rs +++ b/vls-mqtt/src/main.rs @@ -12,7 +12,7 @@ use glyph::ser::{serialize_controlresponse, ByteBuf}; use lss::init_lss; use rand::RngCore; use rocket::tokio::sync::{broadcast, mpsc, oneshot}; -use sphinx_signer::kvv::{CloudKVVStore, FsKVVStore, KVVPersister, RmpFormat}; +use sphinx_signer::kvv::{fs::FsKVVStore, CloudKVVStore, KVVPersister, RmpFormat}; use sphinx_signer::lightning_signer::bitcoin::Network; use sphinx_signer::lightning_signer::persist::Persist; use sphinx_signer::lightning_signer::wallet::Wallet;