diff --git a/crates/madara/client/mempool/src/inner/intent.rs b/crates/madara/client/mempool/src/inner/intent.rs index a18ebfd8b..668c4cc97 100644 --- a/crates/madara/client/mempool/src/inner/intent.rs +++ b/crates/madara/client/mempool/src/inner/intent.rs @@ -9,15 +9,35 @@ //! //! Intents are categorized by readiness. A transaction intent is marked as //! [TransactionIntentReady] if its nonce directly follows that of the contract -//! sending the transaction, else it is [TransactionIntentPending]. +//! sending the transaction, else it marked as pending. //! -//! Pending intents remain pending until the transaction preceding them has been -//! polled from the [Mempool], at which point they are converted to a ready -//! intent with [TransactionIntentPending::ready]. +//! # Pending intents +//! +//! There are two types of pending intents [TransactionIntentPendingByNonce] and +//! [TransactionIntentPendingByTimestamp]. Each pending intent contains the same +//! data but with slightly different ordering rules. This is because the +//! [MempoolInner] holds two ordered queues for pending intents: +//! +//! - One is ordered by timestamp to facilitate the removal of age-exceeded +//! pending intents. +//! +//! - The other is ordered by nonces to be able to easily retrieve the +//! transaction with the next nonce for a specific contract. +//! +//! > You can convert from one pending intent type to another with [by_timestamp] +//! > and [by_nonce]. +//! +//! Both pending intents remain pending until the transaction preceding them has +//! been polled from the [Mempool], at which point [TransactionIntentPendingByNonce] +//! is converted to a ready intent with [TransactionIntentPendingByNonce::ready], +//! while any [TransactionIntentPendingByTimestamp] are removed from the queue. //! //! [MempoolTransaction]: super::MempoolTransaction //! [NonceTxMapping]: super::NonceTxMapping +//! [MempoolInner]: super::MempoolInner //! [Mempool]: super::super::Mempool +//! [by_timestamp]: TransactionIntentPendingByNonce::by_timestamp +//! [by_nonce]: TransactionIntentPendingByTimestamp::by_nonce use starknet_api::core::Nonce; use starknet_types_core::felt::Felt; @@ -32,21 +52,41 @@ use super::ArrivedAtTimestamp; pub(crate) struct MarkerReady; #[derive(Debug)] -pub(crate) struct MarkerPending; +pub(crate) struct MarkerPendingByNonce; + +#[derive(Debug)] +pub(crate) struct MarkerPendingByTimestamp; /// A [transaction intent] which is ready to be consumed. /// -/// [transaction intent]: IntentInner +/// [transaction intent]: TransactionIntent pub(crate) type TransactionIntentReady = TransactionIntent; +impl Ord for TransactionIntentReady { + fn cmp(&self, other: &Self) -> cmp::Ordering { + // Important: Fallback on contract addr here. + // There can be timestamp collisions. + self.timestamp + .cmp(&other.timestamp) + .then_with(|| self.contract_address.cmp(&other.contract_address)) + .then_with(|| self.nonce.cmp(&other.nonce)) + } +} + +impl PartialOrd for TransactionIntentReady { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// A [transaction intent] which is waiting for the [Nonce] before it to be /// consumed by the [Mempool]. It cannot be polled until then. /// /// [transaction intent]: TransactionIntent /// [Mempool]: super::super::Mempool -pub(crate) type TransactionIntentPending = TransactionIntent; +pub(crate) type TransactionIntentPendingByNonce = TransactionIntent; -impl TransactionIntentPending { +impl TransactionIntentPendingByNonce { /// Converts this [intent] to a [TransactionIntentReady] to be added to the /// ready intent queue in the [MempoolInner] /// @@ -58,23 +98,91 @@ impl TransactionIntentPending { timestamp: self.timestamp, nonce: self.nonce, nonce_next: self.nonce_next, - phantom: Default::default(), + phantom: std::marker::PhantomData, + } + } + + /// Converts this [intent] to a [TransactionIntentPendingByTimestamp] to be + /// used to remove aged pending transactions from the [MempoolInner]. + /// + /// [intent]: self + /// [MempoolInner]: super::MempoolInner + pub(crate) fn by_timestamp(&self) -> TransactionIntentPendingByTimestamp { + TransactionIntentPendingByTimestamp { + contract_address: self.contract_address, + timestamp: self.timestamp, + nonce: self.nonce, + nonce_next: self.nonce_next, + phantom: std::marker::PhantomData, + } + } +} + +impl Ord for TransactionIntentPendingByNonce { + fn cmp(&self, other: &Self) -> cmp::Ordering { + // Pending transactions are simply ordered by nonce + self.nonce + .cmp(&other.nonce) + .then_with(|| self.timestamp.cmp(&other.timestamp)) + .then_with(|| self.contract_address.cmp(&other.contract_address)) + } +} + +impl PartialOrd for TransactionIntentPendingByNonce { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// A [pending transaction intent] which is ordered by timestamp. This is +/// necessary to be able to remove pending transactions which have grown too old +/// in the [Mempool]. +/// +/// [pending transaction intent]: TransactionIntentPendingByNonce +/// [Mempool]: super::super::Mempool +pub(crate) type TransactionIntentPendingByTimestamp = TransactionIntent; + +impl TransactionIntentPendingByTimestamp { + pub(crate) fn by_nonce(self) -> TransactionIntentPendingByNonce { + TransactionIntentPendingByNonce { + contract_address: self.contract_address, + timestamp: self.timestamp, + nonce: self.nonce, + nonce_next: self.nonce_next, + phantom: PhantomData, } } } +impl Ord for TransactionIntentPendingByTimestamp { + fn cmp(&self, other: &Self) -> cmp::Ordering { + // Important: Fallback on contract addr here. + // There can be timestamp collisions. + self.timestamp + .cmp(&other.timestamp) + .then_with(|| self.contract_address.cmp(&other.contract_address)) + .then_with(|| self.nonce.cmp(&other.nonce)) + } +} + +impl PartialOrd for TransactionIntentPendingByTimestamp { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// An [intent] to be consumed by the [Mempool]. /// -/// This struct has the same logic applied for its implementations of [Eq] and -/// [Ord] and will check [timestamp], [contract_address] and [nonce] (in that -/// oreder) for both. [nonce_next] is not considered as it should directly -/// follow from [nonce] and therefore its equality and order is implied. +/// This data struct will check [timestamp], [contract_address] and [nonce] +/// (in that oreder) for equality. [nonce_next] is not considered as it should +/// directly follow from [nonce] and therefore its equality and order is implied. /// /// # Type Safety /// -/// This struct is statically wrapped by [TransactionIntentReady] and -/// [TransactionIntentPending] to provide type safety between intent types while -/// avoiding too much code duplication. +/// This struct is statically wrapped by [TransactionIntentReady], +/// [TransactionIntentPendingByNonce] and [TransactionIntentPendingByTimestamp] +/// to provide type safety between intent types while avoiding too much code +/// duplication. /// /// # [Invariants] /// @@ -119,20 +227,3 @@ impl PartialEq for TransactionIntent { } impl Eq for TransactionIntent {} - -impl Ord for TransactionIntent { - fn cmp(&self, other: &Self) -> cmp::Ordering { - // Important: Fallback on contract addr here. - // There can be timestamp collisions. - self.timestamp - .cmp(&other.timestamp) - .then_with(|| self.contract_address.cmp(&other.contract_address)) - .then_with(|| self.nonce.cmp(&other.nonce)) - } -} - -impl PartialOrd for TransactionIntent { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} diff --git a/crates/madara/client/mempool/src/inner/mod.rs b/crates/madara/client/mempool/src/inner/mod.rs index e909d9c41..4bb9e7c01 100644 --- a/crates/madara/client/mempool/src/inner/mod.rs +++ b/crates/madara/client/mempool/src/inner/mod.rs @@ -10,7 +10,7 @@ use mc_db::mempool_db::{NonceInfo, NonceStatus}; use mp_convert::ToFelt; use starknet_api::core::{ContractAddress, Nonce}; use starknet_types_core::felt::Felt; -use std::collections::{btree_map, hash_map, BTreeSet, HashMap}; +use std::collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap}; mod deployed_contracts; mod intent; @@ -35,31 +35,32 @@ use crate::CheckInvariants; /// /// These do not actually store transactions but the *intent* and the *order* of /// these transaction being added to the [Mempool]. We *intend* to execute a -/// transaction from a given contract address, stored in each queue, based on -/// its readiness and order of arrival. A transaction is deemed ready if its -/// [Nonce] directly follows the previous [Nonce] used by that contract. This is -/// retrieved from the database before the transaction is added to the inner -/// mempool. This means that transactions which are not ready (these are -/// [pending] transactions) will remain waiting for the required transactions to -/// be processed before they are marked as [ready] themselves. +/// transaction from a given contract address, based on its readiness and order +/// of arrival. A transaction is deemed ready if its [Nonce] directly follows +/// the previous [Nonce] used by that contract. This is retrieved from the +/// database before the transaction is added to the inner mempool. This means +/// that transactions which are not ready (these are [pending] transactions) +/// will remain waiting for the required transactions to be processed before +/// they are marked as [ready] themselves. /// /// ## [Ready] /// -/// FCFS queue. We use a [BTreeSet] to maintain logarithmic complexity and high +/// FIFO queue. We use a [BTreeSet] to maintain logarithmic complexity and high /// performance with low reordering of the memory even in the case of very high /// transaction throughput. /// /// ## [Pending] /// /// FIFO queue. The queue is distributed across all current contract addresses -/// in the mempool, with each contract address having a [BTreeSet] queue mapped -/// to it. Pending intents are popped at the front. It should actually be -/// slightly more performant to replace this with a [BTreeMap] to have access to -/// [BTreeMap::entry] which would avoid a double lookup in [pop_next] when -/// moving pending intents to the ready queue. +/// in the mempool, with each contract address having a [BTreeMap] queue mapped +/// to it. We do this to have access to [BTreeMap::entry] which avoids a double +/// lookup in [pop_next] when moving pending intents to the ready queue. Intents +/// in this queue are ordered by their [Nonce]. /// -/// > This optimizes for inserting pending intents and moving them to ready, but -/// > can be a bit slow when removing aged transactions. +/// While this is handy to retrieve the tx with the next nonce for a particular +/// contract, it is a performance bottleneck when removing age exceeded pending +/// transaction. For this reason, we keep a [separate ordering] of all pending +/// transactions, sorted by their time of arrival. /// /// # Updating Transaction Intent /// @@ -73,10 +74,14 @@ use crate::CheckInvariants; /// [NonceTxMapping], inside [nonce_mapping]. /// /// - Once this is done, we retrieve the pending queue for that contract address -/// in [tx_intent_queue_pending] and check if the next [pending] intent has -/// the right nonce. If so, we pop it, convert it to [ready] and add it to -/// [tx_intent_queue_ready]. If this was the last element in that queue, we -/// remove the mapping for that contract address in [tx_intent_queue_pending]. +/// in [tx_intent_queue_pending_by_nonce] and check if the next [pending] +/// intent has the right nonce. If so, we pop it, convert it to [ready] and +/// add it to [tx_intent_queue_ready]. If this was the last element in that +/// queue, we remove the mapping for that contract address in +/// [tx_intent_queue_pending_by_nonce]. +/// +/// - Finally, we update [tx_intent_queue_pending_by_timestamp] to reflect the +/// changes in [tx_intent_queue_pending_by_nonce] /// /// # Emptying the [Mempool] /// @@ -100,17 +105,20 @@ use crate::CheckInvariants; /// /// - every [MempoolTransaction] mapping in [nonce_mapping] should have a /// one-to-one match with an entry in either [tx_intent_queue_ready] or -/// [tx_intent_queue_pending]. +/// [tx_intent_queue_pending_by_nonce]. /// /// - every [Felt] key in [nonce_mapping] should have a one-to-one match with /// the contract address of an entry in either [tx_intent_queue_ready] or -/// [tx_intent_queue_pending]. +/// [tx_intent_queue_pending_by_nonce]. /// /// - Every [`AccountTransaction::DeployAccount`] transaction should have a one /// to one match with [deployed_contracts]. /// -/// - The invariants of [TransactionIntentReady] and [TransactionIntentPending] -/// must be respected. +/// - Every intent in [tx_intent_queue_pending_by_nonce] should have a one-to-one +/// mapping with [tx_intent_queue_pending_by_timestamp]. +/// +/// - The invariants of [TransactionIntentReady], [TransactionIntentPendingByNonce] +/// and [TransactionIntentPendingByTimestamp] must be respected. /// /// These invariants can be checked by calling [check_invariants] in a test /// environment. @@ -120,7 +128,7 @@ use crate::CheckInvariants; /// [BTreeMap::entry]: std::collections::BTreeMap::entry /// [readiness]: intent /// [Ready]: Self::tx_intent_queue_ready -/// [Pending]: Self::tx_intent_queue_ready +/// [Pending]: Self::tx_intent_queue_pending_by_nonce /// [Mempool]: super::Mempool /// [pending]: TransactionIntentPending /// [ready]: TransactionIntentReady @@ -128,23 +136,35 @@ use crate::CheckInvariants; /// [nonce_mapping]: Self::nonce_mapping /// [insert_tx]: Self::insert_tx /// [tx_intent_queue_ready]: Self::tx_intent_queue_ready -/// [tx_intent_queue_pending]: Self::tx_intent_queue_pending +/// [tx_intent_queue_pending_by_nonce]: Self::tx_intent_queue_pending_by_nonce +/// [tx_intent_queue_pending_by_timestamp]: Self::tx_intent_queue_pending_by_timestamp /// [deployed_contracts]: Self::deployed_contracts /// [check_invariants]: Self::check_invariants +/// [separate ordering]: Self::tx_intent_queue_pending_by_timestamp pub(crate) struct MempoolInner { /// We have one [Nonce] to [MempoolTransaction] mapping per contract /// address. /// /// [Nonce]: starknet_api::core::Nonce pub(crate) nonce_mapping: HashMap, - /// FCFS queue of all [ready] intents. + /// FIFO queue of all [ready] intents. /// /// [ready]: TransactionIntentReady pub(crate) tx_intent_queue_ready: BTreeSet, - /// FCFS queue of all [pending] intents. + /// FIFO queue of all [pending] intents, sorted by their [Nonce]. + /// + /// [pending]: TransactionIntentPendingByNonce + pub(crate) tx_intent_queue_pending_by_nonce: HashMap>, + /// FIFO queue of all [pending] intents, sorted by their time of arrival. + /// + /// This is required for the rapid removal of age-exceeded txs in + /// [remove_age_exceeded_txs] and must be kept in sync with + /// [tx_intent_queue_pending_by_nonce]. /// - /// [pending]: TransactionIntentPending - pub(crate) tx_intent_queue_pending: HashMap>, + /// [pending]: TransactionIntentPendingByTimestamp + /// [remove_age_exceeded_txs]: Self::remove_age_exceeded_txs + /// [tx_intent_queue_pending_by_nonce]: Self::tx_intent_queue_pending_by_nonce + pub(crate) tx_intent_queue_pending_by_timestamp: BTreeSet, /// A count of all deployed contract declared so far. deployed_contracts: DeployedContracts, /// Constraints on the number of transactions allowed in the [Mempool] @@ -169,8 +189,6 @@ impl CheckInvariants for MempoolInner { // tx_intent_queue_ready can only contain one tx of every contract let mut tx_counts = HashMap::::default(); for intent in self.tx_intent_queue_ready.iter() { - // TODO: check the nonce against the db to make sure this intent is - // really ready intent.check_invariants(); let nonce_mapping = self @@ -195,12 +213,20 @@ impl CheckInvariants for MempoolInner { *tx_counts.entry(intent.contract_address).or_insert(0) += 1; } - for (contract_address, queue) in self.tx_intent_queue_pending.iter() { + let mut count = 0; + for (contract_address, queue) in self.tx_intent_queue_pending_by_nonce.iter() { assert!(!queue.is_empty()); - for intent in queue.iter() { - // TODO: check the nonce against the db to make sure this intent is - // really pending + for intent in queue.keys() { + let intent_pending_by_timestamp = intent.by_timestamp(); + self.tx_intent_queue_pending_by_timestamp.get(&intent_pending_by_timestamp).unwrap_or_else(|| { + panic!( + "Missing pending intent by timestamp: {intent_pending_by_timestamp:#?}, available: {:#?}", + self.tx_intent_queue_pending_by_timestamp + ) + }); + count += 1; + intent.check_invariants(); assert_eq!(&intent.contract_address, contract_address); @@ -231,6 +257,13 @@ impl CheckInvariants for MempoolInner { } } + assert_eq!( + self.tx_intent_queue_pending_by_timestamp.len(), + count, + "Excess transactions by timetamp, remaining: {:#?}", + self.tx_intent_queue_pending_by_timestamp + ); + for (contract_address, nonce_mapping) in self.nonce_mapping.iter() { let count = tx_counts.get(contract_address).unwrap_or_else(|| { panic!( @@ -254,7 +287,8 @@ impl MempoolInner { Self { nonce_mapping: Default::default(), tx_intent_queue_ready: Default::default(), - tx_intent_queue_pending: Default::default(), + tx_intent_queue_pending_by_nonce: Default::default(), + tx_intent_queue_pending_by_timestamp: Default::default(), deployed_contracts: Default::default(), limiter: MempoolLimiter::new(limits_config), } @@ -312,7 +346,7 @@ impl MempoolInner { timestamp: previous.arrived_at, nonce: nonce_info.nonce, nonce_next: nonce_info.nonce_next, - phantom: Default::default(), + phantom: std::marker::PhantomData, }); debug_assert!(removed); self.limiter.mark_removed(&TransactionCheckedLimits::limits_for(&previous)); @@ -334,31 +368,56 @@ impl MempoolInner { // The pending queue works a little bit differently as // it is split into individual sub-queues for each // contract address - let queue = self.tx_intent_queue_pending.entry(contract_address).or_default(); + let queue = self.tx_intent_queue_pending_by_nonce.entry(contract_address).or_default(); // Remove old value (if collision and force == true) if let ReplacedState::Replaced { previous } = replaced { - let removed = queue.remove(&TransactionIntentPending { + let removed = queue.remove(&TransactionIntentPendingByNonce { contract_address, timestamp: previous.arrived_at, nonce: nonce_info.nonce, nonce_next: nonce_info.nonce_next, - phantom: Default::default(), + phantom: std::marker::PhantomData, }); + debug_assert!(removed.is_some()); + + let removed = self.tx_intent_queue_pending_by_timestamp.remove( + &TransactionIntentPendingByTimestamp { + contract_address, + timestamp: previous.arrived_at, + nonce: nonce_info.nonce, + nonce_next: nonce_info.nonce_next, + phantom: std::marker::PhantomData, + }, + ); debug_assert!(removed); + self.limiter.mark_removed(&TransactionCheckedLimits::limits_for(&previous)); } else if let Some(contract_address) = &deployed_contract_address { self.deployed_contracts.increment(*contract_address); } // Insert new value - let inserted = queue.insert(TransactionIntentPending { - contract_address, - timestamp: arrived_at, - nonce: nonce_info.nonce, - nonce_next: nonce_info.nonce_next, - phantom: Default::default(), - }); + let inserted = queue.insert( + TransactionIntentPendingByNonce { + contract_address, + timestamp: arrived_at, + nonce: nonce_info.nonce, + nonce_next: nonce_info.nonce_next, + phantom: std::marker::PhantomData, + }, + (), + ); + debug_assert!(inserted.is_none()); + + let inserted = + self.tx_intent_queue_pending_by_timestamp.insert(TransactionIntentPendingByTimestamp { + contract_address, + timestamp: arrived_at, + nonce: nonce_info.nonce, + nonce_next: nonce_info.nonce_next, + phantom: std::marker::PhantomData, + }); debug_assert!(inserted); } }; @@ -375,17 +434,36 @@ impl MempoolInner { timestamp: arrived_at, nonce: nonce_info.nonce, nonce_next: nonce_info.nonce_next, - phantom: Default::default(), + phantom: std::marker::PhantomData, }), - NonceStatus::Pending => self.tx_intent_queue_pending.entry(contract_address).or_default().insert( - TransactionIntentPending { - contract_address, - timestamp: arrived_at, - nonce: nonce_info.nonce, - nonce_next: nonce_info.nonce_next, - phantom: Default::default(), - }, - ), + NonceStatus::Pending => { + let insert_1 = + self.tx_intent_queue_pending_by_timestamp.insert(TransactionIntentPendingByTimestamp { + contract_address, + timestamp: arrived_at, + nonce: nonce_info.nonce, + nonce_next: nonce_info.nonce_next, + phantom: std::marker::PhantomData, + }); + + let insert_2 = self + .tx_intent_queue_pending_by_nonce + .entry(contract_address) + .or_default() + .insert( + TransactionIntentPendingByNonce { + contract_address, + timestamp: arrived_at, + nonce: nonce_info.nonce, + nonce_next: nonce_info.nonce_next, + phantom: std::marker::PhantomData, + }, + (), + ) + .is_none(); + + insert_1 && insert_2 + } }; debug_assert!(inserted); @@ -418,14 +496,13 @@ impl MempoolInner { }; let nonce_mapping = entry.get_mut(); - let btree_map::Entry::Occupied(nonce) = nonce_mapping.transactions.entry(intent.nonce) else { + let btree_map::Entry::Occupied(nonce_mapping_entry) = nonce_mapping.transactions.entry(intent.nonce) else { unreachable!("Nonce chain without a tx"); }; - let limits = TransactionCheckedLimits::limits_for(nonce.get()); - + let limits = TransactionCheckedLimits::limits_for(nonce_mapping_entry.get()); if self.limiter.tx_age_exceeded(&limits) { - nonce.remove(); + nonce_mapping_entry.remove(); if nonce_mapping.transactions.is_empty() { entry.remove(); } @@ -436,63 +513,62 @@ impl MempoolInner { } } - // The complexity of this is not great if we mostly have ready - // transactions. Any ideas on how to improve the performance of this are - // welcome. Lets recap the problem: - // - // 1. We need to store pending intents separate from ready intents, for - // obvious reasons. - // - // 2. Pending intents might be added back into the ready queue and so - // they need to store at least as much information as a ready intent. - // - // 3. A ready intent must be able to look for the next pending intent to - // add it to the ready queue. - // - // 4. Pending intents need to be sorted by time or else the complexity - // of removing aged transactions collapses. - // - // How do we reconcile (3.) and (4.)? We need a queue wich is sometimes - // sorted by account, and sometimes sorted by timestamp! - // PERF: remove or limit this allocation, perhaps by keeping a count - // of the current number of pending transactions. Maybe this is an - // indication that we need to change the underlying data structure - let mut to_remove = Vec::default(); - for (contract_address, queue) in self.tx_intent_queue_pending.iter_mut() { - while let Some(intent) = queue.first() { - let hash_map::Entry::Occupied(mut entry) = self.nonce_mapping.entry(intent.contract_address) else { - unreachable!("Nonce chain does not match tx queue"); - }; + // The code for removing age-exceeded pending transactions is similar, + // but with the added complexity that we need to keep + // tx_intent_queue_pending and tx_intent_queue_pending_by_timestamp in + // sync. This means removals need to take place across both queues. + while let Some(intent) = self.tx_intent_queue_pending_by_timestamp.first() { + // Set 1: look for a pending transaction which is too old - let nonce_mapping = entry.get_mut(); - let btree_map::Entry::Occupied(nonce) = nonce_mapping.transactions.entry(intent.nonce) else { - unreachable!("Nonce chain without a tx"); - }; + let hash_map::Entry::Occupied(mut entry) = self.nonce_mapping.entry(intent.contract_address) else { + unreachable!("Nonce chain does not match tx queue"); + }; - let limits = TransactionCheckedLimits::limits_for(nonce.get()); + let nonce_mapping = entry.get_mut(); + let btree_map::Entry::Occupied(nonce_mapping_entry) = nonce_mapping.transactions.entry(intent.nonce) else { + unreachable!("Nonce chain without a tx"); + }; - if self.limiter.tx_age_exceeded(&limits) { - nonce.remove(); - if nonce_mapping.transactions.is_empty() { - entry.remove(); - } + let limits = TransactionCheckedLimits::limits_for(nonce_mapping_entry.get()); + if self.limiter.tx_age_exceeded(&limits) { + // Step 2: we found it! Now we remove the entry in + // tx_intent_queue_pending_by_timestamp - queue.pop_first(); - } else if limits.checks_age() { - break; + nonce_mapping_entry.remove(); // *- snip -* + if nonce_mapping.transactions.is_empty() { + entry.remove(); // *- snip -* } - } - if queue.is_empty() { - to_remove.push(*contract_address); - } - } + let intent = self + .tx_intent_queue_pending_by_timestamp + .pop_first() // *- snip -* + .expect("Already in loop, first entry must exist"); - // We can't do this inside of the loop above due to rules on mutable - // ownership - for contract_address in to_remove.iter() { - let removed = self.tx_intent_queue_pending.remove(contract_address); - debug_assert!(removed.is_some()); + // Step 3: we remove the TransactionIntentPendingByNonce + // associated to the TransactionIntentPendingByTimestamp we just + // removed. + + let hash_map::Entry::Occupied(mut entry) = + self.tx_intent_queue_pending_by_nonce.entry(intent.contract_address) + else { + unreachable!("Missing pending intent mapping for {:?}", intent.contract_address); + }; + + let queue = entry.get_mut(); + let remove = queue.remove(&intent.by_nonce()); // *- snip -* + debug_assert!(remove.is_some()); + + // Step 4: tx_intent_queue_pending is essentially a tree of + // trees. We cannot leave empty sub-trees or else the memory + // will fill up! So if the queue of pending intents by nonce is + // empty, we remove it as well. + + if queue.is_empty() { + entry.remove(); // *- snip -* + } + } else if limits.checks_age() { + break; + } } } @@ -515,18 +591,18 @@ impl MempoolInner { // Looks for the next transaction from the same account in the pending // queue and marks it as ready if found. 'pending: { - if let hash_map::Entry::Occupied(mut entry) = self.tx_intent_queue_pending.entry(contract_address) { + if let hash_map::Entry::Occupied(mut entry) = self.tx_intent_queue_pending_by_nonce.entry(contract_address) + { let queue = entry.get_mut(); - let nonce = queue.first().expect("Intent queue cannot be empty").nonce; + + let entry_inner = queue.first_entry().expect("Intent queue cannot be empty"); + let nonce = entry_inner.key().nonce; if nonce != nonce_next { break 'pending; } - // PERF: we could avoid the double lookup by using a BTreeMap - // instead - let intent_pending = queue.pop_first().expect("Intent queue cannot be empty"); - let intent_ready = intent_pending.ready(); + let intent_pending_by_nonce = entry_inner.remove_entry().0; // This works like a NonceMapping: if a pending intent queue is // empty, we remove the mapping. @@ -534,6 +610,12 @@ impl MempoolInner { entry.remove(); } + // We need to keep pending intents by timestamp in sync! + let intent_pending_by_timestamp = intent_pending_by_nonce.by_timestamp(); + let removed = self.tx_intent_queue_pending_by_timestamp.remove(&intent_pending_by_timestamp); + debug_assert!(removed); + + let intent_ready = intent_pending_by_nonce.ready(); self.tx_intent_queue_ready.insert(intent_ready); } } diff --git a/crates/madara/client/mempool/src/inner/nonce_chain.rs b/crates/madara/client/mempool/src/inner/nonce_chain.rs deleted file mode 100644 index 7359927e2..000000000 --- a/crates/madara/client/mempool/src/inner/nonce_chain.rs +++ /dev/null @@ -1,156 +0,0 @@ -use super::tx::{ArrivedAtTimestamp, MempoolTransaction}; -use crate::TxInsersionError; -use starknet_api::{core::Nonce, transaction::TransactionHash}; -use std::collections::{btree_map, BTreeMap}; -use std::{cmp, iter}; - -#[derive(Debug, Clone)] -pub struct OrderMempoolTransactionByNonce(pub MempoolTransaction); - -impl PartialEq for OrderMempoolTransactionByNonce { - fn eq(&self, other: &Self) -> bool { - self.cmp(other).is_eq() - } -} -impl Eq for OrderMempoolTransactionByNonce {} -impl Ord for OrderMempoolTransactionByNonce { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.0.nonce().cmp(&other.0.nonce()) - } -} -impl PartialOrd for OrderMempoolTransactionByNonce { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] -pub(crate) enum NonceReadiness { - #[default] - Ready, - Pending, -} - -/// Invariants: -/// - front_nonce, front_arrived_at and front_tx_hash must match the front transaction timestamp. -/// - No nonce chain should ever be empty in the mempool. -#[derive(Debug)] -pub struct NonceChain { - /// Use a BTreeMap to so that we can use the entry api. - // TODO(perf): to avoid some double lookups here, we should remove the `OrderMempoolTransactionByNonce` struct - // and make this a BTreeMap - pub(crate) transactions: BTreeMap, - pub(crate) front_arrived_at: ArrivedAtTimestamp, - pub(crate) front_nonce: Nonce, - pub(crate) front_tx_hash: TransactionHash, -} - -#[derive(Eq, PartialEq, Debug)] -pub enum InsertedPosition { - Front { former_head_arrived_at: ArrivedAtTimestamp }, - Other, -} - -#[derive(Debug)] -pub enum ReplacedState { - Replaced { previous: MempoolTransaction }, - NotReplaced, -} - -#[derive(Eq, PartialEq, Debug)] -pub enum NonceChainNewState { - Empty, - NotEmpty, -} - -impl NonceChain { - pub fn new_with_first_tx(tx: MempoolTransaction) -> Self { - Self { - front_arrived_at: tx.arrived_at, - front_tx_hash: tx.tx_hash(), - front_nonce: tx.nonce(), - transactions: iter::once((OrderMempoolTransactionByNonce(tx), ())).collect(), - } - } - - #[cfg(test)] - pub fn check_invariants(&self) { - assert!(!self.transactions.is_empty()); - let (front, _) = self.transactions.first_key_value().unwrap(); - assert_eq!(front.0.tx_hash(), self.front_tx_hash); - assert_eq!(front.0.nonce(), self.front_nonce); - assert_eq!(front.0.arrived_at, self.front_arrived_at); - } - - /// Returns where in the chain it was inserted. - /// When `force` is `true`, this function should never return any error. - pub fn insert( - &mut self, - mempool_tx: MempoolTransaction, - force: bool, - ) -> Result<(InsertedPosition, ReplacedState), TxInsersionError> { - let mempool_tx_arrived_at = mempool_tx.arrived_at; - let mempool_tx_nonce = mempool_tx.nonce(); - let mempool_tx_hash = mempool_tx.tx_hash(); - - let replaced = if force { - // double lookup here unfortunately.. that's because we're using the keys in a hacky way and can't update the - // entry key using the entry api. - let mempool_tx = OrderMempoolTransactionByNonce(mempool_tx); - if let Some((previous, _)) = self.transactions.remove_entry(&mempool_tx) { - let previous = previous.0.clone(); - let inserted = self.transactions.insert(mempool_tx, ()); - debug_assert!(inserted.is_none()); - ReplacedState::Replaced { previous } - } else { - let inserted = self.transactions.insert(mempool_tx, ()); - debug_assert!(inserted.is_none()); - ReplacedState::NotReplaced - } - } else { - match self.transactions.entry(OrderMempoolTransactionByNonce(mempool_tx)) { - btree_map::Entry::Occupied(entry) => { - // duplicate nonce, either it's because the hash is duplicated or nonce conflict with another tx. - if entry.key().0.tx_hash() == mempool_tx_hash { - return Err(TxInsersionError::DuplicateTxn); - } else { - return Err(TxInsersionError::NonceConflict); - } - } - btree_map::Entry::Vacant(entry) => *entry.insert(()), - } - - ReplacedState::NotReplaced - }; - - let position = if self.front_nonce >= mempool_tx_nonce { - // We insrted at the front here - let former_head_arrived_at = core::mem::replace(&mut self.front_arrived_at, mempool_tx_arrived_at); - self.front_nonce = mempool_tx_nonce; - self.front_tx_hash = mempool_tx_hash; - InsertedPosition::Front { former_head_arrived_at } - } else { - InsertedPosition::Other - }; - - debug_assert_eq!( - self.transactions.first_key_value().expect("Getting the first tx").0 .0.tx_hash(), - self.front_tx_hash - ); - - Ok((position, replaced)) - } - - pub fn pop(&mut self) -> (MempoolTransaction, NonceChainNewState) { - // TODO(perf): avoid double lookup - let (tx, _) = self.transactions.pop_first().expect("Nonce chain should not be empty"); - if let Some((new_front, _)) = self.transactions.first_key_value() { - self.front_arrived_at = new_front.0.arrived_at; - self.front_tx_hash = new_front.0.tx_hash(); - self.front_nonce = new_front.0.nonce(); - (tx.0, NonceChainNewState::NotEmpty) - } else { - (tx.0, NonceChainNewState::Empty) - } - } -} diff --git a/crates/madara/client/mempool/src/lib.rs b/crates/madara/client/mempool/src/lib.rs index e1867c354..6dc13cbf3 100644 --- a/crates/madara/client/mempool/src/lib.rs +++ b/crates/madara/client/mempool/src/lib.rs @@ -709,7 +709,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::now(); @@ -737,7 +737,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::now(); @@ -760,10 +760,10 @@ mod test { .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&**tx_new_3_mempool.contract_address()) .expect("Missing nonce mapping for tx_new_3") - .contains(&TransactionIntentPending { + .contains_key(&TransactionIntentPendingByNonce { contract_address: **tx_new_3_mempool.contract_address(), timestamp: tx_new_3_mempool.arrived_at, nonce: tx_new_3_mempool.nonce, @@ -772,7 +772,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::UNIX_EPOCH; @@ -800,7 +800,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::UNIX_EPOCH; @@ -828,7 +828,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::UNIX_EPOCH; @@ -851,10 +851,10 @@ mod test { .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&**tx_old_3_mempool.contract_address()) .expect("Missing nonce mapping for tx_old_3") - .contains(&TransactionIntentPending { + .contains_key(&TransactionIntentPendingByNonce { contract_address: **tx_old_3_mempool.contract_address(), timestamp: tx_old_3_mempool.arrived_at, nonce: tx_old_3_mempool.nonce, @@ -863,7 +863,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let arrived_at = ArrivedAtTimestamp::UNIX_EPOCH; @@ -886,10 +886,10 @@ mod test { .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&**tx_old_4_mempool.contract_address()) .expect("Missing nonce_mapping for tx_old_4") - .contains(&TransactionIntentPending { + .contains_key(&TransactionIntentPendingByNonce { contract_address: **tx_old_4_mempool.contract_address(), timestamp: tx_old_4_mempool.arrived_at, nonce: tx_old_4_mempool.nonce, @@ -898,7 +898,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); // Make sure we have not entered an invalid state. @@ -923,7 +923,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert!( mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready.contains(&TransactionIntentReady { @@ -935,7 +935,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); // tx_old_1 and tx_old_2 should no longer be in the ready queue @@ -949,7 +949,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert!( !mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready.contains(&TransactionIntentReady { @@ -961,7 +961,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); // tx_new_3 should still be in the pending queue but tx_old_3 should not @@ -970,10 +970,10 @@ mod test { .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&**tx_new_3_mempool.contract_address()) .expect("Missing nonce mapping for tx_new_3") - .contains(&TransactionIntentPending { + .contains_key(&TransactionIntentPendingByNonce { contract_address: **tx_new_3_mempool.contract_address(), timestamp: tx_new_3_mempool.arrived_at, nonce: tx_new_3_mempool.nonce, @@ -982,17 +982,17 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert!( !mempool .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&**tx_old_3_mempool.contract_address()) .expect("Missing nonce mapping for tx_old_3") - .contains(&TransactionIntentPending { + .contains_key(&TransactionIntentPendingByNonce { contract_address: **tx_old_3_mempool.contract_address(), timestamp: tx_old_3_mempool.arrived_at, nonce: tx_old_3_mempool.nonce, @@ -1001,7 +1001,7 @@ mod test { }), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); // tx_old_4 should no longer be in the pending queue and since it was @@ -1012,11 +1012,11 @@ mod test { .inner .read() .expect("Poisoned lock") - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .contains_key(&**tx_old_4_mempool.contract_address()), "ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); // Make sure we have not entered an invalid state. @@ -1055,10 +1055,10 @@ mod test { let inner = mempool.inner.read().expect("Poisoned lock"); inner.check_invariants(); inner - .tx_intent_queue_pending + .tx_intent_queue_pending_by_nonce .get(&Felt::ZERO) .expect("Mempool should have a pending queue for contract address ZERO") - .get(&TransactionIntentPending { + .get(&TransactionIntentPendingByNonce { contract_address: Felt::ZERO, timestamp: timestamp_pending, nonce: Nonce(Felt::ONE), @@ -1067,7 +1067,7 @@ mod test { }) .expect("Mempool should contain pending transaction"); - assert_eq!(inner.tx_intent_queue_pending.len(), 1); + assert_eq!(inner.tx_intent_queue_pending_by_nonce.len(), 1); drop(inner); @@ -1118,7 +1118,7 @@ mod test { assert_eq!(mempool_tx.arrived_at, timestamp_ready); assert_eq!(inner.tx_intent_queue_ready.len(), 1); - assert!(inner.tx_intent_queue_pending.is_empty()); + assert!(inner.tx_intent_queue_pending_by_nonce.is_empty()); drop(inner); @@ -1134,7 +1134,7 @@ mod test { assert_eq!(mempool_tx.arrived_at, timestamp_pending); assert!(inner.tx_intent_queue_ready.is_empty()); - assert!(inner.tx_intent_queue_pending.is_empty()); + assert!(inner.tx_intent_queue_pending_by_nonce.is_empty()); } /// This tests makes sure that if a transaction is inserted into the @@ -1186,7 +1186,7 @@ mod test { contains, "Mempool should contain transaction 1, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 1); @@ -1216,7 +1216,7 @@ mod test { contains, "Mempool should contain transaction 2, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 2); @@ -1354,7 +1354,7 @@ mod test { contains, "Mempool should contain transaction 1, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 1); @@ -1398,7 +1398,7 @@ mod test { contains, "mempool should contain transaction 2, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending_by_nonce ); let contains = inner.tx_intent_queue_ready.contains(&TransactionIntentReady { contract_address: **tx_1_mempool.contract_address(), @@ -1411,7 +1411,7 @@ mod test { !contains, "Mempool should not contain transaction 1 after it has been replaced, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 1); @@ -1455,7 +1455,7 @@ mod test { contains, "Mempool should contain transaction 3, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let contains = inner.tx_intent_queue_ready.contains(&TransactionIntentReady { contract_address: **tx_2_mempool.contract_address(), @@ -1468,7 +1468,7 @@ mod test { contains, "mempool should contain transaction 2, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 2); @@ -1503,7 +1503,7 @@ mod test { contains, "Mempool should contain transaction 3, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("Poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("Poisoned lock").tx_intent_queue_pending_by_nonce ); let contains = inner.tx_intent_queue_ready.contains(&TransactionIntentReady { contract_address: **tx_2_mempool.contract_address(), @@ -1516,7 +1516,7 @@ mod test { contains, "mempool should contain transaction 2, ready transaction intents are: {:#?}\npending transaction intents are: {:#?}", mempool.inner.read().expect("poisoned lock").tx_intent_queue_ready, - mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending + mempool.inner.read().expect("poisoned lock").tx_intent_queue_pending_by_nonce ); assert_eq!(inner.tx_intent_queue_ready.len(), 2);