Skip to content

Commit

Permalink
Demonstrate BytesSlab with abstracted bytes storage (#641)
Browse files Browse the repository at this point in the history
* Demonstrate BytesSlab with abstracted bytes storage

* lgalloc example

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Fix lgalloc integration

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Only build lgalloc eample on supported os

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Only depend on lgalloc on supported os

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

---------

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
Co-authored-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
frankmcsherry and antiguru authored Feb 13, 2025
1 parent 67d8e05 commit 839d02d
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 28 deletions.
4 changes: 4 additions & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ timely_bytes = { path = "../bytes", version = "0.13" }
timely_container = { path = "../container", version = "0.14.0" }
timely_logging = { path = "../logging", version = "0.13" }
crossbeam-channel = "0.5"

# Lgalloc only supports linux and macos, don't depend on any other OS.
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
lgalloc = "0.4"
108 changes: 108 additions & 0 deletions communication/examples/lgalloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#![cfg(any(target_os = "linux", target_os = "macos"))]

use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use timely_communication::{Allocate, Bytesable};

/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}

impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}

fn length_in_bytes(&self) -> usize {
self.payload.len()
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}

fn lgalloc_refill(size: usize) -> Box<LgallocHandle> {
let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap();
let handle = Some(handle);
Box::new(LgallocHandle { handle, pointer, capacity })
}

struct LgallocHandle {
handle: Option<lgalloc::Handle>,
pointer: NonNull<u8>,
capacity: usize,
}

impl Deref for LgallocHandle {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
}
}

impl DerefMut for LgallocHandle {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
}
}

impl Drop for LgallocHandle {
fn drop(&mut self) {
lgalloc::deallocate(self.handle.take().unwrap());
}
}

fn main() {
let mut config = lgalloc::LgAlloc::new();
config.enable().with_path(std::env::temp_dir());
lgalloc::lgalloc_set_config(&config);

let refill = std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>);

// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::ProcessBinary(4);
let (allocators, others) = config.try_build_with(refill).unwrap();
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {

println!("worker {} of {} started", allocator.index(), allocator.peers());

// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
}

// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {

allocator.receive();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}

allocator.release();
}

allocator.index()
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
}
}
else { println!("error in computation"); }
}
4 changes: 3 additions & 1 deletion communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ impl<T: Clone> Push<T> for Broadcaster<T> {
}
}

use crate::allocator::zero_copy::bytes_slab::BytesRefill;

