Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Activate only by channel ID #526

Merged
merged 3 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::{Push, Pull};
use crate::allocator::Event;

/// The push half of an intra-thread channel.
pub struct Pusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Push<T>> Pusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(pusher: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Pusher {
index,
// count: 0,
Expand All @@ -36,7 +34,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// if self.count != 0 {
// self.events
// .borrow_mut()
// .push_back((self.index, Event::Pushed(self.count)));
// .push_back(self.index);
// self.count = 0;
// }
// }
Expand All @@ -47,7 +45,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// moving information along. Better, but needs cooperation.
self.events
.borrow_mut()
.push_back((self.index, Event::Pushed(1)));
.push(self.index);

self.pusher.push(element)
}
Expand All @@ -59,15 +57,15 @@ use crossbeam_channel::Sender;
pub struct ArcPusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Sender<(usize, Event)>,
events: Sender<usize>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
buzzer: crate::buzzer::Buzzer,
}

impl<T, P: Push<T>> ArcPusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self {
pub fn new(pusher: P, index: usize, events: Sender<usize>, buzzer: crate::buzzer::Buzzer) -> Self {
ArcPusher {
index,
// count: 0,
Expand Down Expand Up @@ -99,7 +97,7 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// and finally awaken the thread. Other orders are defective when
// multiple threads are involved.
self.pusher.push(element);
let _ = self.events.send((self.index, Event::Pushed(1)));
let _ = self.events.send(self.index);
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
// .expect("Failed to send message count");
self.buzzer.buzz();
Expand All @@ -110,14 +108,14 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
pub struct Puller<T, P: Pull<T>> {
index: usize,
count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
puller: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Pull<T>> Puller<T, P> {
/// Wraps a puller with a message counter.
pub fn new(puller: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(puller: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Puller {
index,
count: 0,
Expand All @@ -135,7 +133,7 @@ impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
if self.count != 0 {
self.events
.borrow_mut()
.push_back((self.index, Event::Pulled(self.count)));
.push(self.index);
self.count = 0;
}
}
Expand Down
7 changes: 3 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use crate::allocator::{Allocate, AllocateBuilder, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

Expand Down Expand Up @@ -74,7 +73,7 @@ impl Generic {
Generic::ZeroCopy(z) => z.release(),
}
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
match self {
Generic::Thread(ref t) => t.events(),
Generic::Process(ref p) => p.events(),
Expand All @@ -93,7 +92,7 @@ impl Allocate for Generic {

fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Generic::Thread(t) => t.await_events(_duration),
Expand Down
11 changes: 1 addition & 10 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

pub use self::thread::Thread;
pub use self::process::Process;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub trait Allocate {
/// drain these events in order to drive their computation. If they
/// fail to do so the event queue may become quite large, and turn
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>>;
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

/// Awaits communication events.
///
Expand Down Expand Up @@ -92,11 +91,3 @@ pub trait Allocate {
thread::Thread::new_from(identifier, self.events().clone())
}
}

/// A communication channel event.
pub enum Event {
/// A number of messages pushed into the channel.
Pushed(usize),
/// A number of messages pulled from the channel.
Pulled(usize),
}
18 changes: 9 additions & 9 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap};
use crossbeam_channel::{Sender, Receiver};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::allocator::{Allocate, AllocateBuilder, Thread};
use crate::{Push, Pull, Message};
use crate::buzzer::Buzzer;

Expand All @@ -25,8 +25,8 @@ pub struct ProcessBuilder {
buzzers_send: Vec<Sender<Buzzer>>,
buzzers_recv: Vec<Receiver<Buzzer>>,

counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl AllocateBuilder for ProcessBuilder {
Expand Down Expand Up @@ -63,8 +63,8 @@ pub struct Process {
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<dyn Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl Process {
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Allocate for Process {
(sends, recv)
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}

Expand All @@ -184,8 +184,8 @@ impl Allocate for Process {

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
events.push_back((index, event));
while let Ok(index) = self.counters_recv.try_recv() {
events.push(index);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder, Event};
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
Expand All @@ -22,7 +22,7 @@ impl AllocateBuilder for ThreadBuilder {
/// An allocator for intra-thread communication.
pub struct Thread {
/// Shared counts of messages in channels.
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
}

impl Allocate for Thread {
Expand All @@ -32,7 +32,7 @@ impl Allocate for Thread {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
Expand All @@ -56,12 +56,12 @@ impl Thread {
/// Allocates a new thread-local channel allocator.
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(VecDeque::new())),
events: Rc::new(RefCell::new(Default::default())),
}
}

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>)
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
Expand Down
5 changes: 2 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -229,7 +228,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
events.push_back((header.channel, Event::Pushed(1)));
events.push(header.channel);

// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Expand Down Expand Up @@ -269,7 +268,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// }
// }
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bytes::arc::Bytes;
use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::{AllocateBuilder, Event};
use crate::allocator::{AllocateBuilder};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ProcessBuilder {
ProcessAllocator {
index: self.index,
peers: self.peers,
events: Rc::new(RefCell::new(VecDeque::new())),
events: Rc::new(RefCell::new(Default::default())),
canaries: Rc::new(RefCell::new(Vec::new())),
channel_id_bound: None,
staged: Vec::new(),
Expand All @@ -103,7 +103,7 @@ pub struct ProcessAllocator {
index: usize, // number out of peers
peers: usize, // number of peer allocators (for typed channel allocation).

events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,

canaries: Rc<RefCell<Vec<usize>>>,

Expand Down Expand Up @@ -196,7 +196,7 @@ impl Allocate for ProcessAllocator {

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
events.push_back((header.channel, Event::Pushed(1)));
events.push(header.channel);

// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Allocate for ProcessAllocator {
// }
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<std::time::Duration>) {
Expand Down
5 changes: 3 additions & 2 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,9 @@ impl<A: Allocate> Worker<A> {
let events = allocator.events().clone();
let mut borrow = events.borrow_mut();
let paths = self.paths.borrow();
for (channel, _event) in borrow.drain(..) {
// TODO: Pay more attent to `_event`.
borrow.sort_unstable();
borrow.dedup();
for channel in borrow.drain(..) {
// Consider tracking whether a channel
// in non-empty, and only activating
// on the basis of non-empty channels.
Expand Down
Loading