diff --git a/Cargo.toml b/Cargo.toml index 35007aab..f62f96fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,12 +57,14 @@ half = { version = "2.0.0", features = ["bytemuck"] } inventory = "0.3.0" itertools = "0.13.0" lru = "0.12.4" +libc = "0.2.158" moka = { version = "0.12.8", features = ["sync"] } monostate = "0.1.0" ndarray = { version = ">=0.15.0,<17", optional = true } num = { version = "0.4.1" } object_store = { version = ">=0.9.0,<0.12", default-features = false, optional = true } opendal = { version = ">=0.46,<0.50", default-features = false, optional = true } +page_size = "0.6.0" parking_lot = "0.12.0" pathdiff = "0.2.0" pco = { version = "0.3.1", optional = true } diff --git a/src/storage/store.rs b/src/storage/store.rs index e7319c49..0460602d 100644 --- a/src/storage/store.rs +++ b/src/storage/store.rs @@ -8,7 +8,9 @@ mod store_async; mod store_sync; // mod store_plugin; -pub use store_sync::filesystem_store::{FilesystemStore, FilesystemStoreCreateError}; +pub use store_sync::filesystem_store::{ + FilesystemStore, FilesystemStoreCreateError, FilesystemStoreOptions, +}; pub use store_sync::memory_store::MemoryStore; #[cfg(feature = "http")] diff --git a/src/storage/store/store_sync/filesystem_store.rs b/src/storage/store/store_sync/filesystem_store.rs index d5319365..c5f2b24e 100644 --- a/src/storage/store/store_sync/filesystem_store.rs +++ b/src/storage/store/store_sync/filesystem_store.rs @@ -11,6 +11,7 @@ use crate::{ }, }; +use bytes::BytesMut; use parking_lot::RwLock; use thiserror::Error; use walkdir::WalkDir; @@ -23,6 +24,11 @@ use std::{ sync::{Arc, Mutex}, }; +#[cfg(target_os = "linux")] +use libc::O_DIRECT; +#[cfg(target_os = "linux")] +use std::os::unix::fs::OpenOptionsExt; + // // Register the store. // inventory::submit! { // ReadableStorePlugin::new("file", |uri| Ok(Arc::new(create_store_filesystem(uri)?))) @@ -44,6 +50,31 @@ use std::{ // FilesystemStore::new(path).map_err(|e| StorePluginCreateError::Other(e.to_string())) // } +/// For `O_DIRECT`, we need a buffer that is aligned to the page size and is a +/// multiple of the page size. +fn bytes_aligned(size: usize) -> BytesMut { + let align = page_size::get(); + let mut bytes = BytesMut::with_capacity(size + 2 * align); + let offset = bytes.as_ptr().align_offset(align); + bytes.split_off(offset) +} + +/// Options for use with [`FilesystemStore`] +#[non_exhaustive] +#[derive(Debug, Clone, Default)] +pub struct FilesystemStoreOptions { + direct_io: bool, +} + +impl FilesystemStoreOptions { + /// Set whether or not to enable direct I/O. Needs support from the + /// operating system (currently only Linux) and file system. + pub fn direct_io(&mut self, direct_io: bool) -> &mut Self { + self.direct_io = direct_io; + self + } +} + /// A synchronous file system store. /// /// See . @@ -52,6 +83,7 @@ pub struct FilesystemStore { base_path: PathBuf, sort: bool, readonly: bool, + options: FilesystemStoreOptions, files: Mutex>>>, // locks: StoreLocks, } @@ -64,6 +96,19 @@ impl FilesystemStore { /// - is not valid, or /// - it points to an existing file rather than a directory. pub fn new>(base_path: P) -> Result { + Self::new_with_options(base_path, FilesystemStoreOptions::default()) + } + + /// Create a new file system store at a given `base_path` and `options`. + /// + /// # Errors + /// Returns a [`FilesystemStoreCreateError`] if `base_directory`: + /// - is not valid, or + /// - it points to an existing file rather than a directory. + pub fn new_with_options>( + base_path: P, + options: FilesystemStoreOptions, + ) -> Result { let base_path = base_path.as_ref().to_path_buf(); if base_path.to_str().is_none() { return Err(FilesystemStoreCreateError::InvalidBasePath(base_path)); @@ -83,10 +128,10 @@ impl FilesystemStore { Ok(Self { base_path, sort: false, + options, readonly, files: Mutex::default(), }) - // Self::new_with_locks(base_path, Arc::new(DefaultStoreLocks::default())) } // /// Create a new file system store at a given `base_path` with non-default store locks. @@ -192,17 +237,42 @@ impl FilesystemStore { } } - let mut file = OpenOptions::new() - .write(true) - .create(true) - .truncate(truncate) - .open(key_path)?; + let mut flags = OpenOptions::new(); + flags.write(true).create(true).truncate(truncate); + + // FIXME: for now, only Linux support; also no support for `offset != 0` + let enable_direct = cfg!(target_os = "linux") + && self.options.direct_io + && offset.is_none() + && !value.is_empty(); + + #[cfg(target_os = "linux")] + if enable_direct { + flags.custom_flags(O_DIRECT); + } + + let mut file = flags.open(key_path)?; // Write - if let Some(offset) = offset { - file.seek(SeekFrom::Start(offset))?; + if enable_direct { + let mut buf = bytes_aligned(value.len()); + buf.extend_from_slice(value); + + // Pad to page size + let pad_size = buf.len().next_multiple_of(page_size::get()) - buf.len(); + buf.extend(std::iter::repeat(0).take(pad_size)); + + file.write_all(&buf)?; + + // Truncate again to requested size + file.set_len(value.len() as u64)?; + } else { + if let Some(offset) = offset { + file.seek(SeekFrom::Start(offset))?; + } + + file.write_all(value)?; } - file.write_all(value)?; Ok(()) } @@ -427,4 +497,18 @@ mod tests { super::super::test_util::store_list(&store)?; Ok(()) } + + #[test] + // #[cfg_attr(miri, ignore)] + fn direct_io() -> Result<(), Box> { + let path = tempfile::TempDir::new()?; + let mut opts = FilesystemStoreOptions::default(); + opts.direct_io(true); + + let store = FilesystemStore::new_with_options(path.path(), opts)?.sorted(); + super::super::test_util::store_write(&store)?; + super::super::test_util::store_read(&store)?; + super::super::test_util::store_list(&store)?; + Ok(()) + } }