Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial direct I/O support in FilesystemStore #58

Merged
merged 8 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ half = { version = "2.0.0", features = ["bytemuck"] }
inventory = "0.3.0"
itertools = "0.13.0"
lru = "0.12.4"
libc = "0.2.158"
moka = { version = "0.12.8", features = ["sync"] }
monostate = "0.1.0"
ndarray = { version = ">=0.15.0,<17", optional = true }
num = { version = "0.4.1" }
object_store = { version = ">=0.9.0,<0.12", default-features = false, optional = true }
opendal = { version = ">=0.46,<0.50", default-features = false, optional = true }
page_size = "0.6.0"
parking_lot = "0.12.0"
pathdiff = "0.2.0"
pco = { version = "0.3.1", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ mod store_async;
mod store_sync;
// mod store_plugin;

pub use store_sync::filesystem_store::{FilesystemStore, FilesystemStoreCreateError};
pub use store_sync::filesystem_store::{
FilesystemStore, FilesystemStoreCreateError, FilesystemStoreOptions,
};
pub use store_sync::memory_store::MemoryStore;

#[cfg(feature = "http")]
Expand Down
160 changes: 152 additions & 8 deletions src/storage/store/store_sync/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
},
};

use libc::O_DIRECT;
use parking_lot::RwLock;
use thiserror::Error;
use walkdir::WalkDir;

use std::{
alloc::{alloc_zeroed, dealloc, handle_alloc_error, Layout},
collections::HashMap,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
ops::{Deref, DerefMut},
os::unix::fs::OpenOptionsExt,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
Expand All @@ -44,6 +48,95 @@
// FilesystemStore::new(path).map_err(|e| StorePluginCreateError::Other(e.to_string()))
// }

/// For `O_DIRECT`, we need a buffer that is aligned to the page size and is a
/// multiple of the page size.
struct PageAlignedBuffer {
buf: *mut u8,
layout: Layout,
}

impl PageAlignedBuffer {
/// Allocate a new page-size aligned buffer of `size` bytes. The actual size
/// will be rounded up to the next largest multiple of the page size.
pub fn new(size: usize) -> Self {
let align = page_size::get();
let layout = Layout::from_size_align(size, align)
.expect("size and align are reasonable")
.pad_to_align();

assert!(layout.size() > 0);

Check warning on line 67 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L61-L67

Added lines #L61 - L67 were not covered by tests
// SAFETY: `layout` is non-zero, as asserted above.
let buf = unsafe { alloc_zeroed(layout) };

// buf can be zero when out of memory, or if the allocator doesn't like
// our `Layout`
if buf.is_null() {
handle_alloc_error(layout);
}

Self { buf, layout }
}

Check warning on line 78 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L69-L78

Added lines #L69 - L78 were not covered by tests
}

impl Deref for PageAlignedBuffer {
type Target = [u8];

fn deref(&self) -> &Self::Target {
// SAFETY:
// * "data must be valid for reads for len * mem::size_of::<T>() many bytes, and it must be properly aligned"
// => T is u8 => alignment is trivial
// => `self.buf` is non-null, as per `buf.is_null` check above
// => `self.buf` is a single allocation
// * "`data` must point to len consecutive properly initialized values of type T."
// => `self.buf` is zero-initialized
// * "The memory referenced by the returned slice must not be mutated for the duration of lifetime 'a, except inside an UnsafeCell"
// => guaranteed by the borrow checker for us
// * "The total size len * mem::size_of::<T>() of the slice must be no
// larger than isize::MAX, and adding that size to data must not “wrap
// around” the address space."
// => given from the invariants of `Layout`
unsafe { std::slice::from_raw_parts(self.buf, self.layout.size()) }
}

Check warning on line 99 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L84-L99

Added lines #L84 - L99 were not covered by tests
}

impl DerefMut for PageAlignedBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
// SAFETY: see `deref` with the following modification:
// "The memory referenced by the returned slice must not be accessed
// through any other pointer (not derived from the return value) for the
// duration of lifetime 'a. Both read and write accesses are forbidden."
// => guaranteed by the mutable borrow
unsafe { std::slice::from_raw_parts_mut(self.buf, self.layout.size()) }
}

Check warning on line 110 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L103-L110

Added lines #L103 - L110 were not covered by tests
}

