Skip to content

Commit

Permalink
bytes: Separate Bytes and BytesMut (#637)
Browse files Browse the repository at this point in the history
* Remove some unsafe

* Rename Bytes to BytesMut

* Introduce and use Bytes

* Update comments and doctests

* Comments, inlines, and pruning
  • Loading branch information
frankmcsherry authored Feb 11, 2025
1 parent 6826f06 commit 38ef688
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 46 deletions.
138 changes: 104 additions & 34 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
//! A simplified implementation of the `bytes` crate, with different features, less safety.
//!
//! The crate is currently minimalist rather than maximalist, and for example does not support
//! methods on `BytesMut` that seem like they should be safe, because they are not yet needed.
//! For example, `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to` could
//! return a `BytesMut` rather than a `Bytes`.
//!
//! # Examples
//!
//! ```
//! use timely_bytes::arc::Bytes;
//! use timely_bytes::arc::BytesMut;
//!
//! let bytes = vec![0u8; 1024];
//! let mut shared1 = Bytes::from(bytes);
//! let mut shared1 = BytesMut::from(bytes);
//! let mut shared2 = shared1.extract_to(100);
//! let mut shared3 = shared1.extract_to(100);
//! let mut shared4 = shared2.extract_to(60);
Expand All @@ -17,13 +22,10 @@
//! assert_eq!(shared4.len(), 60);
//!
//! for byte in shared1.iter_mut() { *byte = 1u8; }
//! for byte in shared2.iter_mut() { *byte = 2u8; }
//! for byte in shared3.iter_mut() { *byte = 3u8; }
//! for byte in shared4.iter_mut() { *byte = 4u8; }
//!
//! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
//! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
//! shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1");
//! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
//!
//! assert_eq!(shared4.len(), 1024);
Expand All @@ -38,7 +40,11 @@ pub mod arc {
use std::any::Any;

/// A thread-safe byte buffer backed by a shared allocation.
pub struct Bytes {
///
/// An instance of this type contends that `ptr` is valid for `len` bytes,
/// and that no other reference to these bytes exists, other than through
/// the type currently held in `sequestered`.
pub struct BytesMut {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
Expand All @@ -51,15 +57,10 @@ pub mod arc {
sequestered: Arc<dyn Any>,
}

// Synchronization happens through `self.sequestered`, which means to ensure that even
// across multiple threads each region of the slice is uniquely "owned", if not in the
// traditional Rust sense.
unsafe impl Send for Bytes { }

impl Bytes {
impl BytesMut {

/// Create a new instance from a byte allocation.
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {
pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+'static {

// Sequester allocation behind an `Arc`, which *should* keep the address
// stable for the lifetime of `sequestered`. The `Arc` also serves as our
Expand All @@ -73,7 +74,7 @@ pub mod arc {
.map(|a| (a.as_mut_ptr(), a.len()))
.unwrap();

Bytes {
BytesMut {
ptr,
len,
sequestered,
Expand All @@ -90,40 +91,40 @@ pub mod arc {

assert!(index <= self.len);

let result = Bytes {
let result = BytesMut {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
};

unsafe { self.ptr = self.ptr.add(index); }
self.ptr = self.ptr.wrapping_add(index);
self.len -= index;

result
result.freeze()
}

/// Regenerates the Bytes if it is uniquely held.
/// Regenerates the BytesMut if it is uniquely held.
///
/// If uniquely held, this method recovers the initial pointer and length
/// of the sequestered allocation and re-initializes the Bytes. The return
/// of the sequestered allocation and re-initializes the BytesMut. The return
/// value indicates whether this occurred.
///
/// # Examples
///
/// ```
/// use timely_bytes::arc::Bytes;
/// use timely_bytes::arc::BytesMut;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared1 = BytesMut::from(bytes);
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
///
/// drop(shared1);
/// drop(shared3);
/// drop(shared2);
/// drop(shared4);
/// assert!(shared3.try_regenerate::<Vec<u8>>());
/// assert!(shared3.len() == 1024);
/// assert!(shared1.try_regenerate::<Vec<u8>>());
/// assert!(shared1.len() == 1024);
/// ```
pub fn try_regenerate<B>(&mut self) -> bool where B: DerefMut<Target=[u8]>+'static {
// Only possible if this is the only reference to the sequestered allocation.
Expand All @@ -138,6 +139,80 @@ pub mod arc {
}
}

/// Converts a writeable byte slice to a shareable byte slice.
///
/// Consumes `self`, so no further writes can occur through this handle; the
/// returned `Bytes` reuses the same pointer, length, and sequestered
/// allocation (no copy is made).
#[inline(always)]
pub fn freeze(self) -> Bytes {
Bytes {
ptr: self.ptr,
len: self.len,
sequestered: self.sequestered,
}
}
}

impl Deref for BytesMut {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &[u8] {
// SAFETY: per the type's invariant, `ptr` is valid for `len` bytes for as
// long as `sequestered` keeps the backing allocation alive, and `BytesMut`
// holds the only reference to this range, so a shared borrow is sound.
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl DerefMut for BytesMut {
#[inline(always)]
fn deref_mut(&mut self) -> &mut [u8] {
// SAFETY: per the type's invariant, `ptr` is valid for `len` bytes and no
// other reference to this range exists; `&mut self` guarantees exclusive
// access for the lifetime of the returned slice.
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}


/// A thread-safe shared byte buffer backed by a shared allocation.
///
/// An instance of this type contends that `ptr` is valid for `len` bytes,
/// and that no other mutable reference to these bytes exists, other than
/// through the type currently held in `sequestered`.
#[derive(Clone)]
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *const u8,
/// Length of this slice.
len: usize,
/// Shared access to underlying resources.
///
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a stronger statement about this.
/// NOTE(review): given the type-level invariant above, the access this is meant
/// to exclude is presumably *mutable* access to `ptr[0 .. len]` — `Bytes` itself
/// exists to permit shared, read-only access. Confirm intent.
sequestered: Arc<dyn Any>,
}

// Synchronization happens through `self.sequestered`, which means to ensure that even
// across multiple threads the referenced range of bytes remain valid.
// SAFETY: the raw pointer is only read through `len`-bounded slices, and the
// `Arc` keeps the allocation alive on whichever thread drops last.
// NOTE(review): `Bytes` is `Clone` and read-only, yet only `Send` is asserted
// here, not `Sync`; if sharing `&Bytes` across threads is intended, a `Sync`
// impl would also be required — confirm whether its absence is deliberate.
unsafe impl Send for Bytes { }

impl Bytes {

/// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
///
/// # Panics
///
/// Panics if `index` exceeds `self.len`.
///
/// # Safety
///
/// This method first tests `index` against `self.len`, which should ensure that both
/// the returned `Bytes` contains valid memory, and that `self` can no longer access it.
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

// The front `index` bytes become the result; cloning `sequestered` keeps
// the backing allocation alive for both halves.
let result = Bytes {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
};

// `wrapping_add` rather than `add` keeps this free of `unsafe`; the pointer
// is only ever dereferenced through `len`-bounded slices.
self.ptr = self.ptr.wrapping_add(index);
self.len -= index;

result
}

/// Attempts to merge adjacent slices from the same allocation.
///
/// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`.
Expand All @@ -147,10 +222,10 @@ pub mod arc {
/// # Examples
///
/// ```
/// use timely_bytes::arc::Bytes;
/// use timely_bytes::arc::BytesMut;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared1 = BytesMut::from(bytes).freeze();
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
Expand All @@ -161,7 +236,7 @@ pub mod arc {
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
/// ```
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) {
self.len += other.len;
Ok(())
}
Expand All @@ -173,14 +248,9 @@ pub mod arc {

impl Deref for Bytes {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &[u8] {
// SAFETY: per the type's invariant, `ptr` is valid for `len` bytes while
// `sequestered` keeps the allocation alive, and no mutable reference to
// this range exists outside `sequestered`, so shared reads are sound.
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}
}
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// No splitting occurs across allocations.
while bytes.len() > 0 {

if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Allocate for ProcessAllocator {
// No splitting occurs across allocations.
while bytes.len() > 0 {

if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
Expand Down
14 changes: 7 additions & 7 deletions communication/src/allocator/zero_copy/bytes_slab.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
//! A large binary allocation for writing and sharing.
use timely_bytes::arc::Bytes;
use timely_bytes::arc::{Bytes, BytesMut};

/// A large binary allocation for writing and sharing.
///
/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
pub struct BytesSlab {
buffer: Bytes, // current working buffer.
in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
stash: Vec<Bytes>, // reclaimed and reusable buffers.
buffer: BytesMut, // current working buffer.
in_progress: Vec<Option<BytesMut>>, // buffers shared with workers.
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
shift: usize, // current buffer allocation size.
valid: usize, // buffer[..valid] are valid bytes.
}
Expand All @@ -19,7 +19,7 @@ impl BytesSlab {
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
pub fn new(shift: usize) -> Self {
BytesSlab {
buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
in_progress: Vec::new(),
stash: Vec::new(),
shift,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl BytesSlab {
self.in_progress.retain(|x| x.is_some());
}

let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ pub fn send_loop<S: Stream>(
}
else {
// TODO: Could do scatter/gather write here.
for mut bytes in stash.drain(..) {
for bytes in stash.drain(..) {

// Record message sends.
logger.as_mut().map(|logger| {
let mut offset = 0;
while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
logger.log(MessageEvent { is_send: true, header, });
offset += header.required_bytes();
}
Expand Down
2 changes: 1 addition & 1 deletion communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl MessageHeader {

/// Returns a header when there is enough supporting data
#[inline]
pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
let mut cursor = io::Cursor::new(&bytes[..]);
let mut buffer = [0; Self::FIELDS];
cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
Expand Down

0 comments on commit 38ef688

Please sign in to comment.