diff --git a/collator/src/collator/messages_reader/internals_reader.rs b/collator/src/collator/messages_reader/internals_reader.rs index 8a93564d0..0458301b1 100644 --- a/collator/src/collator/messages_reader/internals_reader.rs +++ b/collator/src/collator/messages_reader/internals_reader.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use everscale_types::models::{IntAddr, MsgInfo, MsgsExecutionParams, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; @@ -360,37 +360,42 @@ impl InternalsPartitionReader { fn create_append_next_range_reader( &mut self, - last_range_reader_shards_and_offset_opt: Option<( + last_range_reader_info_opt: Option<( BTreeMap, u32, + BlockSeqno, )>, - ) -> Result<()> { + ) -> Result { + const RANGE_MAX_BLOCKS: u32 = 3; + const RANGE_MAX_MESSAGES: u32 = 100_000; let reader = self.create_next_internals_range_reader( - last_range_reader_shards_and_offset_opt, - self.block_seqno, + last_range_reader_info_opt, + Some(RANGE_MAX_BLOCKS), + Some(RANGE_MAX_MESSAGES), )?; - if self - .range_readers - .insert(self.block_seqno, reader) - .is_some() - { + let reader_seqno = reader.seqno; + // we should add created range reader using calculated reader seqno instead of current block seqno + // otherwise the next range will exeed the max blocks limit + if self.range_readers.insert(reader_seqno, reader).is_some() { panic!( "internals range reader should not already exist (for_shard_id: {}, seqno: {})", self.for_shard_id, self.block_seqno, ) }; self.all_ranges_fully_read = false; - Ok(()) + Ok(reader_seqno) } #[tracing::instrument(skip_all)] fn create_next_internals_range_reader( &self, - last_range_reader_shards_and_offset_opt: Option<( + last_range_reader_info_opt: Option<( BTreeMap, u32, + BlockSeqno, )>, - seqno: BlockSeqno, + _range_max_blocks: Option, + range_max_messages: Option, ) -> Result { let mut shard_reader_states = BTreeMap::new(); @@ -401,19 +406,70 @@ impl InternalsPartitionReader { let mut ranges = Vec::with_capacity(1 + self.mc_top_shards_end_lts.len()); let mut fully_read = true; + + let (last_to_lts, processed_offset, last_range_block_seqno) = + last_range_reader_info_opt.unwrap_or_default(); + + let range_seqno = match range_max_messages { + None => self.block_seqno, + Some(max_messages) => { + let mut current_block_seqno = last_range_block_seqno + 1; + let mut messages_count = 0; + + while current_block_seqno < self.block_seqno { + let diff = self + .mq_adapter + .get_diff(self.for_shard_id, current_block_seqno) + .ok_or(anyhow!( + "cannot get diff for block {}:{}", + self.for_shard_id, + current_block_seqno + ))?; + + messages_count += diff + .statistics() + .get_messages_amount_by_shard(&self.for_shard_id); + + if messages_count > max_messages as u64 { + break; + } + + current_block_seqno += 1; + } + current_block_seqno + } + }; + + // let range_seqno = match range_max_blocks { + // Some(max) if self.block_seqno > last_range_block_seqno + max => { + // last_range_block_seqno + max + // } + // _ => self.block_seqno, + // }; + for (shard_id, end_lt) in all_end_lts { - let last_to_lt_opt = last_range_reader_shards_and_offset_opt - .as_ref() - .and_then(|(s_r, _)| s_r.get(&shard_id).map(|s| s.to)); + let last_to_lt_opt = last_to_lts.get(&shard_id).map(|s| s.to); let shard_range_from = last_to_lt_opt.map_or_else(|| QueueKey::min_for_lt(0), QueueKey::max_for_lt); - let to_lt = if shard_id == self.for_shard_id { - self.prev_state_gen_lt + let shard_range_to = if shard_id == self.for_shard_id { + if range_seqno != self.block_seqno { + let diff = self + .mq_adapter + .get_diff(shard_id, range_seqno) + .ok_or(anyhow!( + "cannot get diff for block {}:{}", + shard_id, + range_seqno + ))?; + + *diff.max_message() + } else { + QueueKey::max_for_lt(self.prev_state_gen_lt) + } } else { - end_lt + QueueKey::max_for_lt(end_lt) }; - let shard_range_to = QueueKey::max_for_lt(to_lt); if shard_range_from != shard_range_to { fully_read = false; @@ -432,10 +488,6 @@ impl InternalsPartitionReader { }); } - let processed_offset = last_range_reader_shards_and_offset_opt - .map(|(_, processed_offset)| processed_offset) - .unwrap_or_default(); - let mut range_reader_state = InternalsRangeReaderState { buffer: Default::default(), @@ -453,7 +505,7 @@ impl InternalsPartitionReader { let reader = InternalsRangeReader { partition_id: self.partition_id, for_shard_id: self.for_shard_id, - seqno, + seqno: range_seqno, kind: InternalsRangeReaderKind::Next, buffer_limits: self.target_limits, reader_state: range_reader_state, @@ -628,19 +680,19 @@ impl InternalsPartitionReader { // if open ranges limit not reached if !self.open_ranges_limit_reached() { if read_mode == GetNextMessageGroupMode::Continue { - let last_range_reader_shards_and_offset_opt = self + let last_range_reader_info_opt = self .get_last_range_reader() .map(|(_, reader)| { Some(( reader.reader_state.shards.clone(), reader.reader_state.processed_offset, + reader.seqno, )) }) .unwrap_or_default(); - self.create_append_next_range_reader( - last_range_reader_shards_and_offset_opt, - )?; - ranges_seqno.push_back(self.block_seqno); + let range_seqno = + self.create_append_next_range_reader(last_range_reader_info_opt)?; + ranges_seqno.push_back(range_seqno); } else { // do not create next range reader on refill tracing::debug!(target: tracing_targets::COLLATOR, @@ -734,19 +786,20 @@ impl InternalsPartitionReader { // if last range is not from current block then create and check next range if last_seqno < self.block_seqno { - let last_range_reader_shards_and_offset_opt = self + let last_range_reader_info_opt = self .get_last_range_reader() .map(|(_, reader)| { Some(( reader.reader_state.shards.clone(), reader.reader_state.processed_offset, + reader.seqno, )) }) .unwrap_or_default(); - let mut range_reader = self.create_next_internals_range_reader( - last_range_reader_shards_and_offset_opt, - self.block_seqno, - )?; + // we should look thru the whole range to check for pending messages + // so we do not pass `range_max_blocks` to force use the prev block end lt + let mut range_reader = + self.create_next_internals_range_reader(last_range_reader_info_opt, None, None)?; if !range_reader.fully_read { range_reader.init()?; diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 7cbad29fe..833c615b8 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -106,6 +106,8 @@ where ) -> Result; /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; + /// Get diff for the given blocks from committed and uncommitted state + fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; } // IMPLEMENTATION @@ -485,4 +487,20 @@ where result } + + fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option { + if let Some(shard_diffs) = self.uncommitted_diffs.get(&shard_ident) { + if let Some(diff) = shard_diffs.get(&seqno) { + return Some(diff.clone()); + } + } + + if let Some(shard_diffs) = self.committed_diffs.get(&shard_ident) { + if let Some(diff) = shard_diffs.get(&seqno) { + return Some(diff.clone()); + } + } + + None + } } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 226241dd8..4443f3749 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -394,6 +394,14 @@ impl DiffStatistics { pub fn partition(&self, partition: QueuePartitionIdx) -> Option<&FastHashMap> { self.inner.statistics.get(&partition) } + + pub fn get_messages_amount_by_shard(&self, shard_ident: &ShardIdent) -> u64 { + self.inner + .shards_messages + .get(shard_ident) + .copied() + .unwrap_or_default() + } } #[derive(Debug, Clone)] struct DiffStatisticsInner { @@ -401,6 +409,7 @@ struct DiffStatisticsInner { min_message: QueueKey, max_message: QueueKey, statistics: FastHashMap>, + shards_messages: FastHashMap, } impl From<(&QueueDiffWithMessages, ShardIdent)> for DiffStatistics { @@ -408,6 +417,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for let (diff, shard_ident) = value; let min_message = diff.messages.keys().next().cloned().unwrap_or_default(); let max_message = diff.messages.keys().last().cloned().unwrap_or_default(); + let mut shards_messages = FastHashMap::default(); let mut statistics = FastHashMap::default(); @@ -423,6 +433,18 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for .or_insert(FastHashMap::default()) .entry(destination.clone()) .or_insert(0) += 1; + + // TODO after split/merge implementation we should use detailed counter for 256 shards + let dest_shard = if message.destination().is_masterchain() { + ShardIdent::MASTERCHAIN + } else { + ShardIdent::new_full(0) + }; + + shards_messages + .entry(dest_shard) + .and_modify(|count| *count += 1) + .or_insert(1); } Self { @@ -431,6 +453,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for min_message, max_message, statistics, + shards_messages, }), } } @@ -440,8 +463,6 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for mod tests { use std::collections::{BTreeMap, BTreeSet}; - use tycho_util::FastHashSet; - use super::*; #[test] diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index bc8994aa8..1787bfb9d 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -74,7 +74,8 @@ where fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; - + /// Get diff for the given block from committed and uncommitted state + fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; /// Returns the number of diffs in cache for the given shard fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; } @@ -209,6 +210,10 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.get_diffs(blocks) } + fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option { + self.queue.get_diff(shard_ident, seqno) + } + fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { self.queue.get_diffs_count_by_shard(shard_ident) } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 8d8b8e273..ed94b590a 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -174,7 +174,7 @@ fn test_statistics_check_statistics( assert_eq!(*addr_1_stat, 10000); assert_eq!(*addr_2_stat, 5000); - // check second diff + // check second diff, we have 0.,35000 lt in low partition let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange { shard_ident: ShardIdent::new_full(0), from: QueueKey {