From 978654de0299aab1b125c0c686c7fb95ef800514 Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Mon, 13 Jun 2022 18:13:18 -0400 Subject: [PATCH 1/6] Allow Raw messages to be parsed without cloning --- rust/abacus-core/src/traits/message.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/rust/abacus-core/src/traits/message.rs b/rust/abacus-core/src/traits/message.rs index fddd0bc0ce..7b994732d8 100644 --- a/rust/abacus-core/src/traits/message.rs +++ b/rust/abacus-core/src/traits/message.rs @@ -89,9 +89,17 @@ impl TryFrom for CommittedMessage { type Error = AbacusError; fn try_from(raw: RawCommittedMessage) -> Result { + (&raw).try_into() + } +} + +impl TryFrom<&RawCommittedMessage> for CommittedMessage { + type Error = AbacusError; + + fn try_from(raw: &RawCommittedMessage) -> Result { Ok(Self { leaf_index: raw.leaf_index, - message: AbacusMessage::read_from(&mut &raw.message[..])?, + message: AbacusMessage::read_from(&mut raw.message.as_slice())?, }) } } From 566f92d50f61f327a88973e82438a7846a23725f Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Tue, 14 Jun 2022 15:17:14 -0400 Subject: [PATCH 2/6] Add hardcoded mapping between domain and chain --- rust/abacus-core/src/chain.rs | 42 +++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/rust/abacus-core/src/chain.rs b/rust/abacus-core/src/chain.rs index 0fc0dc256c..63c9837e0e 100644 --- a/rust/abacus-core/src/chain.rs +++ b/rust/abacus-core/src/chain.rs @@ -48,3 +48,45 @@ impl From<&'_ Address> for ethers::types::H160 { ethers::types::H160::from_slice(addr.0.as_ref()) } } + +/// Quick single-use macro to prevent typing domain and chain twice and risking inconsistencies. +macro_rules! domain_and_chain { + {$($domain:literal <=> $chain:literal,)*} => { + /// Get the chain name from a domain id. Returns `None` if the `domain` is unknown. + pub fn chain_from_domain(domain: u32) -> Option<&'static str> { + match domain { + $( $domain => Some($chain), )* + _ => None + } + } + + /// Get the domain id from a chain name. Expects `chain` to be a lowercase str. + /// Returns `None` if the `chain` is unknown. + pub fn domain_from_chain(chain: &str) -> Option { + match chain { + $( $chain => Some($domain), )* + _ => None + } + } + } +} + +// Copied from https://github.com/abacus-network/abacus-monorepo/blob/54a41d5a4bbb86a3b08d02d7ff6662478c41e221/typescript/sdk/src/chain-metadata.ts +domain_and_chain! { + 0x63656c6f <=> "celo", + 0x657468 <=> "ethereum", + 0x61766178 <=> "avalanche", + 0x706f6c79 <=> "polygon", + 1000 <=> "alfajores", + 43113 <=> "fuji", + 5 <=> "goerli", + 3000 <=> "kovan", + 80001 <=> "mumbai", + 13371 <=> "test1", + 13372 <=> "test2", + 13373 <=> "test3", + 0x62732d74 <=> "bsctestnet", + 0x61722d72 <=> "arbitrumrinkeby", + 0x6f702d6b <=> "optimismkovan", + 0x61752d74 <=> "auroratestnet", +} From 5ffbc9986bb7446b5bd07f44a66c9af9a65c1611 Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Tue, 14 Jun 2022 15:33:11 -0400 Subject: [PATCH 3/6] Remove extra clone --- rust/abacus-base/src/contract_sync/interchain_gas.rs | 2 -- rust/abacus-core/src/db/abacus_db.rs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/abacus-base/src/contract_sync/interchain_gas.rs b/rust/abacus-base/src/contract_sync/interchain_gas.rs index cedee046e9..c08927dfed 100644 --- a/rust/abacus-base/src/contract_sync/interchain_gas.rs +++ b/rust/abacus-base/src/contract_sync/interchain_gas.rs @@ -24,13 +24,11 @@ where let indexed_height = self .metrics .indexed_height - .clone() .with_label_values(&[GAS_PAYMENTS_LABEL, &self.chain_name]); let stored_messages = self .metrics .stored_events - .clone() .with_label_values(&[GAS_PAYMENTS_LABEL, &self.chain_name]); let config_from = self.index_settings.from(); diff --git a/rust/abacus-core/src/db/abacus_db.rs b/rust/abacus-core/src/db/abacus_db.rs index a09fb1de29..e8594f39cb 100644 --- a/rust/abacus-core/src/db/abacus_db.rs +++ b/rust/abacus-core/src/db/abacus_db.rs @@ -61,7 +61,7 @@ impl AbacusDB { for message in messages { self.store_latest_message(message)?; - let committed_message: CommittedMessage = message.clone().try_into()?; + let committed_message: CommittedMessage = message.try_into()?; info!( leaf_index = &committed_message.leaf_index, origin = &committed_message.message.origin, From 02f99f812cbd11dfb6e42431f24f8574916c4c8f Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Tue, 14 Jun 2022 15:34:10 -0400 Subject: [PATCH 4/6] Report dst chain for `dispatch` phase of `last_known_message_leaf_index` --- rust/abacus-base/src/contract_sync/outbox.rs | 41 +++++++++----------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index 74e4a53b2e..c07256c263 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -1,11 +1,11 @@ -use abacus_core::{ListValidity, OutboxIndexer}; +use std::cmp::min; +use std::time::Duration; use tokio::time::sleep; use tracing::{info, info_span, warn}; use tracing::{instrument::Instrumented, Instrument}; -use std::cmp::min; -use std::time::Duration; +use abacus_core::{chain_from_domain, CommittedMessage, ListValidity, OutboxIndexer}; use crate::{ contract_sync::{last_message::OptLatestLeafIndex, schema::OutboxContractSyncDB}, @@ -27,26 +27,20 @@ where let indexed_height = self .metrics .indexed_height - .clone() .with_label_values(&[MESSAGES_LABEL, &self.chain_name]); let stored_messages = self .metrics .stored_events - .clone() .with_label_values(&[MESSAGES_LABEL, &self.chain_name]); let missed_messages = self .metrics .missed_events - .clone() .with_label_values(&[MESSAGES_LABEL, &self.chain_name]); - let message_leaf_index = self.metrics.message_leaf_index.clone().with_label_values(&[ - "dispatch", - &self.chain_name, - "unknown", - ]); + let message_leaf_index = self.metrics.message_leaf_index.clone(); + let chain_name = self.chain_name.clone(); let config_from = self.index_settings.from(); let chunk_size = self.index_settings.chunk_size(); @@ -63,11 +57,6 @@ where info!(from = from, "[Messages]: resuming indexer from {from}"); - // Set the metrics with the latest known leaf index - if let Ok(Some(idx)) = db.retrieve_latest_leaf_index() { - message_leaf_index.set(idx as i64); - } - loop { indexed_height.set(from as i64); @@ -128,8 +117,16 @@ where // Report amount of messages stored into db stored_messages.add(sorted_messages.len().try_into()?); - // Report latest leaf index to gauge - message_leaf_index.set(max_leaf_index_of_batch as i64); + // Report latest leaf index to gauge by dst + for raw_msg in sorted_messages.iter() { + let dst = CommittedMessage::try_from(raw_msg) + .ok() + .and_then(|msg| chain_from_domain(msg.message.destination)) + .unwrap_or("unknown"); + message_leaf_index + .with_label_values(&["dispatch", &chain_name, dst]) + .set(max_leaf_index_of_batch as i64); + } // Move forward next height db.store_message_latest_block_end(to)?; @@ -166,20 +163,20 @@ where #[cfg(test)] mod test { - use abacus_test::mocks::indexer::MockAbacusIndexer; - use mockall::*; - use std::sync::Arc; use ethers::core::types::H256; + use mockall::*; use abacus_core::{db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage}; + use abacus_test::mocks::indexer::MockAbacusIndexer; use abacus_test::test_utils; - use super::*; use crate::ContractSync; use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics}; + use super::*; + #[tokio::test] async fn handles_missing_rpc_messages() { test_utils::run_test_db(|db| async move { From a78790b279351de543c7d9d2ee0315b3cd545f8f Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Wed, 15 Jun 2022 14:06:25 -0400 Subject: [PATCH 5/6] Update state call for outbox --- rust/abacus-base/src/outbox.rs | 6 +-- rust/abacus-core/src/traits/common.rs | 47 ++++++++++++++++++++--- rust/abacus-core/src/traits/outbox.rs | 4 +- rust/abacus-test/src/mocks/outbox.rs | 2 +- rust/chains/abacus-ethereum/src/inbox.rs | 6 +-- rust/chains/abacus-ethereum/src/outbox.rs | 10 ++--- 6 files changed, 51 insertions(+), 24 deletions(-) diff --git a/rust/abacus-base/src/outbox.rs b/rust/abacus-base/src/outbox.rs index 6dbaf84dd8..befffc7506 100644 --- a/rust/abacus-base/src/outbox.rs +++ b/rust/abacus-base/src/outbox.rs @@ -1,7 +1,7 @@ use abacus_core::db::AbacusDB; use abacus_core::{ AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox, - OutboxEvents, RawCommittedMessage, State, TxOutcome, + OutboxEvents, RawCommittedMessage, OutboxState, TxOutcome, }; use abacus_ethereum::EthereumOutbox; @@ -89,7 +89,7 @@ impl Outbox for CachingOutbox { self.outbox.dispatch(message).await } - async fn state(&self) -> Result { + async fn state(&self) -> Result { self.outbox.state().await } @@ -246,7 +246,7 @@ impl Outbox for OutboxVariants { } } - async fn state(&self) -> Result { + async fn state(&self) -> Result { match self { OutboxVariants::Ethereum(outbox) => outbox.state().await, OutboxVariants::Mock(mock_outbox) => mock_outbox.state().await, diff --git a/rust/abacus-core/src/traits/common.rs b/rust/abacus-core/src/traits/common.rs index a0755e69ab..7db33c0447 100644 --- a/rust/abacus-core/src/traits/common.rs +++ b/rust/abacus-core/src/traits/common.rs @@ -1,17 +1,52 @@ +use eyre::bail; + /// Contract states -#[derive(Debug)] -pub enum State { - /// Contract is active - Waiting, - /// Contract has failed - Failed, +#[repr(u8)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum OutboxState { + /// Before initialize function is called. + /// Note: the contract is initialized at deploy time, so it should never be in this state + UnInitialized = 0, + /// As long as the contract has not become fraudulent. + Active = 1, + /// After a valid fraud proof has been submitted; contract will no longer accept updates or new + /// messages + Failed = 2, +} + +impl TryFrom for OutboxState { + type Error = eyre::Report; + + fn try_from(value: u8) -> Result { + use OutboxState::*; + Ok(match value { + 0 => UnInitialized, + 1 => Active, + 2 => Failed, + _ => bail!("Invalid state value"), + }) + } } /// The status of a message in the inbox #[repr(u8)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum MessageStatus { /// Message is unknown None = 0, /// Message has been processed Processed = 1, } + +impl TryFrom for MessageStatus { + type Error = eyre::Report; + + fn try_from(value: u8) -> Result { + use MessageStatus::*; + Ok(match value { + 0 => None, + 1 => Processed, + _ => bail!("Invalid message status value"), + }) + } +} diff --git a/rust/abacus-core/src/traits/outbox.rs b/rust/abacus-core/src/traits/outbox.rs index c41078b18b..253d9e6e13 100644 --- a/rust/abacus-core/src/traits/outbox.rs +++ b/rust/abacus-core/src/traits/outbox.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use crate::{ traits::{ChainCommunicationError, TxOutcome}, - AbacusCommon, Checkpoint, CommittedMessage, Message, RawCommittedMessage, State, + AbacusCommon, Checkpoint, CommittedMessage, Message, RawCommittedMessage, OutboxState, }; use async_trait::async_trait; use ethers::core::types::H256; @@ -14,7 +14,7 @@ use eyre::Result; #[async_trait] pub trait Outbox: AbacusCommon + Send + Sync + Debug { /// Fetch the current state. - async fn state(&self) -> Result; + async fn state(&self) -> Result; /// Gets the current leaf count of the merkle tree async fn count(&self) -> Result; diff --git a/rust/abacus-test/src/mocks/outbox.rs b/rust/abacus-test/src/mocks/outbox.rs index f1422fe470..8adf686a05 100644 --- a/rust/abacus-test/src/mocks/outbox.rs +++ b/rust/abacus-test/src/mocks/outbox.rs @@ -60,7 +60,7 @@ impl Outbox for MockOutboxContract { self._dispatch(message) } - async fn state(&self) -> Result { + async fn state(&self) -> Result { self._state() } diff --git a/rust/chains/abacus-ethereum/src/inbox.rs b/rust/chains/abacus-ethereum/src/inbox.rs index e464096b86..010d685f5a 100644 --- a/rust/chains/abacus-ethereum/src/inbox.rs +++ b/rust/chains/abacus-ethereum/src/inbox.rs @@ -128,10 +128,6 @@ where #[tracing::instrument(err)] async fn message_status(&self, leaf: H256) -> Result { let status = self.contract.messages(leaf.into()).call().await?; - match status { - 0 => Ok(MessageStatus::None), - 1 => Ok(MessageStatus::Processed), - _ => panic!("Bad status from solidity"), - } + Ok(MessageStatus::try_from(status).expect("Bad status from solidity")) } } diff --git a/rust/chains/abacus-ethereum/src/outbox.rs b/rust/chains/abacus-ethereum/src/outbox.rs index d92d10d8e8..e923d20f39 100644 --- a/rust/chains/abacus-ethereum/src/outbox.rs +++ b/rust/chains/abacus-ethereum/src/outbox.rs @@ -11,7 +11,7 @@ use tracing::instrument; use abacus_core::{ AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, CheckpointMeta, CheckpointWithMeta, ContractLocator, Indexer, Message, Outbox, OutboxIndexer, - RawCommittedMessage, State, TxOutcome, + RawCommittedMessage, OutboxState, TxOutcome, }; use crate::trait_builder::MakeableWithProvider; @@ -280,13 +280,9 @@ where } #[tracing::instrument(err, skip(self))] - async fn state(&self) -> Result { + async fn state(&self) -> Result { let state = self.contract.state().call().await?; - match state { - 0 => Ok(State::Waiting), - 1 => Ok(State::Failed), - _ => unreachable!(), - } + Ok(OutboxState::try_from(state).expect("Invalid state received from contract")) } #[tracing::instrument(err, skip(self))] From cf6df0d535ecc9d3198406aefb87323e8f57c2a8 Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Wed, 15 Jun 2022 14:58:35 -0400 Subject: [PATCH 6/6] Add outbox state metric --- rust/abacus-base/src/metrics/core.rs | 23 ++++- rust/abacus-base/src/outbox.rs | 2 +- rust/abacus-core/src/traits/outbox.rs | 2 +- rust/abacus-test/src/mocks/outbox.rs | 2 +- rust/agents/relayer/src/message_processor.rs | 90 ++++++++++++++------ rust/agents/relayer/src/relayer.rs | 14 ++- rust/chains/abacus-ethereum/src/outbox.rs | 4 +- 7 files changed, 99 insertions(+), 38 deletions(-) diff --git a/rust/abacus-base/src/metrics/core.rs b/rust/abacus-base/src/metrics/core.rs index bd79b2b214..9971c4ee6b 100644 --- a/rust/abacus-base/src/metrics/core.rs +++ b/rust/abacus-base/src/metrics/core.rs @@ -3,8 +3,6 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Duration; -use crate::metrics::provider::create_provider_metrics; -use ethers_prometheus::ProviderMetrics; use eyre::Result; use once_cell::sync::OnceCell; use prometheus::{ @@ -15,6 +13,10 @@ use prometheus::{ }; use tokio::task::JoinHandle; +use ethers_prometheus::ProviderMetrics; + +use crate::metrics::provider::create_provider_metrics; + use super::NAMESPACE; /// Recommended default histogram buckets for network communication. @@ -43,6 +45,7 @@ pub struct CoreMetrics { span_events: IntCounterVec, last_known_message_leaf_index: IntGaugeVec, retry_queue_length: IntGaugeVec, + outbox_state: IntGaugeVec, /// Set of provider-specific metrics. These only need to get created once. provider_metrics: OnceCell, @@ -128,6 +131,16 @@ impl CoreMetrics { registry )?; + let outbox_state = register_int_gauge_vec_with_registry!( + opts!( + namespaced!("outbox_state"), + "Outbox contract state value", + const_labels_ref + ), + &["chain"], + registry + )?; + Ok(Self { agent_name: for_agent.into(), registry, @@ -139,6 +152,7 @@ impl CoreMetrics { span_events, last_known_message_leaf_index, retry_queue_length, + outbox_state, provider_metrics: OnceCell::new(), }) @@ -222,6 +236,11 @@ impl CoreMetrics { self.last_known_message_leaf_index.clone() } + /// Gauge for reporting the current outbox state. + pub fn outbox_state(&self) -> IntGaugeVec { + self.outbox_state.clone() + } + /// Gauge for measuring the retry queue length in MessageProcessor pub fn retry_queue_length(&self) -> IntGaugeVec { self.retry_queue_length.clone() diff --git a/rust/abacus-base/src/outbox.rs b/rust/abacus-base/src/outbox.rs index befffc7506..68706a8666 100644 --- a/rust/abacus-base/src/outbox.rs +++ b/rust/abacus-base/src/outbox.rs @@ -1,7 +1,7 @@ use abacus_core::db::AbacusDB; use abacus_core::{ AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox, - OutboxEvents, RawCommittedMessage, OutboxState, TxOutcome, + OutboxEvents, OutboxState, RawCommittedMessage, TxOutcome, }; use abacus_ethereum::EthereumOutbox; diff --git a/rust/abacus-core/src/traits/outbox.rs b/rust/abacus-core/src/traits/outbox.rs index 253d9e6e13..968e02d8e3 100644 --- a/rust/abacus-core/src/traits/outbox.rs +++ b/rust/abacus-core/src/traits/outbox.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use crate::{ traits::{ChainCommunicationError, TxOutcome}, - AbacusCommon, Checkpoint, CommittedMessage, Message, RawCommittedMessage, OutboxState, + AbacusCommon, Checkpoint, CommittedMessage, Message, OutboxState, RawCommittedMessage, }; use async_trait::async_trait; use ethers::core::types::H256; diff --git a/rust/abacus-test/src/mocks/outbox.rs b/rust/abacus-test/src/mocks/outbox.rs index 8adf686a05..8372cdbd1e 100644 --- a/rust/abacus-test/src/mocks/outbox.rs +++ b/rust/abacus-test/src/mocks/outbox.rs @@ -41,7 +41,7 @@ mock! { pub fn _validator_manager(&self) -> Result {} - pub fn _state(&self) -> Result {} + pub fn _state(&self) -> Result {} // AbacusContract pub fn _chain_name(&self) -> &str {} diff --git a/rust/agents/relayer/src/message_processor.rs b/rust/agents/relayer/src/message_processor.rs index eee3c113db..7cef8ae93b 100644 --- a/rust/agents/relayer/src/message_processor.rs +++ b/rust/agents/relayer/src/message_processor.rs @@ -6,16 +6,16 @@ use std::{ }; use eyre::{bail, Result}; -use prometheus::{IntGauge, IntGaugeVec}; +use prometheus::IntGauge; use tokio::{sync::watch::Receiver, task::JoinHandle, time::sleep}; use tracing::{ debug, error, info, info_span, instrument, instrument::Instrumented, warn, Instrument, }; -use abacus_base::{InboxContracts, Outboxes}; +use abacus_base::{CoreMetrics, InboxContracts, Outboxes}; use abacus_core::{ - db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, Inbox, InboxValidatorManager, - MessageStatus, MultisigSignedCheckpoint, + db::AbacusDB, AbacusCommon, AbacusContract, ChainCommunicationError, CommittedMessage, Inbox, + InboxValidatorManager, MessageStatus, MultisigSignedCheckpoint, Outbox, OutboxState, }; use loop_control::LoopControl::{Continue, Flow}; use loop_control::{loop_ctrl, LoopControl}; @@ -24,6 +24,7 @@ use crate::merkle_tree_builder::MerkleTreeBuilder; use crate::settings::whitelist::Whitelist; pub(crate) struct MessageProcessor { + outbox: Outboxes, max_retries: u32, db: AbacusDB, inbox_contracts: InboxContracts, @@ -31,9 +32,7 @@ pub(crate) struct MessageProcessor { retry_queue: BinaryHeap, signed_checkpoint_receiver: Receiver>, whitelist: Arc, - processor_loop_gauge: IntGauge, - processed_gauge: IntGauge, - retry_queue_length_gauge: IntGauge, + metrics: MessageProcessorMetrics, } #[derive(PartialEq, Eq, PartialOrd, Ord)] @@ -61,22 +60,10 @@ impl MessageProcessor { inbox_contracts: InboxContracts, signed_checkpoint_receiver: Receiver>, whitelist: Arc, - leaf_index_gauge: IntGaugeVec, - retry_queue_length: IntGaugeVec, + metrics: MessageProcessorMetrics, ) -> Self { - let processor_loop_gauge = leaf_index_gauge.with_label_values(&[ - "processor_loop", - outbox.chain_name(), - inbox_contracts.inbox.chain_name(), - ]); - let processed_gauge = leaf_index_gauge.with_label_values(&[ - "message_processed", - outbox.chain_name(), - inbox_contracts.inbox.chain_name(), - ]); - let retry_queue_length_gauge = retry_queue_length - .with_label_values(&[outbox.chain_name(), inbox_contracts.inbox.chain_name()]); Self { + outbox, max_retries, prover_sync: MerkleTreeBuilder::new(db.clone()), db, @@ -84,9 +71,7 @@ impl MessageProcessor { retry_queue: BinaryHeap::new(), whitelist, signed_checkpoint_receiver, - processor_loop_gauge, - processed_gauge, - retry_queue_length_gauge, + metrics, } } @@ -219,12 +204,17 @@ impl MessageProcessor { let mut latest_signed_checkpoint = self.get_signed_checkpoint_blocking().await?; loop { + self.update_outbox_state_gauge(); + // Get latest signed checkpoint, non-blocking latest_signed_checkpoint = self.get_updated_latest_signed_checkpoint(latest_signed_checkpoint)?; - self.processor_loop_gauge.set(message_leaf_index as i64); - self.retry_queue_length_gauge + self.metrics + .processor_loop_gauge + .set(message_leaf_index as i64); + self.metrics + .retry_queue_length_gauge .set(self.retry_queue.len() as i64); if self @@ -254,6 +244,24 @@ impl MessageProcessor { Ok(()) } + /// Part of main loop. + /// + /// Spawn a task to update the outbox state gauge. + fn update_outbox_state_gauge( + &self, + ) -> JoinHandle> { + let outbox_state_gauge = self.metrics.outbox_state_gauge.clone(); + let outbox = self.outbox.clone(); + tokio::spawn(async move { + let state = outbox.state().await; + match &state { + Ok(state) => outbox_state_gauge.set(*state as u8 as i64), + Err(e) => warn!(error = %e, "Failed to get outbox state"), + }; + state + }) + } + /// Part of main loop /// /// - `returns` the new message leaf index. @@ -273,7 +281,7 @@ impl MessageProcessor { .await? { MessageProcessingStatus::Processed => { - self.processed_gauge.set(message_leaf_index as i64); + self.metrics.processed_gauge.set(message_leaf_index as i64); message_leaf_index + 1 } MessageProcessingStatus::NotYetCheckpointed => { @@ -382,3 +390,31 @@ impl MessageProcessor { tokio::spawn(self.main_loop()).instrument(span) } } + +pub(crate) struct MessageProcessorMetrics { + processor_loop_gauge: IntGauge, + processed_gauge: IntGauge, + retry_queue_length_gauge: IntGauge, + outbox_state_gauge: IntGauge, +} + +impl MessageProcessorMetrics { + pub fn new(metrics: &CoreMetrics, outbox_chain: &str, inbox_chain: &str) -> Self { + Self { + processor_loop_gauge: metrics.last_known_message_leaf_index().with_label_values(&[ + "processor_loop", + outbox_chain, + inbox_chain, + ]), + processed_gauge: metrics.last_known_message_leaf_index().with_label_values(&[ + "message_processed", + outbox_chain, + inbox_chain, + ]), + outbox_state_gauge: metrics.outbox_state().with_label_values(&[outbox_chain]), + retry_queue_length_gauge: metrics + .retry_queue_length() + .with_label_values(&[outbox_chain, inbox_chain]), + } + } +} diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 2cbd2f0f9f..4430c3ca50 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -12,9 +12,10 @@ use abacus_base::{ AbacusAgentCore, Agent, CachingInterchainGasPaymaster, ContractSyncMetrics, InboxContracts, MultisigCheckpointSyncer, }; -use abacus_core::MultisigSignedCheckpoint; +use abacus_core::{AbacusContract, MultisigSignedCheckpoint}; use crate::checkpoint_fetcher::CheckpointFetcher; +use crate::message_processor::MessageProcessorMetrics; use crate::settings::whitelist::Whitelist; use crate::{message_processor::MessageProcessor, settings::RelayerSettings}; @@ -113,15 +114,20 @@ impl Relayer { signed_checkpoint_receiver: Receiver>, ) -> Instrumented>> { let db = self.outbox().db(); + let outbox = self.outbox().outbox(); + let metrics = MessageProcessorMetrics::new( + &self.core.metrics, + outbox.chain_name(), + inbox_contracts.inbox.chain_name(), + ); let message_processor = MessageProcessor::new( - self.outbox().outbox(), + outbox, self.max_processing_retries, db, inbox_contracts, signed_checkpoint_receiver, self.whitelist.clone(), - self.core.metrics.last_known_message_leaf_index(), - self.core.metrics.retry_queue_length(), + metrics, ); message_processor.spawn() diff --git a/rust/chains/abacus-ethereum/src/outbox.rs b/rust/chains/abacus-ethereum/src/outbox.rs index e923d20f39..56e875ed4d 100644 --- a/rust/chains/abacus-ethereum/src/outbox.rs +++ b/rust/chains/abacus-ethereum/src/outbox.rs @@ -10,8 +10,8 @@ use tracing::instrument; use abacus_core::{ AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, CheckpointMeta, - CheckpointWithMeta, ContractLocator, Indexer, Message, Outbox, OutboxIndexer, - RawCommittedMessage, OutboxState, TxOutcome, + CheckpointWithMeta, ContractLocator, Indexer, Message, Outbox, OutboxIndexer, OutboxState, + RawCommittedMessage, TxOutcome, }; use crate::trait_builder::MakeableWithProvider;