Skip to content

Commit

Permalink
test(mempool): added aged tx removal test
Browse files Browse the repository at this point in the history
  • Loading branch information
Trantorian1 committed Jan 8, 2025
1 parent 4c85d16 commit 9dc7185
Show file tree
Hide file tree
Showing 3 changed files with 462 additions and 58 deletions.
1 change: 1 addition & 0 deletions crates/madara/client/mempool/src/inner/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum MempoolLimitReached {
Age { max: Duration },
}

#[derive(Debug)]
pub(crate) struct TransactionCheckedLimits {
check_tx_limit: bool,
check_declare_limit: bool,
Expand Down
116 changes: 66 additions & 50 deletions crates/madara/client/mempool/src/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ use blockifier::transaction::transaction_execution::Transaction;
use deployed_contracts::DeployedContracts;
use mc_db::mempool_db::{NonceInfo, NonceStatus};
use mp_convert::ToFelt;
use starknet_api::{
core::{ContractAddress, Nonce},
StarknetApiError,
};
use starknet_api::core::{ContractAddress, Nonce};
use starknet_types_core::felt::Felt;
use std::collections::{hash_map, BTreeSet, HashMap};
use std::collections::{btree_map, hash_map, BTreeSet, HashMap};

mod deployed_contracts;
mod intent;
Expand Down Expand Up @@ -199,6 +196,8 @@ impl CheckInvariants for MempoolInner {
}

for (contract_address, queue) in self.tx_intent_queue_pending.iter() {
assert!(!queue.is_empty());

for intent in queue.iter() {
// TODO: check the nonce against the db to make sure this intent is
// really pending
Expand Down Expand Up @@ -274,11 +273,9 @@ impl MempoolInner {
// delete age-exceeded txs from the mempool
// todo(perf): this may want to limit this check once every few seconds
// to avoid it being in the hot path?
self.remove_age_exceeded_txs();

// check limits
let limits_for_tx = TransactionCheckedLimits::limits_for(&mempool_tx);
if !force {
self.remove_age_exceeded_txs();
self.limiter.check_insert_limits(&limits_for_tx)?;
}

Expand Down Expand Up @@ -338,7 +335,7 @@ impl MempoolInner {
// it is split into individual sub-queues for each
// contract address
let queue =
self.tx_intent_queue_pending.entry(contract_address).or_insert_with(|| BTreeSet::default());
self.tx_intent_queue_pending.entry(contract_address).or_insert_with(BTreeSet::default);

// Remove old value (if collision and force == true)
if let ReplacedState::Replaced { previous } = replaced {
Expand Down Expand Up @@ -381,17 +378,17 @@ impl MempoolInner {
nonce_next: nonce_info.nonce_next,
phantom: Default::default(),
}),
NonceStatus::Pending => self
.tx_intent_queue_pending
.entry(contract_address)
.or_insert_with(|| BTreeSet::default())
.insert(TransactionIntentPending {
contract_address,
timestamp: arrived_at,
nonce: nonce_info.nonce,
nonce_next: nonce_info.nonce_next,
phantom: Default::default(),
}),
NonceStatus::Pending => {
self.tx_intent_queue_pending.entry(contract_address).or_insert_with(BTreeSet::default).insert(
TransactionIntentPending {
contract_address,
timestamp: arrived_at,
nonce: nonce_info.nonce,
nonce_next: nonce_info.nonce_next,
phantom: Default::default(),
},
)
}
};
debug_assert!(inserted);

Expand All @@ -418,27 +415,25 @@ impl MempoolInner {
// ordered by timestamp, so as soon as we find a transaction which has
// not exceeded its max age (and that transaction supports age limits)
// we know no more transactions can be removed.
//
// INFO: we traverse this in reverse as the intents at the end of the
// queue have the highest chance of pointing to transactions which have
// exceeded their age limit. It is very unlikely that intents at the
// front of the queue point to such transactions. If that were the case,
// it would be handled by pop_next anyway. This should help with
// maximizing the number of removals and keeping the mempool from being
// congested.
while let Some(intent) = self.tx_intent_queue_ready.last() {
let tx = self
.nonce_mapping
.get_mut(&intent.contract_address)
.expect("Nonce chain does not match tx queue")
.transactions
.get(&intent.nonce)
.expect("Nonce chain without a tx");
while let Some(intent) = self.tx_intent_queue_ready.first() {
let hash_map::Entry::Occupied(mut entry) = self.nonce_mapping.entry(intent.contract_address) else {
unreachable!("Nonce chain does not match tx queue");
};

let limits = TransactionCheckedLimits::limits_for(tx);
let nonce_mapping = entry.get_mut();
let btree_map::Entry::Occupied(nonce) = nonce_mapping.transactions.entry(intent.nonce) else {
unreachable!("Nonce chain without a tx");
};

let limits = TransactionCheckedLimits::limits_for(nonce.get());

if self.limiter.tx_age_exceeded(&limits) {
self.tx_intent_queue_ready.pop_last();
nonce.remove();
if nonce_mapping.transactions.is_empty() {
entry.remove();
}

self.tx_intent_queue_ready.pop_first();
} else if limits.checks_age() {
break;
}
Expand All @@ -462,24 +457,45 @@ impl MempoolInner {
//
// How do we reconcile (3.) and (4.)? We need a queue wich is sometimes
// sorted by account, and sometimes sorted by timestamp!
for queue in self.tx_intent_queue_pending.values_mut() {
while let Some(intent) = queue.last() {
let tx = self
.nonce_mapping
.get_mut(&intent.contract_address)
.expect("Nonce chain does not match tx queue")
.transactions
.get(&intent.nonce)
.expect("Nonce chain without a tx");

let limits = TransactionCheckedLimits::limits_for(tx);
// PERF: remove or limit this allocation, perhaps by keeping a count
// of the current number of pending transactions. Maybe this is an
// indication that we need to change the underlying data structure
let mut to_remove = Vec::default();
for (contract_address, queue) in self.tx_intent_queue_pending.iter_mut() {
while let Some(intent) = queue.first() {
let hash_map::Entry::Occupied(mut entry) = self.nonce_mapping.entry(intent.contract_address) else {
unreachable!("Nonce chain does not match tx queue");
};

let nonce_mapping = entry.get_mut();
let btree_map::Entry::Occupied(nonce) = nonce_mapping.transactions.entry(intent.nonce) else {
unreachable!("Nonce chain without a tx");
};

let limits = TransactionCheckedLimits::limits_for(nonce.get());

if self.limiter.tx_age_exceeded(&limits) {
queue.pop_last();
nonce.remove();
if nonce_mapping.transactions.is_empty() {
entry.remove();
}

queue.pop_first();
} else if limits.checks_age() {
break;
}
}

if queue.is_empty() {
to_remove.push(*contract_address);
}
}

// We can't do this inside of the loop above due to rules on mutable
// ownership
for contract_address in to_remove.iter() {
let removed = self.tx_intent_queue_pending.remove(contract_address);
debug_assert!(removed.is_some());
}
}

Expand Down
Loading

0 comments on commit 9dc7185

Please sign in to comment.