Skip to content

Commit

Permalink
give_container for arbitrary container builders (#621)
Browse files Browse the repository at this point in the history
Previously, we'd only support `Session::give_container` for capacity
container builders, but there is no reason not to offer it for all
builders. Here, we change the definition to apply for all session and
buffers. The implementation ensures FIFO order by flushing before pushing
a whole prefabricated container.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru authored Jan 15, 2025
1 parent 7359175 commit 3a90720
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainer
pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<T, CapacityContainerBuilder<C>, P> where T: Timestamp {
self.autoflush_session_with_builder(cap)
}

/// Gives an entire container at the current time.
fn give_container(&mut self, container: &mut C) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract_and_send();
}
}
}

impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
Expand Down Expand Up @@ -109,6 +101,19 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
Message::push_at(container, time, &mut self.pusher);
}
}

/// Gives an entire container at the current time. Maintains FIFO order with previously pushed
/// data. Only intended to be used through [`Session::give_container`].
// TODO: This method could exist without a container builder, but we can't express this as a
// buffer always requires a container builder. We could expose the buffer's underlying pusher
// directly, but this would bypass the buffer's time tracking.
fn give_container(&mut self, container: &mut CB::Container) {
if !container.is_empty() {
self.flush();
let time = self.time.as_ref().unwrap().clone();
Message::push_at(container, time, &mut self.pusher);
}
}
}

impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
Expand All @@ -133,13 +138,14 @@ pub struct Session<'a, T, CB, P> {
buffer: &'a mut Buffer<T, CB, P>,
}

impl<'a, T, C: Container + Data, P> Session<'a, T, CapacityContainerBuilder<C>, P>
impl<'a, T, CB: ContainerBuilder, P> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
P: Push<Message<T, C>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut C) {
/// Provide a container at the time specified by the [Session]. Maintains FIFO order with
/// previously pushed data.
pub fn give_container(&mut self, container: &mut CB::Container) {
self.buffer.give_container(container)
}
}
Expand Down

0 comments on commit 3a90720

Please sign in to comment.