diff --git a/Cargo.lock b/Cargo.lock index 1a4f38352f..1aefcfbf79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -673,6 +673,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "ansi_term" version = "0.12.1" @@ -889,7 +895,7 @@ dependencies = [ "bcs 0.1.4", "blst", "blstrs", - "criterion", + "criterion 0.3.6", "ff", "group", "hex", @@ -3474,7 +3480,7 @@ dependencies = [ "atty", "cast", "clap 2.34.0", - "criterion-plot", + "criterion-plot 0.4.5", "csv", "itertools 0.10.5", "lazy_static", @@ -3491,6 +3497,32 @@ dependencies = [ "walkdir", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.5.4", + "criterion-plot 0.5.0", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + [[package]] name = "criterion-plot" version = "0.4.5" @@ -3501,6 +3533,16 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -9382,7 +9424,7 @@ dependencies = [ name = "poseidon-rs" version = "0.0.10" dependencies = [ - "criterion", + "criterion 0.3.6", "ethereum-types", "ff_ce", "primitive-types 0.12.2", @@ -9780,6 +9822,7 @@ dependencies = [ name = "queue-msg" version = "0.1.0" dependencies = [ + "criterion 0.5.1", "either", "enumorph", "frame-support-procedural", @@ -9795,6 +9838,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.18", "unionlabs", + "voyager-message", ] [[package]] diff --git a/lib/queue-msg/Cargo.toml b/lib/queue-msg/Cargo.toml index f2c2f233df..57049212b8 100644 --- a/lib/queue-msg/Cargo.toml +++ b/lib/queue-msg/Cargo.toml @@ -23,9 +23,15 @@ tracing = { workspace = true } unionlabs = { workspace = true } [dev-dependencies] -enumorph = "0.1.2" -tokio = { workspace = true, features = ["time", "rt", "macros"] } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +criterion = { version = "0.5.1", features = ["html_reports"] } +enumorph = "0.1.2" +tokio = { workspace = true, features = ["time", "rt", "macros"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +voyager-message.workspace = true [features] default = [] + +[[bench]] +harness = false +name = "normalize" diff --git a/lib/queue-msg/benches/normalize.rs b/lib/queue-msg/benches/normalize.rs new file mode 100644 index 0000000000..8476a1c13d --- /dev/null +++ b/lib/queue-msg/benches/normalize.rs @@ -0,0 +1,100 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use queue_msg::{call, conc, data, noop, normalize::normalize, promise, seq, Op}; +use unionlabs::ibc::core::client::height::Height; +use voyager_message::{ + call::FetchBlock, callback::AggregateFetchBlockRange, core::ChainId, data::LatestHeight, + VoyagerMessage, +}; + +fn bench_normalize(c: &mut Criterion) { + c.bench_function("normalize", |b| { + b.iter_with_setup( + || vec![mk_msg(), mk_msg(), mk_msg()], + |msgs| black_box(normalize(msgs)), + ) + }); +} + +fn mk_msg() -> Op { + seq([ + promise( + [ + data(LatestHeight { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + call(FetchBlock { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + conc([ + noop(), + data(LatestHeight { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + call(FetchBlock { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + ]), + ], + [], + AggregateFetchBlockRange { + from_height: Height { + revision_number: 1, + revision_height: 1, + }, + }, + ), + seq([ + data(LatestHeight { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + call(FetchBlock { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + conc([ + noop(), + data(LatestHeight { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + call(FetchBlock { + chain_id: ChainId::new("chain"), + height: Height { + revision_number: 1, + revision_height: 1, + }, + }), + ]), + ]), + ]) +} + +criterion_group!(benches, bench_normalize); + +criterion_main!(benches); diff --git a/lib/queue-msg/src/lib.rs b/lib/queue-msg/src/lib.rs index 1f0e579f8e..e7d1020841 100644 --- a/lib/queue-msg/src/lib.rs +++ b/lib/queue-msg/src/lib.rs @@ -6,7 +6,7 @@ use std::{ error::Error, fmt::Debug, future::Future, - num::{NonZeroU64, NonZeroU8}, + num::NonZeroU8, pin::Pin, sync::{ atomic::{AtomicU32, Ordering}, @@ -100,12 +100,6 @@ pub enum Op { Defer { until: u64, }, - /// Repeats the contained message `times` times. If `times` is `None`, will repeat infinitely. - Repeat { - #[serde(default, skip_serializing_if = "Option::is_none")] - times: Option, - msg: Box, - }, /// Executes the contained message only if `timeout_timestamp` has not been hit. Timeout { timeout_timestamp: u64, @@ -133,16 +127,6 @@ pub enum Op { /// Note that this is similar to `Sequence`, except that the new messages are queued at the /// *back* of the list, allowing for uniform progress across all nested messages. Conc(VecDeque), - /// Race a list of messages. The head of the list is handled, and if it returns no new messages, - /// then the rest of the list is dropped; otherwise, the new message is pushed to the back of the - /// list (similar to [`Self::Conc`]). - /// - /// ```txt - /// [A B C] - /// D = handle(A) - /// if D.is_none() noop else race([B C D]) - /// ``` - Race(VecDeque), /// Handle `msg`, retrying on failure. If `msg` fails, this will requeue itself with `remaining - 1`. Retry { remaining: NonZeroU8, @@ -171,16 +155,6 @@ pub fn retry(count: NonZeroU8, t: impl Into>) -> Op { } } -/// Convenience constructor for [`Op::Repeat`] -#[inline] -#[must_use = "constructing an instruction has no effect"] -pub fn repeat(times: impl Into>, t: impl Into>) -> Op { - Op::Repeat { - times: times.into(), - msg: Box::new(t.into()), - } -} - /// Convenience constructor for [`Op::Seq`] #[inline] #[must_use = "constructing an instruction has no effect"] @@ -237,12 +211,6 @@ pub fn void(t: impl Into>) -> Op { Op::Void(Box::new(t.into())) } -#[inline] -#[must_use = "constructing an instruction has no effect"] -pub fn race(ts: impl IntoIterator>) -> Op { - Op::Race(ts.into_iter().collect()) -} - #[inline] #[must_use = "constructing an instruction has no effect"] pub fn noop() -> Op { @@ -408,16 +376,6 @@ impl Op { receiver.handle(store, data).await.map(Some) } } - Op::Repeat { times: None, msg } => { - Ok(Some(seq([*msg.clone(), repeat(None, *msg)]))) - } - Op::Repeat { - times: Some(times), - msg, - } => Ok(Some(seq([*msg.clone()].into_iter().chain( - // if times - 1 > 0, queue repeat with times - 1 - NonZeroU64::new(times.get() - 1_u64).map(|times| repeat(Some(times), *msg)), - )))), Op::Void(msg) => { // TODO: distribute across seq/conc Ok(msg.handle(store, depth + 1).await?.map(|msg| match msg { @@ -428,22 +386,6 @@ impl Op { msg => void(msg), })) } - Op::Race(mut msgs) => match msgs.pop_front() { - Some(msg) => { - match msg.handle(store, depth + 1).await? { - Some(msg) => { - msgs.push_back(msg); - Ok(Some(race(msgs))) - } - // head won, drop the rest of the messages - None => { - info!("race won, dropping other messages"); - Ok(None) - } - } - } - None => Ok(None), - }, Op::Noop => Ok(None), } }; @@ -455,7 +397,7 @@ impl Op { #[cfg(test)] mod tests { use super::*; - use crate::normalize::{flatten_seq, normalize}; + use crate::normalize::normalize; enum UnitMessage {} @@ -521,7 +463,7 @@ mod tests { defer(5), ]); assert_eq!( - flatten_seq(vec![msg]), + normalize(vec![msg]), vec![( vec![0], seq([defer(1), defer(2), defer(3), defer(4), defer(5)]) @@ -549,8 +491,7 @@ mod tests { #[test] fn nested_seq_conc_single() { - // any nesting level of seq and conc should be handled in a single pass of (seq, conc) or - // (conc, seq) + // any nesting level of seq and conc should be handled in a single pass let msg = conc::([seq([conc([noop()])])]); assert_eq!(normalize(vec![msg]), vec![]); diff --git a/lib/queue-msg/src/normalize.rs b/lib/queue-msg/src/normalize.rs index 25eb25a1fd..40463b5c76 100644 --- a/lib/queue-msg/src/normalize.rs +++ b/lib/queue-msg/src/normalize.rs @@ -1,144 +1,80 @@ -use crate::{conc, noop, race, repeat, retry, seq, void, Op, QueueMessage}; - -/// Combination of passes over the queue to normalize the internal structure. See the documentation -/// on each pass for more information on how they work individually. -/// -/// This order is important, and it is recommended to reuse this type instead of the individual -/// passes when using them in combination with your own passes. -/// -/// # Passes -/// -/// First, any `Data` messages are extracted out of their contained structures, pulling them out -/// into individual top-level messages: -/// -/// ```patch -/// - seq(.., seq(data, noop, .., conc(effect, data, repeat(noop)), ..)) -/// + data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// ``` -/// -/// Then, any `Noop` messages are removed: -/// -/// ```patch -/// - data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// + data, data, seq(.., seq(.., conc(effect), ..)) -/// ``` -/// -/// Next, `Sequence`s are flattened: -/// -/// ```patch -/// - data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// + data, data, seq(.., .., conc(effect), ..) -/// ``` -/// -/// Finally, `Concurrent`s are flattened: -/// -/// ```patch -/// - data, data, seq(.., .., conc(effect), ..) -/// + data, data, seq(.., .., effect, ..) -/// ``` -/// -/// `FlattenSeq` and `FlattenConc` are associative, as are `ExtractData` and `RemoveNoop`. -/// -/// Note how if the flattening occurred first, then the `conc(effect) -> effect` transformation could -/// never have occurred since the data and noop messages would still be there, resulting in an -/// incomplete normalization. -#[allow(clippy::let_and_return)] -pub fn normalize(msgs: Vec>) -> Vec<(Vec, Op)> { - // dbg!(&msgs); - - // let ids = msgs.iter().enumerate().map(|(id, _)| id).copied().collect::>(); - let (parent_idxs, msgs): (Vec<_>, Vec<_>) = extract_data(msgs).into_iter().unzip(); - - // dbg!(&msgs); - - let (parent_idxs, msgs): (Vec<_>, Vec<_>) = - combine_normalization_pass_output(parent_idxs, remove_noop(msgs)) - .into_iter() - .unzip(); - - // dbg!(&msgs); - - let (parent_idxs, msgs): (Vec<_>, Vec<_>) = - combine_normalization_pass_output(parent_idxs, flatten_seq(msgs)) - .into_iter() - .unzip(); - - // dbg!(&msgs); - - let output = combine_normalization_pass_output(parent_idxs, flatten_conc(msgs)); - - // dbg!(&output); - - output -} - -fn combine_normalization_pass_output( - previous_parents: Vec>, - mut output: Vec<(Vec, Op)>, -) -> Vec<(Vec, Op)> { - for (parents, _) in &mut output { - *parents = parents - .iter() - .flat_map(|p| &previous_parents[*p]) - .copied() - .collect(); - } - - output -} - -/// Extract all data out of the contained messages, pulling out into top-level messages. -/// -/// Both `Sequence` and `Concurrent` are descended into, as well as `Aggregate` - for `Aggregate`, -/// `Data` messages are pulled out to the top level of the internal aggregation queue. For `seq`, -// `data` messages are only pulled out if they are in the front of the queue. -// REVIEW: Should data messages be queued as ready? -pub fn extract_data(msgs: Vec>) -> Vec<(Vec, Op)> { - // fn extract_data_internal(msg: Op) -> Vec> { - fn go(msg: Op) -> Vec> { - match msg { - Op::Seq(msgs) => { - let mut msgs = msgs.into_iter().flat_map(go).collect::>(); +use either::Either::{Left, Right}; +use itertools::Itertools; + +use crate::{Op, QueueMessage}; + +pub fn normalize(ops: Vec>) -> Vec<(Vec, Op)> { + pub fn go(op: Op) -> Vec> { + match op { + Op::Data(data) => vec![Op::Data(data)], + Op::Call(call) => vec![Op::Call(call)], + Op::Defer { until } => vec![Op::Defer { until }], + Op::Timeout { + timeout_timestamp, + msg, + } => vec![Op::Timeout { + timeout_timestamp, + msg, + }], + Op::Seq(seq) => { + let mut ops = seq.into_iter().flat_map(go).collect::>(); - let first_non_data_msg_idx = msgs + let first_non_data_msg_idx = ops .iter() .enumerate() .find_map(|(idx, msg)| (!matches!(msg, Op::Data(_))).then_some(idx)) - .unwrap_or(msgs.len()); - - // dbg!(&msgs); - // dbg!(first_non_data_msg_idx); - - let non_data_msgs = msgs.split_off(first_non_data_msg_idx); - let data_msgs = msgs; - - if non_data_msgs.is_empty() { - data_msgs - } else { - data_msgs - .into_iter() - .chain([seq(non_data_msgs.into_iter().flat_map(go))]) - .collect() + .unwrap_or(ops.len()); + + match ops.len() { + 0 => vec![], + 1 => vec![ops.pop().expect("length is 1; qed;")], + 2.. => { + let non_data_msgs = ops.split_off(first_non_data_msg_idx); + let data_msgs = ops; + + if non_data_msgs.is_empty() { + data_msgs + } else { + data_msgs + .into_iter() + .chain([Op::Seq( + non_data_msgs + .into_iter() + .flat_map(|op| match op { + Op::Seq(seq) => seq.into(), + op => vec![op], + }) + .collect(), + )]) + .collect() + } + } } - - // if data.is_empty() { - // vec![seq(msgs)] - // } else { - // data.into_iter().chain([seq(msgs)]).collect() - // } } - Op::Conc(msgs) => { - let (data, msgs): (Vec<_>, Vec<_>) = msgs + Op::Conc(conc) => { + let (datas, mut ops): (Vec<_>, Vec<_>) = conc .into_iter() .flat_map(go) - .partition(|msg| matches!(msg, Op::Data(_))); + .flat_map(|op| match op { + Op::Conc(seq) => seq.into(), + op => vec![op], + }) + .partition_map(|op| match op { + Op::Data(data) => Left(data), + op => Right(op), + }); + + let ops = match ops.len() { + 0 => vec![], + 1 => vec![ops.pop().expect("length is 1; qed;")], + 2.. => { + vec![Op::Conc(ops.into())] + } + }; - if data.is_empty() { - vec![conc(msgs)] - } else { - data.into_iter().chain([conc(msgs)]).collect() - } + datas.into_iter().map(Op::Data).chain(ops).collect() } + Op::Retry { remaining, msg } => vec![Op::Retry { remaining, msg }], Op::Promise { queue, data, @@ -148,336 +84,13 @@ pub fn extract_data(msgs: Vec>) -> Vec<(Vec, Op data, receiver, }], - _ => vec![msg], - } - } - - // go(msg) - // } - - msgs.into_iter() - .enumerate() - .flat_map(|(i, msg)| go(msg).into_iter().map(move |msg| (vec![i], msg))) - .collect() -} - -/// Remove any `Noop` messages that don't hold any semantic weight (noop in a race cannot be optimized away, for example) -pub fn remove_noop(msgs: Vec>) -> Vec<(Vec, Op)> { - fn remove_noop_internal(msg: Op) -> Option> { - fn go(msg: Op) -> Option> { - match msg { - Op::Repeat { times, msg } => go(*msg).map(|msg| repeat(times, msg)), - Op::Timeout { - timeout_timestamp, - msg, - } => go(*msg).map(|msg| Op::Timeout { - timeout_timestamp, - msg: Box::new(msg), - }), - Op::Seq(msgs) => Some(seq(msgs.into_iter().flat_map(go))), - Op::Conc(msgs) => Some(conc(msgs.into_iter().flat_map(go))), - Op::Retry { remaining, msg } => go(*msg).map(|msg| retry(remaining, msg)), - Op::Promise { - queue, - data, - receiver, - } => Some(Op::Promise { - queue: queue.into_iter().flat_map(remove_noop_internal).collect(), - data, - receiver, - }), - Op::Void(msg) => go(*msg).map(void), - Op::Noop => None, - _ => Some(msg), - } + Op::Void(op) => vec![Op::Void(op)], + Op::Noop => vec![], } - - go(msg) } - msgs.into_iter() + ops.into_iter() .enumerate() - .flat_map(|(i, msg)| remove_noop_internal(msg).map(|msg| (vec![i], msg))) + .flat_map(|(i, op)| go(op).into_iter().map(move |op| (vec![i], op))) .collect() } - -pub fn flatten_seq(msgs: Vec>) -> Vec<(Vec, Op)> { - fn flatten_seq_internal(msg: Op) -> Option> { - fn go(msg: Op) -> Vec> { - match msg { - Op::Seq(new_seq) => new_seq.into_iter().flat_map(go).collect(), - Op::Conc(c) => vec![conc(c.into_iter().flat_map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => None, - 1 => Some(msgs.pop().unwrap()), - _ => Some(seq(msgs)), - } - }))], - Op::Promise { - queue, - data, - receiver, - } => vec![Op::Promise { - queue: queue.into_iter().filter_map(flatten_seq_internal).collect(), - data, - receiver, - }], - Op::Race(msgs) => { - vec![Op::Race( - msgs.into_iter() - .filter_map(|msg| { - // `noop`s hold semantic weight in the context of a race - if msg == noop() { - Some(noop()) - // empty seq holds semantic weight in the context of a race - } else if msg == seq([]) { - Some(seq([])) - } else { - flatten_seq_internal(msg) - } - }) - .collect(), - )] - } - _ => [msg].into(), - } - } - - let mut msgs = go(msg); - - match msgs.len() { - 0 => None, - 1 => Some(msgs.pop().unwrap()), - _ => Some(seq(msgs)), - } - } - - msgs.into_iter() - .enumerate() - .filter_map(|(i, msg)| flatten_seq_internal(msg).map(|msg| (vec![i], msg))) - .collect() -} - -pub fn flatten_conc(msgs: Vec>) -> Vec<(Vec, Op)> { - fn flatten_conc_internal(msg: Op) -> Vec> { - fn go(msg: Op) -> Vec> { - match msg { - Op::Conc(new_conc) => new_conc.into_iter().flat_map(go).collect(), - Op::Seq(s) => vec![seq(s.into_iter().flat_map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => None, - 1 => Some(msgs.pop().unwrap()), - // wrap in conc again - // seq(conc(a.., conc(b..)), c..) == seq(conc(a.., b..), c..) - // seq(conc(a.., conc(b..)), c..) != seq(a.., b.., c..) - _ => Some(conc(msgs)), - } - }))], - Op::Promise { - queue, - data, - receiver, - } => vec![Op::Promise { - queue: queue.into_iter().flat_map(flatten_conc_internal).collect(), - data, - receiver, - }], - Op::Race(msgs) => { - vec![race(msgs.into_iter().map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => noop(), - 1 => msgs.pop().unwrap(), - _ => conc(msgs), - } - }))] - } - _ => [msg].into(), - } - } - - go(msg) - } - - msgs.into_iter() - .enumerate() - .flat_map(|(i, msg)| { - flatten_conc_internal(msg) - .into_iter() - .map(move |msg| (vec![i], msg)) - }) - .collect() -} - -// #[cfg(test)] -// mod tests { -// use super::*; -// use crate::{ -// aggregate, data, defer_relative, effect, fetch, noop, race, -// test_utils::{ -// AggregatePrintAbc, DataA, DataB, DataC, FetchA, PrintAbc, SimpleEvent, SimpleMessage, -// }, -// }; - -// #[test] -// fn normalize() { -// let msgs = vec![seq::([ -// fetch(FetchA {}), -// seq([ -// data(DataA {}), -// noop(), -// fetch(FetchA {}), -// conc([ -// effect(PrintAbc { -// a: DataA {}, -// b: DataB {}, -// c: DataC {}, -// }), -// data(DataC {}), -// repeat(None, noop()), -// ]), -// fetch(FetchA {}), -// ]), -// ])]; - -// let expected_output = vec![ -// (vec![0], data(DataA {})), -// (vec![0], data(DataC {})), -// ( -// vec![0], -// seq([ -// fetch(FetchA {}), -// fetch(FetchA {}), -// effect(PrintAbc { -// a: DataA {}, -// b: DataB {}, -// c: DataC {}, -// }), -// fetch(FetchA {}), -// ]), -// ), -// ]; - -// let optimized = Normalize::default().run_pass_pure(msgs.clone()); -// assert_eq!(optimized.optimize_further, expected_output); -// assert_eq!(optimized.ready, []); - -// let optimized = NormalizeFinal::default().run_pass_pure(msgs); -// assert_eq!(optimized.ready, expected_output); -// assert_eq!(optimized.optimize_further, []); -// } - -// #[test] -// fn seq_conc_conc() { -// let msgs = vec![seq::([ -// conc([ -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// conc([ -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// conc([ -// // repeat(None, seq([event(SimpleEvent {}), defer_relative(10)])), -// // repeat(None, seq([event(SimpleEvent {}), defer_relative(10)])), -// // this seq is the only message that should be flattened -// seq([ -// effect(PrintAbc { -// a: DataA {}, -// b: DataB {}, -// c: DataC {}, -// }), -// seq([ -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// ]), -// ]), -// ])]; - -// let expected_output = vec![( -// vec![0], -// seq::([ -// conc([ -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// conc([ -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// conc([ -// repeat(None, seq([event(SimpleEvent {}), defer_relative(10)])), -// repeat(None, seq([event(SimpleEvent {}), defer_relative(10)])), -// seq([ -// effect(PrintAbc { -// a: DataA {}, -// b: DataB {}, -// c: DataC {}, -// }), -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// aggregate([], [], AggregatePrintAbc {}), -// ]), -// ]), -// ]), -// )]; - -// let optimized = Normalize::default().run_pass_pure(msgs.clone()); - -// assert_eq!(optimized.optimize_further, expected_output); -// assert_eq!(optimized.ready, []); - -// let optimized = NormalizeFinal::default().run_pass_pure(msgs); -// assert_eq!(optimized.ready, expected_output); -// assert_eq!(optimized.optimize_further, []); -// } - -// #[test] -// fn race_opt() { -// let msgs = vec![race::([ -// seq([event(SimpleEvent {}), event(SimpleEvent {})]), -// conc([ -// event(SimpleEvent {}), -// conc([event(SimpleEvent {}), event(SimpleEvent {})]), -// ]), -// ])]; - -// let expected_output = vec![( -// vec![0], -// race::([ -// seq([event(SimpleEvent {}), event(SimpleEvent {})]), -// conc([ -// event(SimpleEvent {}), -// event(SimpleEvent {}), -// event(SimpleEvent {}), -// ]), -// ]), -// )]; - -// let optimized = Normalize::default().run_pass_pure(msgs.clone()); - -// assert_eq!(optimized.optimize_further, expected_output); -// assert_eq!(optimized.ready, []); -// } - -// #[test] -// fn race_opt_noop() { -// let msgs = vec![race::([seq([]), conc([])])]; - -// // an empty seq optimizes to an empty seq, but an empty conc optimizes to noop -// let expected_output = vec![(vec![0], race::([seq([]), noop()]))]; - -// let optimized = Normalize::default().run_pass_pure(msgs.clone()); - -// assert_eq!(optimized.optimize_further, expected_output); -// assert_eq!(optimized.ready, []); -// } -// } diff --git a/lib/queue-msg/src/optimize.rs b/lib/queue-msg/src/optimize.rs index f15449c9ed..b12fc5774d 100644 --- a/lib/queue-msg/src/optimize.rs +++ b/lib/queue-msg/src/optimize.rs @@ -149,7 +149,7 @@ mod tests { use crate::{ call, conc, data, defer, noop, normalize::normalize, - now, promise, race, repeat, seq, + now, promise, seq, test_utils::{BuildPrintAbc, DataA, DataB, DataC, FetchA, FetchB, PrintAbc, SimpleMessage}, }; @@ -168,7 +168,6 @@ mod tests { c: DataC {}, }), data(DataC {}), - repeat(None, noop()), ]), call(FetchA {}), ]), @@ -220,8 +219,8 @@ mod tests { promise([], [], BuildPrintAbc {}), ]), conc([ - repeat(None, seq([call(FetchA {}), defer(now() + 10)])), - repeat(None, seq([call(FetchB {}), defer(now() + 10)])), + seq([call(FetchA {}), defer(now() + 10)]), + seq([call(FetchB {}), defer(now() + 10)]), // this seq is the only message that should be flattened seq([ call(PrintAbc { @@ -250,8 +249,8 @@ mod tests { promise([], [], BuildPrintAbc {}), ]), conc([ - repeat(None, seq([call(FetchA {}), defer(now() + 10)])), - repeat(None, seq([call(FetchB {}), defer(now() + 10)])), + seq([call(FetchA {}), defer(now() + 10)]), + seq([call(FetchB {}), defer(now() + 10)]), seq([ call(PrintAbc { a: DataA {}, @@ -272,35 +271,4 @@ mod tests { let optimized = normalize(msgs); assert_eq!(optimized, expected_output); } - - #[test] - fn race_opt() { - let msgs = vec![race::([ - seq([call(FetchA {}), call(FetchB {})]), - conc([call(FetchA {}), conc([call(FetchA {}), call(FetchA {})])]), - ])]; - - let expected_output = vec![( - vec![0], - race::([ - seq([call(FetchA {}), call(FetchB {})]), - conc([call(FetchA {}), call(FetchA {}), call(FetchA {})]), - ]), - )]; - - let optimized = normalize(msgs); - assert_eq!(optimized, expected_output); - } - - #[test] - fn race_opt_noop() { - let msgs = vec![race::([seq([]), conc([])])]; - - // an empty seq optimizes to an empty seq, but an empty conc optimizes to noop - // REVIEW: Why? - let expected_output = vec![(vec![0], race::([seq([]), noop()]))]; - - let optimized = normalize(msgs); - assert_eq!(optimized, expected_output); - } } diff --git a/lib/queue-msg/src/optimize/passes.rs b/lib/queue-msg/src/optimize/passes.rs deleted file mode 100644 index 94ee7c6a2d..0000000000 --- a/lib/queue-msg/src/optimize/passes.rs +++ /dev/null @@ -1,321 +0,0 @@ -use tracing::debug; - -use crate::{ - conc, noop, - optimize::{OptimizationResult, Pass, Pure, PurePass}, - race, repeat, retry, seq, void, Op, QueueMessage, -}; - -/// Combination of passes over the queue to normalize the internal structure. See the documentation -/// on each pass for more information on how they work individually. -/// -/// This order is important, and it is recommended to reuse this type instead of the individual -/// passes when using them in combination with your own passes. -/// -/// # Passes -/// -/// First, any `Data` messages are extracted out of their contained structures, pulling them out -/// into individual top-level messages: -/// -/// ```patch -/// - seq(.., seq(data, noop, .., conc(effect, data, repeat(noop)), ..)) -/// + data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// ``` -/// -/// Then, any `Noop` messages are removed: -/// -/// ```patch -/// - data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// + data, data, seq(.., seq(.., conc(effect), ..)) -/// ``` -/// -/// Next, `Sequence`s are flattened: -/// -/// ```patch -/// - data, data, seq(.., seq(noop, .., conc(effect, repeat(noop)), ..)) -/// + data, data, seq(.., .., conc(effect), ..) -/// ``` -/// -/// Finally, `Concurrent`s are flattened: -/// -/// ```patch -/// - data, data, seq(.., .., conc(effect), ..) -/// + data, data, seq(.., .., effect, ..) -/// ``` -/// -/// `FlattenSeq` and `FlattenConc` are associative, as are `ExtractData` and `RemoveNoop`. -/// -/// Note how if the flattening occurred first, then the `conc(effect) -> effect` transformation could -/// never have occurred since the data and noop messages would still be there, resulting in an -/// incomplete normalization. -pub type Normalize = (ExtractData, (RemoveNoop, (FlattenSeq, FlattenConc))); -pub const NORMALIZE: Normalize = (ExtractData, (RemoveNoop, (FlattenSeq, FlattenConc))); - -/// Runs the normalization passes, and then finalizes the optimizations with [`FinalPass`]. Use this -/// if you don't have any custom optimizations to run. -pub type NormalizeFinal = (Normalize, FinalPass); -pub const NORMALIZE_FINAL: NormalizeFinal = (NORMALIZE, FinalPass); - -const _: fn() = || { - fn impls_pure_pass>() {} - - fn f() { - impls_pure_pass::(); - impls_pure_pass::(); - } - - fn impls_pass>() {} - - fn g() { - impls_pass::>(); - impls_pass::>(); - } -}; - -#[derive(Debug, Clone, Default)] -pub struct FinalPass; - -impl PurePass for FinalPass { - fn run_pass_pure(&self, msgs: Vec>) -> OptimizationResult { - debug!("running final pass"); - - OptimizationResult { - optimize_further: vec![], - ready: msgs - .into_iter() - .enumerate() - .map(|(i, msg)| (vec![i], msg)) - .collect(), - } - } -} - -/// Extract all data out of the contained messages, pulling out into top-level messages. -/// -/// Both `Sequence` and `Concurrent` are descended into, as well as `Aggregate` - for `Aggregate`, -/// `Data` messages are pulled out to the top level of the internal aggregation queue. -// REVIEW: Should data messages be queued as ready? -#[derive(Debug, Clone, Default)] -pub struct ExtractData; - -impl PurePass for ExtractData { - fn run_pass_pure(&self, msgs: Vec>) -> OptimizationResult { - fn extract_data(msg: Op) -> Vec> { - fn go(msg: Op) -> Vec> { - match msg { - Op::Seq(msgs) => { - let (data, msgs): (Vec<_>, Vec<_>) = msgs - .into_iter() - .flat_map(go) - .partition(|msg| matches!(msg, Op::Data(_))); - - if data.is_empty() { - vec![seq(msgs)] - } else { - data.into_iter().chain([seq(msgs)]).collect() - } - } - Op::Conc(msgs) => { - let (data, msgs): (Vec<_>, Vec<_>) = msgs - .into_iter() - .flat_map(go) - .partition(|msg| matches!(msg, Op::Data(_))); - - if data.is_empty() { - vec![conc(msgs)] - } else { - data.into_iter().chain([conc(msgs)]).collect() - } - } - Op::Aggregate { - queue, - data, - receiver, - } => vec![Op::Aggregate { - queue: queue.into_iter().flat_map(extract_data).collect(), - data, - receiver, - }], - _ => vec![msg], - } - } - - go(msg) - } - - OptimizationResult { - optimize_further: msgs - .into_iter() - .enumerate() - .flat_map(|(i, msg)| extract_data(msg).into_iter().map(move |msg| (vec![i], msg))) - .map(|(a, b)| (a, b, "NO TAG".to_owned())) - .collect(), - ready: vec![], - } - } -} - -/// Remove any `Noop` messages. -#[derive(Debug, Clone, Default)] -pub struct RemoveNoop; - -impl PurePass for RemoveNoop { - fn run_pass_pure(&self, msgs: Vec>) -> OptimizationResult { - fn remove_noop(msg: Op) -> Option> { - fn go(msg: Op) -> Option> { - match msg { - Op::Repeat { times, msg } => go(*msg).map(|msg| repeat(times, msg)), - Op::Timeout { - timeout_timestamp, - msg, - } => go(*msg).map(|msg| Op::Timeout { - timeout_timestamp, - msg: Box::new(msg), - }), - Op::Seq(msgs) => Some(seq(msgs.into_iter().flat_map(go))), - Op::Conc(msgs) => Some(conc(msgs.into_iter().flat_map(go))), - Op::Retry { remaining, msg } => go(*msg).map(|msg| retry(remaining, msg)), - Op::Aggregate { - queue, - data, - receiver, - } => Some(Op::Aggregate { - queue: queue.into_iter().flat_map(remove_noop).collect(), - data, - receiver, - }), - Op::Void(msg) => go(*msg).map(void), - Op::Noop => None, - _ => Some(msg), - } - } - - go(msg) - } - - OptimizationResult { - optimize_further: msgs - .into_iter() - .enumerate() - .flat_map(|(i, msg)| remove_noop(msg).map(|msg| (vec![i], msg))) - .map(|(a, b)| (a, b, "NO TAG".to_owned())) - .collect(), - ready: vec![], - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct FlattenSeq; - -impl PurePass for FlattenSeq { - fn run_pass_pure(&self, msgs: Vec>) -> OptimizationResult { - fn flatten_seq(msg: Op) -> Op { - fn go(msg: Op) -> Vec> { - match msg { - Op::Seq(new_seq) => new_seq.into_iter().flat_map(go).collect(), - Op::Conc(c) => vec![conc(c.into_iter().flat_map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => None, - 1 => Some(msgs.pop().unwrap()), - _ => Some(seq(msgs)), - } - }))], - Op::Aggregate { - queue, - data, - receiver, - } => vec![Op::Aggregate { - queue: queue.into_iter().map(flatten_seq).collect(), - data, - receiver, - }], - Op::Race(msgs) => { - vec![Op::Race(msgs.into_iter().map(flatten_seq).collect())] - } - _ => [msg].into(), - } - } - - let mut msgs = go(msg); - - if msgs.len() == 1 { - msgs.pop().unwrap() - } else { - seq(msgs) - } - } - - OptimizationResult { - optimize_further: msgs - .into_iter() - .enumerate() - .map(|(i, msg)| (vec![i], flatten_seq(msg))) - .map(|(a, b)| (a, b, "NO TAG".to_owned())) - .collect(), - ready: vec![], - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct FlattenConc; - -impl PurePass for FlattenConc { - fn run_pass_pure(&self, msgs: Vec>) -> OptimizationResult { - fn flatten_conc(msg: Op) -> Vec> { - fn go(msg: Op) -> Vec> { - match msg { - Op::Conc(new_conc) => new_conc.into_iter().flat_map(go).collect(), - Op::Seq(s) => vec![seq(s.into_iter().flat_map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => None, - 1 => Some(msgs.pop().unwrap()), - // wrap in conc again - // seq(conc(a.., conc(b..)), c..) == seq(conc(a.., b..), c..) - // seq(conc(a.., conc(b..)), c..) != seq(a.., b.., c..) - _ => Some(conc(msgs)), - } - }))], - Op::Aggregate { - queue, - data, - receiver, - } => vec![Op::Aggregate { - queue: queue.into_iter().flat_map(flatten_conc).collect(), - data, - receiver, - }], - Op::Race(msgs) => { - vec![race(msgs.into_iter().map(|msg| { - let mut msgs = go(msg); - - match msgs.len() { - 0 => noop(), - 1 => msgs.pop().unwrap(), - _ => conc(msgs), - } - }))] - } - _ => [msg].into(), - } - } - - go(msg) - } - - OptimizationResult { - optimize_further: msgs - .into_iter() - .enumerate() - .flat_map(|(i, msg)| flatten_conc(msg).into_iter().map(move |msg| (vec![i], msg))) - .map(|(a, b)| (a, b, "NO TAG".to_owned())) - .collect(), - ready: vec![], - } - } -} diff --git a/voyager/modules/client/movement/src/main.rs b/voyager/modules/client/movement/src/main.rs index 86c576f316..25315cfb2e 100644 --- a/voyager/modules/client/movement/src/main.rs +++ b/voyager/modules/client/movement/src/main.rs @@ -12,7 +12,7 @@ use serde_utils::Hex; use tracing::instrument; use unionlabs::{ self, - aptos::{sparse_merkle_proof::SparseMerkleProof, storage_proof::StorageProof}, + aptos::storage_proof::StorageProof, encoding::{Bcs, DecodeAs, EncodeAs, Proto}, google::protobuf::any::Any, ibc::{