Skip to content

Commit

Permalink
feat: rewrite queue-msg normalization pass (#3026)
Browse files Browse the repository at this point in the history
closes #3003

results:

```
normalize               time:   [15.757 µs 15.946 µs 16.192 µs] // old
normalize2              time:   [5.5628 µs 5.5822 µs 5.6047 µs] // new
```

also removed some unused `Op` variants.
  • Loading branch information
benluelo authored Sep 26, 2024
2 parents 7ea184e + 53f555a commit 525a45d
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 886 deletions.
50 changes: 47 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions lib/queue-msg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ tracing = { workspace = true }
unionlabs = { workspace = true }

[dev-dependencies]
enumorph = "0.1.2"
tokio = { workspace = true, features = ["time", "rt", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
enumorph = "0.1.2"
tokio = { workspace = true, features = ["time", "rt", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
voyager-message.workspace = true

[features]
default = []

[[bench]]
harness = false
name = "normalize"
100 changes: 100 additions & 0 deletions lib/queue-msg/benches/normalize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use queue_msg::{call, conc, data, noop, normalize::normalize, promise, seq, Op};
use unionlabs::ibc::core::client::height::Height;
use voyager_message::{
call::FetchBlock, callback::AggregateFetchBlockRange, core::ChainId, data::LatestHeight,
VoyagerMessage,
};

fn bench_normalize(c: &mut Criterion) {
c.bench_function("normalize", |b| {
b.iter_with_setup(
|| vec![mk_msg(), mk_msg(), mk_msg()],
|msgs| black_box(normalize(msgs)),
)
});
}

fn mk_msg() -> Op<VoyagerMessage> {
seq([
promise(
[
data(LatestHeight {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
call(FetchBlock {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
conc([
noop(),
data(LatestHeight {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
call(FetchBlock {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
]),
],
[],
AggregateFetchBlockRange {
from_height: Height {
revision_number: 1,
revision_height: 1,
},
},
),
seq([
data(LatestHeight {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
call(FetchBlock {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
conc([
noop(),
data(LatestHeight {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
call(FetchBlock {
chain_id: ChainId::new("chain"),
height: Height {
revision_number: 1,
revision_height: 1,
},
}),
]),
]),
])
}

criterion_group!(benches, bench_normalize);

criterion_main!(benches);
67 changes: 4 additions & 63 deletions lib/queue-msg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
error::Error,
fmt::Debug,
future::Future,
num::{NonZeroU64, NonZeroU8},
num::NonZeroU8,
pin::Pin,
sync::{
atomic::{AtomicU32, Ordering},
Expand Down Expand Up @@ -100,12 +100,6 @@ pub enum Op<T: QueueMessage> {
Defer {
until: u64,
},
/// Repeats the contained message `times` times. If `times` is `None`, will repeat infinitely.
Repeat {
#[serde(default, skip_serializing_if = "Option::is_none")]
times: Option<NonZeroU64>,
msg: Box<Self>,
},
/// Executes the contained message only if `timeout_timestamp` has not been hit.
Timeout {
timeout_timestamp: u64,
Expand Down Expand Up @@ -133,16 +127,6 @@ pub enum Op<T: QueueMessage> {
/// Note that this is similar to `Sequence`, except that the new messages are queued at the
/// *back* of the list, allowing for uniform progress across all nested messages.
Conc(VecDeque<Self>),
/// Race a list of messages. The head of the list is handled, and if it returns no new messages,
/// then the rest of the list is dropped; otherwise, the new message is pushed to the back of the
/// list (similar to [`Self::Conc`]).
///
/// ```txt
/// [A B C]
/// D = handle(A)
/// if D.is_none() noop else race([B C D])
/// ```
Race(VecDeque<Self>),
/// Handle `msg`, retrying on failure. If `msg` fails, this will requeue itself with `remaining - 1`.
Retry {
remaining: NonZeroU8,
Expand Down Expand Up @@ -171,16 +155,6 @@ pub fn retry<T: QueueMessage>(count: NonZeroU8, t: impl Into<Op<T>>) -> Op<T> {
}
}

/// Convenience constructor for [`Op::Repeat`]
#[inline]
#[must_use = "constructing an instruction has no effect"]
pub fn repeat<T: QueueMessage>(times: impl Into<Option<NonZeroU64>>, t: impl Into<Op<T>>) -> Op<T> {
Op::Repeat {
times: times.into(),
msg: Box::new(t.into()),
}
}

/// Convenience constructor for [`Op::Seq`]
#[inline]
#[must_use = "constructing an instruction has no effect"]
Expand Down Expand Up @@ -237,12 +211,6 @@ pub fn void<T: QueueMessage>(t: impl Into<Op<T>>) -> Op<T> {
Op::Void(Box::new(t.into()))
}

#[inline]
#[must_use = "constructing an instruction has no effect"]
pub fn race<T: QueueMessage>(ts: impl IntoIterator<Item = Op<T>>) -> Op<T> {
Op::Race(ts.into_iter().collect())
}

#[inline]
#[must_use = "constructing an instruction has no effect"]
pub fn noop<T: QueueMessage>() -> Op<T> {
Expand Down Expand Up @@ -408,16 +376,6 @@ impl<T: QueueMessage> Op<T> {
receiver.handle(store, data).await.map(Some)
}
}
Op::Repeat { times: None, msg } => {
Ok(Some(seq([*msg.clone(), repeat(None, *msg)])))
}
Op::Repeat {
times: Some(times),
msg,
} => Ok(Some(seq([*msg.clone()].into_iter().chain(
// if times - 1 > 0, queue repeat with times - 1
NonZeroU64::new(times.get() - 1_u64).map(|times| repeat(Some(times), *msg)),
)))),
Op::Void(msg) => {
// TODO: distribute across seq/conc
Ok(msg.handle(store, depth + 1).await?.map(|msg| match msg {
Expand All @@ -428,22 +386,6 @@ impl<T: QueueMessage> Op<T> {
msg => void(msg),
}))
}
Op::Race(mut msgs) => match msgs.pop_front() {
Some(msg) => {
match msg.handle(store, depth + 1).await? {
Some(msg) => {
msgs.push_back(msg);
Ok(Some(race(msgs)))
}
// head won, drop the rest of the messages
None => {
info!("race won, dropping other messages");
Ok(None)
}
}
}
None => Ok(None),
},
Op::Noop => Ok(None),
}
};
Expand All @@ -455,7 +397,7 @@ impl<T: QueueMessage> Op<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::normalize::{flatten_seq, normalize};
use crate::normalize::normalize;

enum UnitMessage {}

Expand Down Expand Up @@ -521,7 +463,7 @@ mod tests {
defer(5),
]);
assert_eq!(
flatten_seq(vec![msg]),
normalize(vec![msg]),
vec![(
vec![0],
seq([defer(1), defer(2), defer(3), defer(4), defer(5)])
Expand Down Expand Up @@ -549,8 +491,7 @@ mod tests {

#[test]
fn nested_seq_conc_single() {
// any nesting level of seq and conc should be handled in a single pass of (seq, conc) or
// (conc, seq)
// any nesting level of seq and conc should be handled in a single pass

let msg = conc::<UnitMessage>([seq([conc([noop()])])]);
assert_eq!(normalize(vec![msg]), vec![]);
Expand Down
Loading

0 comments on commit 525a45d

Please sign in to comment.