diff --git a/.changes/added/2579.md b/.changes/added/2579.md new file mode 100644 index 00000000000..adc3a5d4c23 --- /dev/null +++ b/.changes/added/2579.md @@ -0,0 +1 @@ +Clear expiration txs cache in transaction pool based on inserted transactions \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index bd4839ca2cc..c641d8667b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Version 0.41.5] +### Added +- [2579](https://github.com/FuelLabs/fuel-core/pull/2579): Clear expiration txs cache in transaction pool based on inserted transactions + ### Changed - [2387](https://github.com/FuelLabs/fuel-core/pull/2387): Update description `tx-max-depth` flag. - [2630](https://github.com/FuelLabs/fuel-core/pull/2630): Removed some noisy `tracing::info!` logs diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index de80ded6b7e..81a8143423a 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -354,7 +354,11 @@ where /// Remove transaction but keep its dependents. /// The dependents become executables. - pub fn remove_transaction(&mut self, tx_ids: Vec) { + pub fn remove_transactions( + &mut self, + tx_ids: impl Iterator, + ) -> Vec { + let mut removed_transactions = Vec::with_capacity(tx_ids.size_hint().0); for tx_id in tx_ids { if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { let dependents: Vec = @@ -383,10 +387,12 @@ where .new_executable_transaction(dependent, storage_data); } self.update_components_and_caches_on_removal(iter::once(&transaction)); + removed_transactions.push(transaction.transaction); } } self.update_stats(); + removed_transactions } /// Check if the pool has enough space to store a transaction. @@ -525,7 +531,7 @@ where /// Remove transaction and its dependents. pub fn remove_transaction_and_dependents( &mut self, - tx_ids: Vec, + tx_ids: impl Iterator, ) -> Vec { let mut removed_transactions = vec![]; for tx_id in tx_ids { diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index cf8a2ec8eb9..6d3ea8320fe 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -314,16 +314,31 @@ where { fn import_block(&mut self, result: SharedImportResult) { let new_height = *result.sealed_block.entity.header().height(); - let executed_transaction = result.tx_status.iter().map(|s| s.id).collect(); + let executed_transactions: Vec = + result.tx_status.iter().map(|s| s.id).collect(); // We don't want block importer way for us to process the result. drop(result); - { + let removed_transactions = { let mut tx_pool = self.pool.write(); - tx_pool.remove_transaction(executed_transaction); + let removed_transactions = + tx_pool.remove_transactions(executed_transactions.into_iter()); if !tx_pool.is_empty() { self.shared_state.new_txs_notifier.send_replace(()); } + removed_transactions + }; + if !removed_transactions.is_empty() { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + for tx in removed_transactions.into_iter() { + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); + } + } + } } { @@ -344,8 +359,10 @@ where let expired_txs = height_expiration_txs.remove(&height); if let Some(expired_txs) = expired_txs { let mut tx_pool = self.pool.write(); - removed_txs - .extend(tx_pool.remove_transaction_and_dependents(expired_txs)); + removed_txs.extend( + tx_pool + .remove_transaction_and_dependents(expired_txs.into_iter()), + ); } } } @@ -485,7 +502,7 @@ where if expiration < u32::MAX.into() { let mut lock = height_expiration_txs.write(); let block_height_expiration = lock.entry(expiration).or_default(); - block_height_expiration.push(tx_id); + block_height_expiration.insert(tx_id); } let duration = submitted_time @@ -661,13 +678,23 @@ where let removed; { let mut pool = self.pool.write(); - removed = pool.remove_transaction_and_dependents(txs_to_remove); + removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter()); } - for tx in removed { - self.shared_state - .tx_status_sender - .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); + if !removed.is_empty() { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + for tx in removed { + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); + } + } + self.shared_state + .tx_status_sender + .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); + } } { diff --git a/crates/services/txpool_v2/src/service/pruner.rs b/crates/services/txpool_v2/src/service/pruner.rs index 7551afa31f3..a7c76429f59 100644 --- a/crates/services/txpool_v2/src/service/pruner.rs +++ b/crates/services/txpool_v2/src/service/pruner.rs @@ -6,6 +6,7 @@ use fuel_core_types::{ use std::{ collections::{ BTreeMap, + HashSet, VecDeque, }, time::SystemTime, @@ -13,7 +14,7 @@ use std::{ pub(super) struct TransactionPruner { pub time_txs_submitted: Shared>, - pub height_expiration_txs: Shared>>, + pub height_expiration_txs: Shared>>, pub ttl_timer: tokio::time::Interval, pub txs_ttl: tokio::time::Duration, }