Skip to content

Commit

Permalink
feat(collator): use range_messages_limit from eversscale-types
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 4, 2025
1 parent 5b56374 commit 14a5ae4
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 28 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ tycho-storage = { path = "./storage", version = "0.2.2" }
tycho-util = { path = "./util", version = "0.2.2" }

[patch.crates-io]
everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "69c344eac4e1f95d66b045ae90db904ad8afdfaf" }
everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "c9d1617c61701efff59d130ebf8cc4032475c365" }

[workspace.lints.rust]
future_incompatible = "warn"
Expand Down
1 change: 1 addition & 0 deletions cli/src/cmd/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ fn make_default_params() -> Result<BlockchainConfigParams> {
par_0_int_msgs_count_limit: 50_000,
par_0_ext_msgs_count_limit: 5_000,
group_slots_fractions,
range_messages_limit: 10_000,
},

wu_used_to_import_next_anchor: 1_200_000_000,
Expand Down
25 changes: 10 additions & 15 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,15 @@ impl InternalsPartitionReader {
BlockSeqno,
)>,
) -> Result<BlockSeqno> {
const RANGE_MAX_BLOCKS: u32 = 3;
const RANGE_MAX_MESSAGES: u32 = 100_000;
let range_max_messages = if self.msgs_exec_params.range_messages_limit == 0 {
10_000
} else {
self.msgs_exec_params.range_messages_limit
};

let reader = self.create_next_internals_range_reader(
last_range_reader_info_opt,
Some(RANGE_MAX_BLOCKS),
Some(RANGE_MAX_MESSAGES),
Some(range_max_messages),
)?;
let reader_seqno = reader.seqno;
// we should add created range reader using calculated reader seqno instead of current block seqno
Expand All @@ -394,7 +397,6 @@ impl InternalsPartitionReader {
u32,
BlockSeqno,
)>,
_range_max_blocks: Option<u32>,
range_max_messages: Option<u32>,
) -> Result<InternalsRangeReader> {
let mut shard_reader_states = BTreeMap::new();
Expand Down Expand Up @@ -428,7 +430,7 @@ impl InternalsPartitionReader {

messages_count += diff
.statistics()
.get_messages_amount_by_shard(&self.for_shard_id);
.get_messages_count_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
Expand All @@ -440,13 +442,6 @@ impl InternalsPartitionReader {
}
};

// let range_seqno = match range_max_blocks {
// Some(max) if self.block_seqno > last_range_block_seqno + max => {
// last_range_block_seqno + max
// }
// _ => self.block_seqno,
// };

for (shard_id, end_lt) in all_end_lts {
let last_to_lt_opt = last_to_lts.get(&shard_id).map(|s| s.to);
let shard_range_from =
Expand Down Expand Up @@ -797,9 +792,9 @@ impl InternalsPartitionReader {
})
.unwrap_or_default();
// we should look thru the whole range to check for pending messages
// so we do not pass `range_max_blocks` to force use the prev block end lt
// so we do not pass `range_max_messages` to force use the prev block end lt
let mut range_reader =
self.create_next_internals_range_reader(last_range_reader_info_opt, None, None)?;
self.create_next_internals_range_reader(last_range_reader_info_opt, None)?;
if !range_reader.fully_read {
range_reader.init()?;

Expand Down
15 changes: 8 additions & 7 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,9 @@ impl DiffStatistics {
self.inner.statistics.get(&partition)
}

pub fn get_messages_amount_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
self.inner
.shards_messages
.shards_messages_count
.get(shard_ident)
.copied()
.unwrap_or_default()
Expand All @@ -409,15 +409,15 @@ struct DiffStatisticsInner {
min_message: QueueKey,
max_message: QueueKey,
statistics: FastHashMap<QueuePartitionIdx, FastHashMap<IntAddr, u64>>,
shards_messages: FastHashMap<ShardIdent, u64>,
shards_messages_count: FastHashMap<ShardIdent, u64>,
}

impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for DiffStatistics {
fn from(value: (&QueueDiffWithMessages<V>, ShardIdent)) -> Self {
let (diff, shard_ident) = value;
let min_message = diff.messages.keys().next().cloned().unwrap_or_default();
let max_message = diff.messages.keys().last().cloned().unwrap_or_default();
let mut shards_messages = FastHashMap::default();
let mut shards_messages_count = FastHashMap::default();

let mut statistics = FastHashMap::default();

Expand All @@ -441,7 +441,7 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
ShardIdent::new_full(0)
};

shards_messages
shards_messages_count
.entry(dest_shard)
.and_modify(|count| *count += 1)
.or_insert(1);
Expand All @@ -453,7 +453,7 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
min_message,
max_message,
statistics,
shards_messages,
shards_messages_count,
}),
}
}
Expand All @@ -463,8 +463,9 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
mod tests {
use std::collections::{BTreeMap, BTreeSet};

use super::*;
use tycho_util::FastHashSet;

use super::*;
#[test]
fn test_partition_router_from_btreemap() {
let addr1 = RouterAddr {
Expand Down
6 changes: 3 additions & 3 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ fn test_statistics_check_statistics(
dest_3_normal_priority: RouterAddr,
) -> anyhow::Result<()> {
// check two diff statistics
// there are 30000 messages in low partition, 2000 message in normal partition
let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: QueueKey {
Expand Down Expand Up @@ -149,7 +150,7 @@ fn test_statistics_check_statistics(
.unwrap();
assert_eq!(*addr_3_stat, 2000);

// check first diff
// check first diff, there are 15000 messages in low partition
let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: QueueKey {
Expand All @@ -174,7 +175,7 @@ fn test_statistics_check_statistics(
assert_eq!(*addr_1_stat, 10000);
assert_eq!(*addr_2_stat, 5000);

// check second diff, we have 0.,35000 lt in low partition
// check second diff, there are 15000 messages in low partition
let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: QueueKey {
Expand Down Expand Up @@ -964,7 +965,6 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> {
let addr2 = RouterAddr::from(StdAddr::new(0, HashBytes::ZERO));
let addr3 = RouterAddr::from(StdAddr::new(0, HashBytes::ZERO));

// И теперь можно создать QueueDiff:
let diff = QueueDiff {
hash: HashBytes::ZERO,
prev_hash: HashBytes::from([0x33; 32]),
Expand Down

0 comments on commit 14a5ae4

Please sign in to comment.