From 191960999842f07f98bdc2f113f85cc75d11a311 Mon Sep 17 00:00:00 2001 From: benluelo Date: Mon, 16 Sep 2024 14:31:07 +0100 Subject: [PATCH] wip --- lib/serde-utils/src/lib.rs | 1 + lib/voyager-message/src/call.rs | 33 +- lib/voyager-message/src/module.rs | 2 +- lib/voyager-message/src/pass.rs | 38 +- .../modules/consensus/cometbls/src/main.rs | 2 +- .../consensus/ethereum/src/callback.rs | 441 +----------------- .../modules/consensus/ethereum/src/main.rs | 407 ++++++++++++++-- voyager/plugins/packet-filter/src/main.rs | 6 +- .../plugins/transaction-batch/src/callback.rs | 189 ++++---- voyager/plugins/transaction-batch/src/main.rs | 67 ++- voyager/src/queue.rs | 8 +- 11 files changed, 575 insertions(+), 619 deletions(-) diff --git a/lib/serde-utils/src/lib.rs b/lib/serde-utils/src/lib.rs index 60c9f3a0fc..67966428e1 100644 --- a/lib/serde-utils/src/lib.rs +++ b/lib/serde-utils/src/lib.rs @@ -425,6 +425,7 @@ pub mod hex_string_list { } } +// TODO: Check if human readable pub mod string { use alloc::string::String; use core::{fmt::Display, str::FromStr}; diff --git a/lib/voyager-message/src/call.rs b/lib/voyager-message/src/call.rs index 5369b97245..4c85486fc8 100644 --- a/lib/voyager-message/src/call.rs +++ b/lib/voyager-message/src/call.rs @@ -9,7 +9,7 @@ use macros::apply; use queue_msg::{ call, conc, data, defer, noop, now, promise, queue_msg, seq, HandleCall, Op, QueueError, }; -use serde_json::Value; +use serde_json::{json, Value}; use serde_utils::Hex; use tracing::{debug, info, info_span, instrument, trace, Instrument}; use unionlabs::{ @@ -744,12 +744,31 @@ impl HandleCall> for counterparty_chain_id, update_from, update_to, - }) => ctx - .consensus_module(&chain_id) - .map_err(error_object_to_queue_error)? - .fetch_update_headers(update_from, update_to, counterparty_chain_id) - .await - .map_err(json_rpc_error_to_queue_error), + }) => { + info!( + %chain_id, + %counterparty_chain_id, + %update_from, + %update_to, + "fetching update headers", + ); + if update_from >= update_to { + Err(QueueError::Fatal( + format!( + "update range is invalid ({update_to} >= {update_to}), \ + chain_id: {chain_id}, counterparty_chain_id: \ + {counterparty_chain_id}" + ) + .into(), + )) + } else { + ctx.consensus_module(&chain_id) + .map_err(error_object_to_queue_error)? + .fetch_update_headers(update_from, update_to, counterparty_chain_id) + .await + .map_err(json_rpc_error_to_queue_error) + } + } Call::MakeMsgCreateClient(MakeMsgCreateClient { chain_id, diff --git a/lib/voyager-message/src/module.rs b/lib/voyager-message/src/module.rs index 6563b48450..a568f212b8 100644 --- a/lib/voyager-message/src/module.rs +++ b/lib/voyager-message/src/module.rs @@ -406,7 +406,7 @@ pub trait ConsensusModule { /// [`AggregateMsgUpdateClientsFromOrderedHeaders`] message, which will /// be used to build the actual [`MsgUpdateClient`]s. #[method(name = "fetchUpdateHeaders")] - fn fetch_update_headers( + async fn fetch_update_headers( &self, update_from: Height, update_to: Height, diff --git a/lib/voyager-message/src/pass.rs b/lib/voyager-message/src/pass.rs index 862ae841e5..8763c659f7 100644 --- a/lib/voyager-message/src/pass.rs +++ b/lib/voyager-message/src/pass.rs @@ -5,7 +5,7 @@ use queue_msg::{ optimize::{OptimizationResult, PurePass}, BoxDynError, Op, }; -use tracing::{error, info, trace, trace_span}; +use tracing::{error, trace, trace_span}; use unionlabs::ErrorReporter; use crate::VoyagerMessage; @@ -87,7 +87,9 @@ impl PurePass for JaqInterestFilter { )); let Some(result) = out.next() else { - panic!("filter didn't return any values"); + error!("filter didn't return any values"); + + continue; }; let result = match result { @@ -99,22 +101,24 @@ impl PurePass for JaqInterestFilter { } }; - assert!(out.next().is_none(), "filter returned too many items"); - - match result { - Val::Bool(true) => { - info!(%msg_json, "interest"); - - opt_res - .optimize_further - .push((vec![idx], msg, plugin_name.clone())); - - continue 'outer; - } - Val::Bool(false) => { - trace!(%msg_json, "no interest"); + if out.next().is_some() { + error!("filter returned multiple values, only a single boolean value is valid"); + } else { + match result { + Val::Bool(true) => { + trace!(%msg_json, "interest"); + + opt_res + .optimize_further + .push((vec![idx], msg, plugin_name.clone())); + + continue 'outer; + } + Val::Bool(false) => { + trace!(%msg_json, "no interest"); + } + _ => error!("filter returned a non-boolean value: {result:?}"), } - _ => error!("filter returned a non-boolean value: {result:?}"), } } diff --git a/voyager/modules/consensus/cometbls/src/main.rs b/voyager/modules/consensus/cometbls/src/main.rs index 7fe6402f11..864abff761 100644 --- a/voyager/modules/consensus/cometbls/src/main.rs +++ b/voyager/modules/consensus/cometbls/src/main.rs @@ -354,7 +354,7 @@ impl ConsensusModuleServer for ModuleSer } #[instrument(skip_all, fields(chain_id = %self.ctx.chain_id))] - fn fetch_update_headers( + async fn fetch_update_headers( &self, update_from: Height, update_to: Height, diff --git a/voyager/modules/consensus/ethereum/src/callback.rs b/voyager/modules/consensus/ethereum/src/callback.rs index 078e4e56e4..e3dbf95927 100644 --- a/voyager/modules/consensus/ethereum/src/callback.rs +++ b/voyager/modules/consensus/ethereum/src/callback.rs @@ -1,443 +1,6 @@ -use std::{collections::VecDeque, ops::Div}; - -use beacon_api::{client::BeaconApiClient, types::Spec}; -use chain_utils::ethereum::ETHEREUM_REVISION_NUMBER; use enumorph::Enumorph; -use frunk::{hlist_pat, HList}; -use jsonrpsee::{core::RpcResult, types::ErrorObject}; -use queue_msg::{ - aggregation::{DoCallback, SubsetOf}, - call, data, promise, queue_msg, seq, Op, -}; -use tracing::debug; -use unionlabs::{ - self, - constants::metric::NANOS_PER_SECOND, - ethereum::beacon::light_client_finality_update::UnboundedLightClientFinalityUpdate, - ibc::{ - core::client::height::Height, - lightclients::ethereum::{ - header::UnboundedHeader, - light_client_update::UnboundedLightClientUpdate, - trusted_sync_committee::{UnboundedActiveSyncCommittee, UnboundedTrustedSyncCommittee}, - }, - }, - ErrorReporter, -}; -use voyager_message::{ - call::{Call, WaitForTimestamp}, - callback::Callback, - data::{Data, DecodedHeaderMeta, OrderedHeaders}, - ChainId, PluginMessage, VoyagerMessage, -}; - -use crate::{ - call::{ - FetchAccountUpdate, FetchBeaconGenesis, FetchLightClientUpdate, FetchLightClientUpdates, - ModuleCall, - }, - data::{ - AccountUpdateData, BeaconGenesis, BeaconSpec, FinalityUpdate, Header, LightClientUpdate, - LightClientUpdates, ModuleData, - }, -}; +use queue_msg::queue_msg; #[queue_msg] #[derive(Enumorph)] -pub enum ModuleCallback { - MakeCreateUpdates(MakeCreateUpdates), - MakeCreateUpdatesFromLightClientUpdates(MakeCreateUpdatesFromLightClientUpdates), - CreateUpdate(CreateUpdate), - AggregateHeaders(AggregateHeaders), -} - -/// This is the entrypoint into the update construction for ethereum clients. This will requeue [`MakeCreateUpdatesFromLightClientUpdates`] with [`LightClientUpdates`] in the range `update_from..`. Note that the `update_to` field is currently mostly ignored. -#[queue_msg] -pub struct MakeCreateUpdates { - pub update_from: Height, - pub update_to: Height, - pub counterparty_chain_id: ChainId<'static>, -} - -impl DoCallback> for MakeCreateUpdates { - type Params = HList![PluginMessage, PluginMessage]; - - fn call( - MakeCreateUpdates { - update_from, - update_to, - counterparty_chain_id, - }: Self, - hlist_pat![ - PluginMessage { - plugin, - message: FinalityUpdate { finality_update }, - }, - PluginMessage { - plugin: _, - message: BeaconSpec { spec }, - } - ]: Self::Params, - ) -> Op> { - let target_period = - sync_committee_period(finality_update.attested_header.beacon.slot, spec.period()); - - let trusted_period = sync_committee_period(update_from.revision_height, spec.period()); - - assert!( - trusted_period <= target_period, - "trusted period {trusted_period} is behind target \ - period {target_period}, something is wrong!", - ); - - // Eth chain is more than 1 signature period ahead of us. We need to do sync committee - // updates until we reach the `target_period - 1`. - promise( - [call(Call::plugin( - plugin.clone(), - FetchLightClientUpdates { - trusted_period, - target_period, - }, - ))], - [Data::plugin(plugin.clone(), BeaconSpec { spec })], - Callback::plugin( - plugin, - MakeCreateUpdatesFromLightClientUpdates { - update_from, - update_to, - finality_update, - counterparty_chain_id, - }, - ), - ) - } -} - -/// The second step in the update construction process. -#[queue_msg] -pub struct MakeCreateUpdatesFromLightClientUpdates { - // this was previously duplicated as `trusted_height` - pub update_from: Height, - pub update_to: Height, - pub finality_update: UnboundedLightClientFinalityUpdate, - pub counterparty_chain_id: ChainId<'static>, -} - -impl DoCallback> - for MakeCreateUpdatesFromLightClientUpdates -{ - type Params = HList![PluginMessage, PluginMessage]; - - fn call( - MakeCreateUpdatesFromLightClientUpdates { - update_from, - update_to, - finality_update, - counterparty_chain_id, - }: Self, - hlist_pat![ - PluginMessage { - plugin, - message: LightClientUpdates { - light_client_updates, - }, - }, - PluginMessage { - plugin: _, - message: BeaconSpec { spec }, - } - ]: Self::Params, - ) -> Op> { - let target_period = sync_committee_period(finality_update.signature_slot, spec.period()); - - let trusted_period = sync_committee_period(update_from.revision_height, spec.period()); - - let (updates, last_update_block_number) = light_client_updates.into_iter().fold( - (VecDeque::new(), update_from.revision_height), - |(mut vec, mut trusted_slot), update| { - let old_trusted_slot = trusted_slot; - - // REVIEW: Assert that this is greater (i.e. increasing)? - trusted_slot = update.attested_header.beacon.slot; - - vec.push_back(make_create_update( - plugin.clone(), - old_trusted_slot, - update, - true, - &spec, - )); - - (vec, trusted_slot) - }, - ); - - let lc_updates = if trusted_period < target_period { - updates - } else { - [].into() - }; - - let does_not_have_finality_update = last_update_block_number >= update_to.revision_height; - - debug!(last_update_block_number, update_to.revision_height); - - let finality_update_msg = if does_not_have_finality_update { - // do nothing - None - } else { - // do finality update - Some(make_create_update( - plugin.clone(), - last_update_block_number, - UnboundedLightClientUpdate { - attested_header: finality_update.attested_header, - next_sync_committee: None, - next_sync_committee_branch: None, - finalized_header: finality_update.finalized_header, - finality_branch: finality_update.finality_branch, - sync_aggregate: finality_update.sync_aggregate, - signature_slot: finality_update.signature_slot, - }, - false, - &spec, - )) - }; - - promise( - [seq(lc_updates.into_iter().chain(finality_update_msg))], - [], - Callback::plugin( - plugin, - AggregateHeaders { - counterparty_chain_id, - }, - ), - ) - } -} - -fn make_create_update( - plugin: String, - currently_trusted_slot: u64, - light_client_update: UnboundedLightClientUpdate, - is_next: bool, - spec: &Spec, -) -> Op> { - // When we fetch the update at this height, the `next_sync_committee` will - // be the current sync committee of the period that we want to update to. - let previous_period = u64::max( - 1, - light_client_update.attested_header.beacon.slot / spec.period(), - ) - 1; - - promise( - [ - call(Call::plugin( - &plugin, - FetchLightClientUpdate { - period: previous_period, - }, - )), - call(Call::plugin( - &plugin, - FetchAccountUpdate { - slot: light_client_update.attested_header.beacon.slot, - }, - )), - call(Call::plugin(&plugin, FetchBeaconGenesis {})), - ], - [Data::plugin(&plugin, BeaconSpec { spec: spec.clone() })], - Callback::plugin( - &plugin, - CreateUpdate { - // chain_id, - // counterparty_chain_id, - currently_trusted_slot, - light_client_update, - is_next, - }, - ), - ) -} - -#[queue_msg] -pub struct CreateUpdate { - // pub chain_id: String, - // pub counterparty_chain_id: String, - pub currently_trusted_slot: u64, - pub light_client_update: UnboundedLightClientUpdate, - pub is_next: bool, -} - -impl DoCallback> for CreateUpdate { - type Params = HList![ - PluginMessage, - PluginMessage, - PluginMessage, - PluginMessage, - ]; - - fn call( - CreateUpdate { - // chain_id, - // counterparty_chain_id, - currently_trusted_slot, - light_client_update, - is_next, - }: Self, - hlist_pat![ - PluginMessage { - plugin, - message: LightClientUpdate { - update: previous_period_light_client_update - }, - }, - PluginMessage { - plugin: _, - message: AccountUpdateData { - slot: _, - update: account_update, - }, - }, - PluginMessage { - plugin: _, - message: BeaconGenesis { genesis: _ }, - }, - PluginMessage { - plugin: _, - message: BeaconSpec { spec: _ }, - }, - ]: Self::Params, - ) -> Op> { - // seq([ - // REVIEW: Why did we add this? - // void(wait(WaitForTimestamp { - // chain_id: counterparty_chain_id.clone(), - // timestamp: (genesis.genesis_time - // + (light_client_update.signature_slot * spec.seconds_per_slot)) - // .try_into() - // .unwrap(), - // })), - data(Data::plugin( - plugin, - Header { - header: UnboundedHeader { - consensus_update: light_client_update, - trusted_sync_committee: UnboundedTrustedSyncCommittee { - trusted_height: Height { - revision_number: ETHEREUM_REVISION_NUMBER, - revision_height: currently_trusted_slot, - }, - sync_committee: if is_next { - UnboundedActiveSyncCommittee::Next( - previous_period_light_client_update - .next_sync_committee - .unwrap(), - ) - } else { - UnboundedActiveSyncCommittee::Current( - previous_period_light_client_update - .next_sync_committee - .unwrap(), - ) - }, - }, - account_update, - }, - }, - )) - // ]) - } -} - -// REVIEW: Does this function exist anywhere else? -fn sync_committee_period(height: u64, period: u64) -> u64 { - height.div(period) -} - -/// Aggregates all [`Header`] datas into an [`OrderedHeaders`] data. -#[queue_msg] -pub struct AggregateHeaders { - pub counterparty_chain_id: ChainId<'static>, -} - -impl AggregateHeaders { - pub async fn aggregate( - self, - beacon_api_client: &BeaconApiClient, - data: VecDeque>, - ) -> RpcResult>> { - let mut headers = data - .into_iter() - .map(PluginMessage::
::try_from_super) - .map(|d| d.expect("invalid type?").message.header) - .collect::>(); - - headers.sort_by_key(|header| header.consensus_update.attested_header.beacon.slot); - - let genesis = beacon_api_client - .genesis() - .await - .map_err(|e| { - ErrorObject::owned( - -1, - format!("error fetching beacon genesis: {}", ErrorReporter(e)), - None::<()>, - ) - })? - .data; - - let spec = beacon_api_client - .spec() - .await - .map_err(|e| { - ErrorObject::owned( - -1, - format!("error fetching beacon spec: {}", ErrorReporter(e)), - None::<()>, - ) - })? - .data; - - let last_update_signature_slot = headers - .iter() - .map(|h| h.consensus_update.signature_slot) - .max() - .expect("expected at least one update"); - - Ok(seq([ - call(WaitForTimestamp { - chain_id: self.counterparty_chain_id.clone(), - // we wait for one more block just to be sure the counterparty's block time has caught up - timestamp: i64::try_from( - (genesis.genesis_time + (last_update_signature_slot * spec.seconds_per_slot)) - + spec.seconds_per_slot, - ) - .unwrap() - * NANOS_PER_SECOND as i64, - }), - queue_msg::data(OrderedHeaders { - headers: headers - .into_iter() - .map(|header| { - ( - DecodedHeaderMeta { - height: Height { - revision_number: ETHEREUM_REVISION_NUMBER, - revision_height: header - .consensus_update - .attested_header - .beacon - .slot, - }, - }, - serde_json::to_value(header).unwrap(), - ) - }) - .collect(), - }), - ])) - } -} +pub enum ModuleCallback {} diff --git a/voyager/modules/consensus/ethereum/src/main.rs b/voyager/modules/consensus/ethereum/src/main.rs index 570c9f6dc7..44765c2628 100644 --- a/voyager/modules/consensus/ethereum/src/main.rs +++ b/voyager/modules/consensus/ethereum/src/main.rs @@ -1,27 +1,38 @@ use std::{collections::VecDeque, ops::Div}; -use beacon_api::client::BeaconApiClient; +use beacon_api::{client::BeaconApiClient, types::Spec}; use bitvec::{order::Msb0, vec::BitVec}; +use chain_utils::{ethereum::ETHEREUM_REVISION_NUMBER, BoxDynError}; use ethers::providers::{Middleware, Provider, ProviderError, Ws, WsClientError}; -use jsonrpsee::core::{async_trait, RpcResult}; -use queue_msg::{aggregation::do_callback, call, data, defer, now, promise, seq, Op}; +use futures::{stream, StreamExt}; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + types::ErrorObject, +}; +use queue_msg::{call, data, defer, now, seq, Op}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, instrument}; use unionlabs::{ + constants::metric::NANOS_PER_SECOND, ethereum::{config::PresetBaseKind, IBC_HANDLER_COMMITMENTS_SLOT}, hash::H160, ibc::{ core::client::height::Height, lightclients::ethereum::{ - self, account_proof::AccountProof, account_update::AccountUpdate, + self, + account_proof::AccountProof, + account_update::AccountUpdate, + header::UnboundedHeader, + light_client_update::UnboundedLightClientUpdate, + trusted_sync_committee::{UnboundedActiveSyncCommittee, UnboundedTrustedSyncCommittee}, }, }, + ErrorReporter, }; use voyager_message::{ - call::Call, - callback::Callback, - data::Data, + call::{Call, FetchUpdateHeaders, WaitForTimestamp}, + data::{Data, DecodedHeaderMeta, OrderedHeaders}, module::{ConsensusModuleInfo, ConsensusModuleServer, ModuleInfo, QueueInteractionsServer}, run_module_server, ChainId, ClientType, DefaultCmd, ModuleContext, ModuleServer, VoyagerMessage, @@ -32,7 +43,7 @@ use crate::{ FetchAccountUpdate, FetchBeaconGenesis, FetchBeaconSpec, FetchBootstrap, FetchFinalityUpdate, FetchLightClientUpdate, FetchLightClientUpdates, ModuleCall, }, - callback::{MakeCreateUpdates, ModuleCallback}, + callback::ModuleCallback, data::{ AccountUpdateData, BeaconGenesis, BeaconSpec, BootstrapData, FinalityUpdate, LightClientUpdates, ModuleData, @@ -86,6 +97,36 @@ impl Module { fn plugin_name(&self) -> String { plugin_name(&self.chain_id) } + + pub async fn fetch_account_update(&self, slot: u64) -> AccountUpdate { + let execution_height = self + .beacon_api_client + .execution_height(beacon_api::client::BlockId::Slot(slot)) + .await + .unwrap(); + + let account_update = self + .provider + .get_proof( + ethers::types::H160::from(self.ibc_handler_address), + vec![], + // NOTE: Proofs are from the execution layer, so we use execution height, not beacon slot. + Some(execution_height.into()), + ) + .await + .unwrap(); + + AccountUpdate { + account_proof: AccountProof { + storage_root: account_update.storage_hash.into(), + proof: account_update + .account_proof + .into_iter() + .map(|x| x.to_vec()) + .collect(), + }, + } + } } impl ModuleContext for Module { @@ -183,8 +224,6 @@ impl QueueInteractionsServer for ModuleS .data .sync_committee_size; - dbg!(format!("{scb:x}")); - assert_eq!(scb.len() as u64, sync_committee_size); scb.count_ones() * 3 < scb.len() * 2 @@ -305,18 +344,18 @@ impl QueueInteractionsServer for ModuleS async fn callback( &self, cb: ModuleCallback, - data: VecDeque>, + _data: VecDeque>, ) -> RpcResult>> { - Ok(match cb { - ModuleCallback::CreateUpdate(aggregate) => do_callback(aggregate, data), - ModuleCallback::MakeCreateUpdates(aggregate) => do_callback(aggregate, data), - ModuleCallback::MakeCreateUpdatesFromLightClientUpdates(aggregate) => { - do_callback(aggregate, data) - } - ModuleCallback::AggregateHeaders(cb) => { - cb.aggregate(&self.ctx.beacon_api_client, data).await? - } - }) + match cb { + // ModuleCallback::CreateUpdate(aggregate) => do_callback(aggregate, data), + // ModuleCallback::MakeCreateUpdates(aggregate) => do_callback(aggregate, data), + // ModuleCallback::MakeCreateUpdatesFromLightClientUpdates(aggregate) => { + // do_callback(aggregate, data) + // } + // ModuleCallback::AggregateHeaders(cb) => { + // cb.aggregate(&self.ctx.beacon_api_client, data).await? + // } + } } } @@ -433,26 +472,324 @@ impl ConsensusModuleServer for ModuleSer } #[instrument(skip_all, fields(chain_id = %self.ctx.chain_id))] - fn fetch_update_headers( + async fn fetch_update_headers( &self, update_from: Height, update_to: Height, counterparty_chain_id: ChainId<'static>, ) -> RpcResult>> { - Ok(promise( - [ - call(Call::plugin(self.ctx.plugin_name(), FetchFinalityUpdate {})), - call(Call::plugin(self.ctx.plugin_name(), FetchBeaconSpec {})), - ], - [], - Callback::plugin( - self.ctx.plugin_name(), - MakeCreateUpdates { + // Ok(promise( + // [ + // call(Call::plugin(self.ctx.plugin_name(), FetchFinalityUpdate {})), + // call(Call::plugin(self.ctx.plugin_name(), FetchBeaconSpec {})), + // ], + // [], + // Callback::plugin( + // self.ctx.plugin_name(), + // MakeCreateUpdates { + // update_from, + // update_to, + // counterparty_chain_id, + // }, + // ), + // )) + + self.ctx + .fetch_update(update_from, update_to, counterparty_chain_id) + .await + .map_err(|e| { + ErrorObject::owned( + -1, + format!("error fetching update: {}", ErrorReporter(&*e)), + None::<()>, + ) + }) + } +} + +impl Module { + /// Fetch a client update from the provided trusted height (`update_from`) to at least the desired new height (`update_to`). + /// + /// Note that this will generate updates as close to the tip of the chain as possible, as long as that height is > `update_to`. Due to the nature of ethereum finality, it is not possible to update to a *specific* height in the same way as is possible in chains with single slot finality (such as tendermint or cometbls). While it would be possible to update to a height *closer* to `update_to`, the extra complexity brought by that is unlikely to be worth the slightly smaller update generated, especially since in practice the light client will likely always be up to date with the tip of the (finalized) chain. + #[instrument( + skip_all, + fields( + chain_id = %self.chain_id, + %counterparty_chain_id, + %update_from, + %update_to + ) + )] + async fn fetch_update( + &self, + update_from: Height, + update_to: Height, + counterparty_chain_id: ChainId<'static>, + ) -> Result>, BoxDynError> { + let finality_update = self.beacon_api_client.finality_update().await.unwrap().data; + + // === FETCH VALID FINALITY UPDATE + + // TODO: This is named poorly? + let has_supermajority = { + let scb = BitVec::::try_from( + finality_update.sync_aggregate.sync_committee_bits.clone(), + ) + .unwrap(); + + let sync_committee_size = self + .beacon_api_client + .spec() + .await + .unwrap() + .data + .sync_committee_size; + + assert_eq!(scb.len() as u64, sync_committee_size); + + scb.count_ones() * 3 < scb.len() * 2 + }; + + if has_supermajority { + info!( + signature_slot = finality_update.signature_slot, + "signature supermajority not hit" + ); + + return Ok(seq([ + defer(now() + 1), + call(FetchUpdateHeaders { + chain_id: self.chain_id.clone(), + counterparty_chain_id, update_from, update_to, - counterparty_chain_id, + }), + ])); + }; + + // === FETCH LIGHT CLIENT UPDATES + + let spec = self.beacon_api_client.spec().await.unwrap().data; + + let target_period = + sync_committee_period(finality_update.attested_header.beacon.slot, spec.period()); + + let trusted_period = sync_committee_period(update_from.revision_height, spec.period()); + + info!("target period: {target_period}, trusted period: {trusted_period}"); + + assert!( + trusted_period <= target_period, + "trusted period {trusted_period} is behind target \ + period {target_period}, something is wrong!", + ); + + // Eth chain is more than 1 signature period ahead of us. We need to do sync committee + // updates until we reach the `target_period - 1`. + + // let target_period = sync_committee_period(finality_update.signature_slot, spec.period()); + + let light_client_updates = self + .beacon_api_client + .light_client_updates(trusted_period + 1, target_period - trusted_period) + .await + .unwrap() + .0 + .into_iter() + .map(|x| x.data) + .collect::>(); + + info!( + "fetched {} light client updates", + light_client_updates.len() + ); + + let (updates, last_update_block_number) = stream::iter(light_client_updates) + .fold((VecDeque::new(), update_from.revision_height), { + |(mut vec, mut trusted_slot), update| { + let self_ = self.clone(); + let spec = spec.clone(); + + async move { + let old_trusted_slot = trusted_slot; + + // REVIEW: Assert that this is greater (i.e. increasing)? + trusted_slot = update.attested_header.beacon.slot; + + vec.push_back( + self_ + .make_header(old_trusted_slot, update, true, &spec) + .await, + ); + + (vec, trusted_slot) + } + } + }) + .await; + + let lc_updates = if trusted_period < target_period { + updates + } else { + [].into() + }; + + let does_not_have_finality_update = last_update_block_number >= update_to.revision_height; + + debug!(last_update_block_number, update_to.revision_height); + + let finality_update_msg = if does_not_have_finality_update { + info!("does not have finality update"); + // do nothing + None + } else { + info!("has finality update"); + // do finality update + Some( + self.make_header( + last_update_block_number, + UnboundedLightClientUpdate { + attested_header: finality_update.attested_header, + next_sync_committee: None, + next_sync_committee_branch: None, + finalized_header: finality_update.finalized_header, + finality_branch: finality_update.finality_branch, + sync_aggregate: finality_update.sync_aggregate, + signature_slot: finality_update.signature_slot, + }, + false, + &spec, + ) + .await, + ) + }; + + let headers = lc_updates + .into_iter() + .chain(finality_update_msg) + .collect::>(); + + // header.sort_by_key(|header| header.consensus_update.attested_header.beacon.slot); + + let genesis = self + .beacon_api_client + .genesis() + .await + .map_err(|e| { + ErrorObject::owned( + -1, + format!("error fetching beacon genesis: {}", ErrorReporter(e)), + None::<()>, + ) + })? + .data; + + let last_update_signature_slot = headers + .iter() + .map(|h| h.consensus_update.signature_slot) + .max() + .expect("expected at least one update"); + + Ok(seq([ + call(WaitForTimestamp { + chain_id: counterparty_chain_id.clone(), + // we wait for one more block just to be sure the counterparty's block time has caught up + timestamp: i64::try_from( + (genesis.genesis_time + (last_update_signature_slot * spec.seconds_per_slot)) + + spec.seconds_per_slot, + ) + .unwrap() + * NANOS_PER_SECOND as i64, + }), + queue_msg::data(OrderedHeaders { + headers: headers + .into_iter() + .map(|header| { + ( + DecodedHeaderMeta { + height: Height { + revision_number: ETHEREUM_REVISION_NUMBER, + revision_height: header + .consensus_update + .attested_header + .beacon + .slot, + }, + }, + serde_json::to_value(header).unwrap(), + ) + }) + .collect(), + }), + ])) + } + + #[instrument( + skip_all, + fields( + chain_id = %self.chain_id, + %currently_trusted_slot, + signature_slot = %light_client_update.signature_slot, + %is_next, + ) + )] + async fn make_header( + &self, + currently_trusted_slot: u64, + light_client_update: UnboundedLightClientUpdate, + is_next: bool, + spec: &Spec, + ) -> UnboundedHeader { + // When we fetch the update at this height, the `next_sync_committee` will + // be the current sync committee of the period that we want to update to. + let previous_period = u64::max( + 1, + light_client_update.attested_header.beacon.slot / spec.period(), + ) - 1; + + let account_update = self + .fetch_account_update(light_client_update.attested_header.beacon.slot) + .await; + + let previous_period_light_client_update = self + .beacon_api_client + .light_client_updates(previous_period, 1) + .await + .unwrap() + .0 + .into_iter() + .map(|x| x.data) + .collect::>() + .pop() + .unwrap(); + + UnboundedHeader { + consensus_update: light_client_update, + trusted_sync_committee: UnboundedTrustedSyncCommittee { + trusted_height: Height { + revision_number: ETHEREUM_REVISION_NUMBER, + revision_height: currently_trusted_slot, }, - ), - )) + sync_committee: if is_next { + UnboundedActiveSyncCommittee::Next( + previous_period_light_client_update + .next_sync_committee + .unwrap(), + ) + } else { + UnboundedActiveSyncCommittee::Current( + previous_period_light_client_update + .next_sync_committee + .unwrap(), + ) + }, + }, + account_update, + } } } + +// REVIEW: Does this function exist anywhere else? +fn sync_committee_period(height: u64, period: u64) -> u64 { + height.div(period) +} diff --git a/voyager/plugins/packet-filter/src/main.rs b/voyager/plugins/packet-filter/src/main.rs index d9592ba54d..76912c024f 100644 --- a/voyager/plugins/packet-filter/src/main.rs +++ b/voyager/plugins/packet-filter/src/main.rs @@ -5,7 +5,7 @@ use queue_msg::{optimize::OptimizationResult, BoxDynError, Op}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use tracing::{instrument, warn}; +use tracing::{instrument, trace, warn}; use voyager_message::{ data::Data, module::{ModuleInfo, PluginModuleInfo, PluginModuleServer, QueueInteractionsServer}, @@ -320,9 +320,9 @@ impl PluginModuleServer for ModuleServer #[instrument] async fn run_pass( &self, - _msgs: Vec>>, + msgs: Vec>>, ) -> RpcResult>> { - warn!("dropping messages"); + trace!("dropping {} messages", msgs.len()); Ok(OptimizationResult::default()) } diff --git a/voyager/plugins/transaction-batch/src/callback.rs b/voyager/plugins/transaction-batch/src/callback.rs index 8a231849bc..96a6272d30 100644 --- a/voyager/plugins/transaction-batch/src/callback.rs +++ b/voyager/plugins/transaction-batch/src/callback.rs @@ -8,7 +8,7 @@ use queue_msg::{ call, conc, data, noop, promise, queue_msg, seq, Op, }; use tracing::warn; -use unionlabs::{id::ClientId, QueryHeight}; +use unionlabs::{ibc::core::client::height::Height, id::ClientId, QueryHeight}; use voyager_message::{ call::{ MakeMsgAcknowledgement, MakeMsgChannelOpenAck, MakeMsgChannelOpenConfirm, @@ -17,6 +17,7 @@ use voyager_message::{ }, callback::Callback, data::{Data, IbcMessage, OrderedMsgUpdateClients, WithChainId}, + module::ClientStateMeta, rpc::{json_rpc_error_to_rpc_error, VoyagerRpcClient}, ChainId, ModuleServer, VoyagerMessage, }; @@ -73,93 +74,111 @@ impl MakeIbcMessagesFromUpdate { .0 .height; - Ok(conc(self.batches.into_iter().enumerate().map( - |(i, batch)| { - promise( - batch.into_iter().map(|batchable_event| { - assert!(batchable_event.provable_height <= new_trusted_height); + make_msgs( + module_server, + self.client_id, + self.batches, + Some(updates), + client_meta, + new_trusted_height, + ) + } +} + +pub fn make_msgs( + module_server: &ModuleServer, + + client_id: ClientId, + batches: Vec>, + + updates: Option, - let origin_chain_id = client_meta.chain_id.clone(); - let target_chain_id = module_server.ctx.chain_id.clone(); + client_meta: ClientStateMeta, + new_trusted_height: Height, +) -> RpcResult>> { + Ok(conc(batches.into_iter().enumerate().map(|(i, batch)| { + promise( + batch.into_iter().map(|batchable_event| { + assert!(batchable_event.provable_height <= new_trusted_height); - // in this context, we are the destination - the counterparty of the source is the destination - match batchable_event.event { - Event::ConnectionOpenInit(connection_open_init_event) => { - call(MakeMsgConnectionOpenTry { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - connection_open_init_event, - }) - } - Event::ConnectionOpenTry(connection_open_try_event) => { - call(MakeMsgConnectionOpenAck { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - connection_open_try_event, - }) - } - Event::ConnectionOpenAck(connection_open_ack_event) => { - call(MakeMsgConnectionOpenConfirm { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - connection_open_ack_event, - }) - } - Event::ChannelOpenInit(channel_open_init_event) => { - call(MakeMsgChannelOpenTry { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - channel_open_init_event, - }) - } - Event::ChannelOpenTry(channel_open_try_event) => { - call(MakeMsgChannelOpenAck { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - channel_open_try_event, - }) - } - Event::ChannelOpenAck(channel_open_ack_event) => { - call(MakeMsgChannelOpenConfirm { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - channel_open_ack_event, - }) - } - Event::SendPacket(send_packet_event) => call(MakeMsgRecvPacket { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - send_packet_event, - }), - Event::WriteAcknowledgement(write_acknowledgement_event) => { - call(MakeMsgAcknowledgement { - origin_chain_id, - origin_chain_proof_height: new_trusted_height, - target_chain_id, - write_acknowledgement_event, - }) - } - } + let origin_chain_id = client_meta.chain_id.clone(); + let target_chain_id = module_server.ctx.chain_id.clone(); + + // in this context, we are the destination - the counterparty of the source is the destination + match batchable_event.event { + Event::ConnectionOpenInit(connection_open_init_event) => { + call(MakeMsgConnectionOpenTry { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + connection_open_init_event, + }) + } + Event::ConnectionOpenTry(connection_open_try_event) => { + call(MakeMsgConnectionOpenAck { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + connection_open_try_event, + }) + } + Event::ConnectionOpenAck(connection_open_ack_event) => { + call(MakeMsgConnectionOpenConfirm { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + connection_open_ack_event, + }) + } + Event::ChannelOpenInit(channel_open_init_event) => { + call(MakeMsgChannelOpenTry { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + channel_open_init_event, + }) + } + Event::ChannelOpenTry(channel_open_try_event) => call(MakeMsgChannelOpenAck { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + channel_open_try_event, }), - [], - Callback::plugin( - module_server.ctx.plugin_name(), - MakeBatchTransaction { - client_id: self.client_id.clone(), - updates: (i == 0).then(|| updates.clone()), - }, - ), - ) - }, - ))) - } + Event::ChannelOpenAck(channel_open_ack_event) => { + call(MakeMsgChannelOpenConfirm { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + channel_open_ack_event, + }) + } + Event::SendPacket(send_packet_event) => call(MakeMsgRecvPacket { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + send_packet_event, + }), + Event::WriteAcknowledgement(write_acknowledgement_event) => { + call(MakeMsgAcknowledgement { + origin_chain_id, + origin_chain_proof_height: new_trusted_height, + target_chain_id, + write_acknowledgement_event, + }) + } + } + }), + [], + Callback::plugin( + module_server.ctx.plugin_name(), + MakeBatchTransaction { + client_id: client_id.clone(), + // if updates are provided and this is the first batch using this update height, provide the updates along with the messages + updates: (i == 0).then(|| updates.clone()).flatten(), + }, + ), + ) + }))) } #[queue_msg] diff --git a/voyager/plugins/transaction-batch/src/main.rs b/voyager/plugins/transaction-batch/src/main.rs index 00a368a8cb..d37463eee6 100644 --- a/voyager/plugins/transaction-batch/src/main.rs +++ b/voyager/plugins/transaction-batch/src/main.rs @@ -17,7 +17,7 @@ use queue_msg::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use tracing::{debug, error, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use unionlabs::{id::ClientId, QueryHeight}; use voyager_message::{ call::{Call, FetchUpdateHeaders, WaitForHeight}, @@ -31,7 +31,7 @@ use voyager_message::{ use crate::{ call::{MakeTransactionBatchesWithUpdate, ModuleCall}, - callback::{MakeIbcMessagesFromUpdate, ModuleCallback}, + callback::{make_msgs, MakeIbcMessagesFromUpdate, ModuleCallback}, data::{BatchableEvent, EventBatch, ModuleData}, }; @@ -260,34 +260,51 @@ impl QueueInteractionsServer for ModuleS )); } - Ok(promise( - [promise( - [call(FetchUpdateHeaders { - counterparty_chain_id: self.ctx.chain_id.clone(), - chain_id: client_meta.chain_id, - update_from: client_meta.height, - update_to: latest_height, - })], + if client_meta.height >= target_height { + info!( + "client {client_id} has already been updated to a height \ + >= the desired target height ({} >= {target_height})", + client_meta.height, + ); + + make_msgs( + self, + client_id, + batches, + None, + client_meta.clone(), + client_meta.height, + ) + } else { + Ok(promise( + [promise( + [call(FetchUpdateHeaders { + counterparty_chain_id: self.ctx.chain_id.clone(), + chain_id: client_meta.chain_id, + update_from: client_meta.height, + update_to: latest_height, + })], + [], + AggregateMsgUpdateClientsFromOrderedHeaders { + chain_id: self.ctx.chain_id.clone(), + counterparty_client_id: client_id.clone(), + }, + )], [], - AggregateMsgUpdateClientsFromOrderedHeaders { - chain_id: self.ctx.chain_id.clone(), - counterparty_client_id: client_id.clone(), - }, - )], - [], - Callback::plugin( - self.ctx.plugin_name(), - MakeIbcMessagesFromUpdate { - client_id: client_id.clone(), - batches, - }, - ), - )) + Callback::plugin( + self.ctx.plugin_name(), + MakeIbcMessagesFromUpdate { + client_id: client_id.clone(), + batches, + }, + ), + )) + } } } } - #[instrument] + #[instrument(skip_all, fields(chain_id = %self.ctx.chain_id))] async fn callback( &self, cb: ModuleCallback, diff --git a/voyager/src/queue.rs b/voyager/src/queue.rs index 8037b583d8..8277a08d9a 100644 --- a/voyager/src/queue.rs +++ b/voyager/src/queue.rs @@ -424,11 +424,6 @@ impl Voyager { for (plugin_name, filter) in self.context.interest_filters() { info!(%plugin_name, "spawning optimizer"); - // let client = self - // .context - // .plugin_client_raw::(&plugin_name) - // .unwrap(); - tasks.push(Box::pin( AssertUnwindSafe( async { @@ -470,7 +465,8 @@ impl Voyager { .await; } } - .instrument(info_span!("optimize", %plugin_name)), + .instrument(info_span!("optimize", %plugin_name)) + .instrument(trace_span!("optimize_verbose", %filter)), ) .catch_unwind(), ));