impl Drop for PageAlignedBuffer {
fn drop(&mut self) {
// SAFETY:
// * "ptr must denote a block of memory currently allocated via this allocator,"
// => we get the pointer from `System.alloc_zeroed`, and it is only free'd here in `drop`
// * "layout must be the same layout that was used to allocate that block of memory."
// => we use the `Layout` value previously used for allocation
unsafe { dealloc(self.buf, self.layout) }
}

Check warning on line 121 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L114-L121

Added lines #L114 - L121 were not covered by tests
}

/// Options for use with [`FilesystemStore`]
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct FilesystemStoreOptions {
direct_io: bool,
}

impl FilesystemStoreOptions {
/// Set whether or not to enable direct I/O. Needs support from the
/// operating system (currently only Linux) and file system.
pub fn direct_io(&mut self, direct_io: bool) -> &mut Self {
self.direct_io = direct_io;
self
}

Check warning on line 137 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L134-L137

Added lines #L134 - L137 were not covered by tests
}

/// A synchronous file system store.
///
/// See <https://zarr-specs.readthedocs.io/en/latest/v3/stores/filesystem/v1.0.html>.
Expand All @@ -52,6 +145,7 @@
base_path: PathBuf,
sort: bool,
readonly: bool,
options: FilesystemStoreOptions,
files: Mutex<HashMap<StoreKey, Arc<RwLock<()>>>>,
// locks: StoreLocks,
}
Expand Down Expand Up @@ -83,12 +177,48 @@
Ok(Self {
base_path,
sort: false,
options: FilesystemStoreOptions::default(),
readonly,
files: Mutex::default(),
})
// Self::new_with_locks(base_path, Arc::new(DefaultStoreLocks::default()))
}

/// Create a new file system store at a given `base_path` and `options`.
///
/// # Errors
/// Returns a [`FilesystemStoreCreateError`] if `base_directory`:
/// - is not valid, or
/// - it points to an existing file rather than a directory.
pub fn new_with_options<P: AsRef<Path>>(
sk1p marked this conversation as resolved.
Show resolved Hide resolved
base_path: P,
options: FilesystemStoreOptions,
) -> Result<Self, FilesystemStoreCreateError> {
let base_path = base_path.as_ref().to_path_buf();
if base_path.to_str().is_none() {
return Err(FilesystemStoreCreateError::InvalidBasePath(base_path));
}

Check warning on line 200 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L193-L200

Added lines #L193 - L200 were not covered by tests

let readonly = if base_path.exists() {

Check warning on line 202 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L202

Added line #L202 was not covered by tests
// the path already exists, check if it is read only
let md = std::fs::metadata(&base_path).map_err(FilesystemStoreCreateError::IOError)?;
md.permissions().readonly()

Check warning on line 205 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L204-L205

Added lines #L204 - L205 were not covered by tests
} else {
// the path does not exist, so try and create it. If this succeeds, the filesystem is not read only
std::fs::create_dir_all(&base_path).map_err(FilesystemStoreCreateError::IOError)?;
std::fs::remove_dir(&base_path)?;
false

Check warning on line 210 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L208-L210

Added lines #L208 - L210 were not covered by tests
};

Ok(Self {
base_path,
sort: false,
options,
readonly,
files: Mutex::default(),
})
}

Check warning on line 220 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L213-L220

Added lines #L213 - L220 were not covered by tests

// /// Create a new file system store at a given `base_path` with non-default store locks.
// ///
// /// # Errors
Expand Down Expand Up @@ -192,17 +322,31 @@
}
}

let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(truncate)
.open(key_path)?;
let mut flags = OpenOptions::new();
flags.write(true).create(true).truncate(truncate);

// FIXME: for now, only Unix support; also no support for `offset != 0`
let enable_direct = cfg!(unix) && self.options.direct_io && offset.is_none();

if enable_direct {
flags.custom_flags(O_DIRECT);

Check warning on line 332 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L332

Added line #L332 was not covered by tests
}

let mut file = flags.open(key_path)?;

// Write
if let Some(offset) = offset {
file.seek(SeekFrom::Start(offset))?;
if enable_direct {
let mut buf = PageAlignedBuffer::new(value.len());
buf[0..value.len()].copy_from_slice(value);
file.write_all(&buf)?;
file.set_len(value.len() as u64)?;

Check warning on line 342 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L339-L342

Added lines #L339 - L342 were not covered by tests
} else {
if let Some(offset) = offset {
file.seek(SeekFrom::Start(offset))?;

Check warning on line 345 in src/storage/store/store_sync/filesystem_store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/store/store_sync/filesystem_store.rs#L345

Added line #L345 was not covered by tests
}

file.write_all(value)?;
}
file.write_all(value)?;

Ok(())
}
Expand Down
Loading