/// A builder for vectors of peers.
pub trait PeerBuilder {
/// The peer type.
type Peer: AllocateBuilder + Sized;
/// Allocate a list of `Self::Peer` of length `peers`.
fn new_vector(peers: usize) -> Vec<Self::Peer>;
fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
}
2 changes: 1 addition & 1 deletion communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Process {
impl PeerBuilder for Process {
type Peer = ProcessBuilder;
/// Allocate a list of connected intra-process allocators.
fn new_vector(peers: usize) -> Vec<ProcessBuilder> {
fn new_vector(peers: usize, _refill: crate::allocator::BytesRefill) -> Vec<ProcessBuilder> {

let mut counters_send = Vec::with_capacity(peers);
let mut counters_recv = Vec::with_capacity(peers);
Expand Down
12 changes: 8 additions & 4 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::networking::MessageHeader;
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;

use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};

Expand All @@ -27,6 +27,8 @@ pub struct TcpBuilder<A: AllocateBuilder> {
peers: usize, // number of peer allocators.
futures: Vec<Receiver<MergeQueue>>, // to receive queues to each network thread.
promises: Vec<Sender<MergeQueue>>, // to send queues from each network thread.
/// Byte slab refill function.
refill: BytesRefill,
}

/// Creates a vector of builders, sharing appropriate state.
Expand All @@ -44,8 +46,9 @@ pub struct TcpBuilder<A: AllocateBuilder> {
pub fn new_vector<A: AllocateBuilder>(
allocators: Vec<A>,
my_process: usize,
processes: usize)
-> (Vec<TcpBuilder<A>>,
processes: usize,
refill: BytesRefill,
) -> (Vec<TcpBuilder<A>>,
Vec<Vec<Sender<MergeQueue>>>,
Vec<Vec<Receiver<MergeQueue>>>)
{
Expand All @@ -68,6 +71,7 @@ pub fn new_vector<A: AllocateBuilder>(
peers: threads * processes,
promises,
futures,
refill: refill.clone(),
}})
.collect();

Expand All @@ -92,7 +96,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.futures.into_iter() {
let queue = pusher.recv().expect("Failed to receive push queue");
let sendpoint = SendEndpoint::new(queue);
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
sends.push(Rc::new(RefCell::new(sendpoint)));
}

Expand Down
8 changes: 5 additions & 3 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::networking::MessageHeader;
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
use crate::allocator::canary::Canary;

use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};

use super::push_pull::{Pusher, Puller};
Expand All @@ -28,14 +28,15 @@ pub struct ProcessBuilder {
peers: usize, // number of peer allocators.
pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
pullers: Vec<Sender<MergeQueue>>, // for pulling bytes from other workers.
refill: BytesRefill,
}

impl PeerBuilder for ProcessBuilder {
type Peer = ProcessBuilder;
/// Creates a vector of builders, sharing appropriate state.
///
/// This method requires access to a byte exchanger, from which it mints channels.
fn new_vector(count: usize) -> Vec<ProcessBuilder> {
fn new_vector(count: usize, refill: BytesRefill) -> Vec<ProcessBuilder> {

// Channels for the exchange of `MergeQueue` endpoints.
let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
Expand All @@ -50,6 +51,7 @@ impl PeerBuilder for ProcessBuilder {
peers: count,
pushers,
pullers,
refill: refill.clone(),
}
)
.collect()
Expand All @@ -73,7 +75,7 @@ impl ProcessBuilder {
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.pushers.into_iter() {
let queue = pusher.recv().expect("Failed to receive MergeQueue");
let sendpoint = SendEndpoint::new(queue);
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
sends.push(Rc::new(RefCell::new(sendpoint)));
}

Expand Down
6 changes: 3 additions & 3 deletions communication/src/allocator/zero_copy/bytes_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::collections::VecDeque;

use timely_bytes::arc::Bytes;
use super::bytes_slab::BytesSlab;
use super::bytes_slab::{BytesRefill, BytesSlab};

/// A target for `Bytes`.
pub trait BytesPush {
Expand Down Expand Up @@ -142,10 +142,10 @@ impl<P: BytesPush> SendEndpoint<P> {
}

/// Allocates a new `BytesSendEndpoint` from a shared queue.
pub fn new(queue: P) -> Self {
pub fn new(queue: P, refill: BytesRefill) -> Self {
SendEndpoint {
send: queue,
buffer: BytesSlab::new(20),
buffer: BytesSlab::new(20, refill),
}
}
/// Makes the next `bytes` bytes valid.
Expand Down
32 changes: 28 additions & 4 deletions communication/src/allocator/zero_copy/bytes_slab.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A large binary allocation for writing and sharing.
use std::ops::{Deref, DerefMut};
use timely_bytes::arc::{Bytes, BytesMut};

/// A large binary allocation for writing and sharing.
Expand All @@ -13,17 +14,22 @@ pub struct BytesSlab {
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
shift: usize, // current buffer allocation size.
valid: usize, // buffer[..valid] are valid bytes.
new_bytes: BytesRefill, // function to allocate new buffers.
}

/// A function to allocate a new buffer of at least `usize` bytes.
pub type BytesRefill = std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>;

impl BytesSlab {
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
pub fn new(shift: usize) -> Self {
pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
BytesSlab {
buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
buffer: BytesMut::from(BoxDerefMut { boxed: new_bytes(1 << shift) }),
in_progress: Vec::new(),
stash: Vec::new(),
shift,
valid: 0,
new_bytes,
}
}
/// The empty region of the slab.
Expand Down Expand Up @@ -68,7 +74,7 @@ impl BytesSlab {
if self.stash.is_empty() {
for shared in self.in_progress.iter_mut() {
if let Some(mut bytes) = shared.take() {
if bytes.try_regenerate::<Box<[u8]>>() {
if bytes.try_regenerate::<BoxDerefMut>() {
// NOTE: Test should be redundant, but better safe...
if bytes.len() == (1 << self.shift) {
self.stash.push(bytes);
Expand All @@ -82,7 +88,7 @@ impl BytesSlab {
self.in_progress.retain(|x| x.is_some());
}

let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes)(1 << self.shift) }));
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
Expand All @@ -92,3 +98,21 @@ impl BytesSlab {
}
}
}

/// A wrapper for `Box<dyn DerefMut<Target=T>>` that dereferences to `T` rather than `dyn DerefMut<Target=T>`.
struct BoxDerefMut {
boxed: Box<dyn DerefMut<Target=[u8]>+'static>,
}

impl Deref for BoxDerefMut {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.boxed[..]
}
}

impl DerefMut for BoxDerefMut {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.boxed[..]
}
}
13 changes: 9 additions & 4 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::sync::Arc;
use timely_logging::Logger;
use crate::allocator::PeerBuilder;
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use crate::logging::CommunicationEventBuilder;
use crate::networking::create_sockets;
use super::tcp::{send_loop, recv_loop};
Expand Down Expand Up @@ -39,12 +40,13 @@ pub fn initialize_networking<P: PeerBuilder>(
my_index: usize,
threads: usize,
noisy: bool,
refill: BytesRefill,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
{
let sockets = create_sockets(addresses, my_index, noisy)?;
initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, log_sender)
initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, refill, log_sender)
}

/// Initialize send and recv threads from sockets.
Expand All @@ -58,6 +60,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
refill: BytesRefill,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
Expand All @@ -69,14 +72,15 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(

let processes = sockets.len();

let process_allocators = P::new_vector(threads);
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
let process_allocators = P::new_vector(threads, refill.clone());
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());

let mut promises_iter = promises.into_iter();
let mut futures_iter = futures.into_iter();

let mut send_guards = Vec::with_capacity(sockets.len());
let mut recv_guards = Vec::with_capacity(sockets.len());
let refill = refill.clone();

// for each process, if a stream exists (i.e. not local) ...
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
Expand Down Expand Up @@ -108,6 +112,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
// let remote_sends = remote_sends.clone();
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let refill = refill.clone();
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:recv-{}", index))
Expand All @@ -117,7 +122,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
sender: false,
remote: Some(index),
});
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
})?;

recv_guards.push(join_guard);
Expand Down
Loading

0 comments on commit 839d02d

Please sign in to comment.