From 14a5ae4ec2138f12746578bb6be3c0537e6dc777 Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Mon, 3 Feb 2025 19:34:44 +0100 Subject: [PATCH] feat(collator): use `range_messages_limit` from `eversscale-types` --- Cargo.lock | 4 +-- Cargo.toml | 2 +- cli/src/cmd/tools/gen_zerostate.rs | 1 + .../messages_reader/internals_reader.rs | 25 ++++++++----------- collator/src/internal_queue/types.rs | 15 +++++------ collator/tests/internal_queue.rs | 6 ++--- 6 files changed, 25 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c7b5c22a..9c85dc47c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,7 +947,7 @@ dependencies = [ [[package]] name = "everscale-types" version = "0.1.2" -source = "git+https://github.com/broxus/everscale-types.git?rev=69c344eac4e1f95d66b045ae90db904ad8afdfaf#69c344eac4e1f95d66b045ae90db904ad8afdfaf" +source = "git+https://github.com/broxus/everscale-types.git?rev=c9d1617c61701efff59d130ebf8cc4032475c365#c9d1617c61701efff59d130ebf8cc4032475c365" dependencies = [ "ahash", "anyhow", @@ -976,7 +976,7 @@ dependencies = [ [[package]] name = "everscale-types-proc" version = "0.1.5" -source = "git+https://github.com/broxus/everscale-types.git?rev=69c344eac4e1f95d66b045ae90db904ad8afdfaf#69c344eac4e1f95d66b045ae90db904ad8afdfaf" +source = "git+https://github.com/broxus/everscale-types.git?rev=c9d1617c61701efff59d130ebf8cc4032475c365#c9d1617c61701efff59d130ebf8cc4032475c365" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index b3c023c16..1872c877b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,7 +138,7 @@ tycho-storage = { path = "./storage", version = "0.2.2" } tycho-util = { path = "./util", version = "0.2.2" } [patch.crates-io] -everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "69c344eac4e1f95d66b045ae90db904ad8afdfaf" } +everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "c9d1617c61701efff59d130ebf8cc4032475c365" } [workspace.lints.rust] future_incompatible = "warn" diff --git a/cli/src/cmd/tools/gen_zerostate.rs b/cli/src/cmd/tools/gen_zerostate.rs index 8d49a1ab4..f7065f67e 100644 --- a/cli/src/cmd/tools/gen_zerostate.rs +++ b/cli/src/cmd/tools/gen_zerostate.rs @@ -701,6 +701,7 @@ fn make_default_params() -> Result { par_0_int_msgs_count_limit: 50_000, par_0_ext_msgs_count_limit: 5_000, group_slots_fractions, + range_messages_limit: 10_000, }, wu_used_to_import_next_anchor: 1_200_000_000, diff --git a/collator/src/collator/messages_reader/internals_reader.rs b/collator/src/collator/messages_reader/internals_reader.rs index 0458301b1..baca7a5d2 100644 --- a/collator/src/collator/messages_reader/internals_reader.rs +++ b/collator/src/collator/messages_reader/internals_reader.rs @@ -366,12 +366,15 @@ impl InternalsPartitionReader { BlockSeqno, )>, ) -> Result { - const RANGE_MAX_BLOCKS: u32 = 3; - const RANGE_MAX_MESSAGES: u32 = 100_000; + let range_max_messages = if self.msgs_exec_params.range_messages_limit == 0 { + 10_000 + } else { + self.msgs_exec_params.range_messages_limit + }; + let reader = self.create_next_internals_range_reader( last_range_reader_info_opt, - Some(RANGE_MAX_BLOCKS), - Some(RANGE_MAX_MESSAGES), + Some(range_max_messages), )?; let reader_seqno = reader.seqno; // we should add created range reader using calculated reader seqno instead of current block seqno @@ -394,7 +397,6 @@ impl InternalsPartitionReader { u32, BlockSeqno, )>, - _range_max_blocks: Option, range_max_messages: Option, ) -> Result { let mut shard_reader_states = BTreeMap::new(); @@ -428,7 +430,7 @@ impl InternalsPartitionReader { messages_count += diff .statistics() - .get_messages_amount_by_shard(&self.for_shard_id); + .get_messages_count_by_shard(&self.for_shard_id); if messages_count > max_messages as u64 { break; @@ -440,13 +442,6 @@ impl InternalsPartitionReader { } }; - // let range_seqno = match range_max_blocks { - // Some(max) if self.block_seqno > last_range_block_seqno + max => { - // last_range_block_seqno + max - // } - // _ => self.block_seqno, - // }; - for (shard_id, end_lt) in all_end_lts { let last_to_lt_opt = last_to_lts.get(&shard_id).map(|s| s.to); let shard_range_from = @@ -797,9 +792,9 @@ impl InternalsPartitionReader { }) .unwrap_or_default(); // we should look thru the whole range to check for pending messages - // so we do not pass `range_max_blocks` to force use the prev block end lt + // so we do not pass `range_max_messages` to force use the prev block end lt let mut range_reader = - self.create_next_internals_range_reader(last_range_reader_info_opt, None, None)?; + self.create_next_internals_range_reader(last_range_reader_info_opt, None)?; if !range_reader.fully_read { range_reader.init()?; diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 4443f3749..54df9d91f 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -395,9 +395,9 @@ impl DiffStatistics { self.inner.statistics.get(&partition) } - pub fn get_messages_amount_by_shard(&self, shard_ident: &ShardIdent) -> u64 { + pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 { self.inner - .shards_messages + .shards_messages_count .get(shard_ident) .copied() .unwrap_or_default() @@ -409,7 +409,7 @@ struct DiffStatisticsInner { min_message: QueueKey, max_message: QueueKey, statistics: FastHashMap>, - shards_messages: FastHashMap, + shards_messages_count: FastHashMap, } impl From<(&QueueDiffWithMessages, ShardIdent)> for DiffStatistics { @@ -417,7 +417,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for let (diff, shard_ident) = value; let min_message = diff.messages.keys().next().cloned().unwrap_or_default(); let max_message = diff.messages.keys().last().cloned().unwrap_or_default(); - let mut shards_messages = FastHashMap::default(); + let mut shards_messages_count = FastHashMap::default(); let mut statistics = FastHashMap::default(); @@ -441,7 +441,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for ShardIdent::new_full(0) }; - shards_messages + shards_messages_count .entry(dest_shard) .and_modify(|count| *count += 1) .or_insert(1); @@ -453,7 +453,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for min_message, max_message, statistics, - shards_messages, + shards_messages_count, }), } } @@ -463,8 +463,9 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for mod tests { use std::collections::{BTreeMap, BTreeSet}; - use super::*; + use tycho_util::FastHashSet; + use super::*; #[test] fn test_partition_router_from_btreemap() { let addr1 = RouterAddr { diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index ed94b590a..608fd7e8a 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -106,6 +106,7 @@ fn test_statistics_check_statistics( dest_3_normal_priority: RouterAddr, ) -> anyhow::Result<()> { // check two diff statistics + // there are 30000 messages in low partition, 2000 message in normal partition let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange { shard_ident: ShardIdent::new_full(0), from: QueueKey { @@ -149,7 +150,7 @@ fn test_statistics_check_statistics( .unwrap(); assert_eq!(*addr_3_stat, 2000); - // check first diff + // check first diff, there are 15000 messages in low partition let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange { shard_ident: ShardIdent::new_full(0), from: QueueKey { @@ -174,7 +175,7 @@ fn test_statistics_check_statistics( assert_eq!(*addr_1_stat, 10000); assert_eq!(*addr_2_stat, 5000); - // check second diff, we have 0.,35000 lt in low partition + // check second diff, there are 15000 messages in low partition let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange { shard_ident: ShardIdent::new_full(0), from: QueueKey { @@ -964,7 +965,6 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let addr2 = RouterAddr::from(StdAddr::new(0, HashBytes::ZERO)); let addr3 = RouterAddr::from(StdAddr::new(0, HashBytes::ZERO)); - // И теперь можно создать QueueDiff: let diff = QueueDiff { hash: HashBytes::ZERO, prev_hash: HashBytes::from([0x33; 32]),