Skip to content

Commit

Permalink
feature(collator): use messages count for set range distance
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 4, 2025
1 parent f60822a commit 1a9e9ae
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 39 deletions.
123 changes: 88 additions & 35 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use everscale_types::models::{IntAddr, MsgInfo, MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};

Expand Down Expand Up @@ -360,37 +360,42 @@ impl InternalsPartitionReader {

fn create_append_next_range_reader(
&mut self,
last_range_reader_shards_and_offset_opt: Option<(
last_range_reader_info_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
BlockSeqno,
)>,
) -> Result<()> {
) -> Result<BlockSeqno> {
const RANGE_MAX_BLOCKS: u32 = 3;
const RANGE_MAX_MESSAGES: u32 = 100_000;
let reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
last_range_reader_info_opt,
Some(RANGE_MAX_BLOCKS),
Some(RANGE_MAX_MESSAGES),
)?;
if self
.range_readers
.insert(self.block_seqno, reader)
.is_some()
{
let reader_seqno = reader.seqno;
// we should add the created range reader using the calculated reader seqno instead of the current block seqno,
// otherwise the next range will exceed the max blocks limit
if self.range_readers.insert(reader_seqno, reader).is_some() {
panic!(
"internals range reader should not already exist (for_shard_id: {}, seqno: {})",
self.for_shard_id, self.block_seqno,
)
};
self.all_ranges_fully_read = false;
Ok(())
Ok(reader_seqno)
}

#[tracing::instrument(skip_all)]
fn create_next_internals_range_reader(
&self,
last_range_reader_shards_and_offset_opt: Option<(
last_range_reader_info_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
BlockSeqno,
)>,
seqno: BlockSeqno,
_range_max_blocks: Option<u32>,
range_max_messages: Option<u32>,
) -> Result<InternalsRangeReader> {
let mut shard_reader_states = BTreeMap::new();

Expand All @@ -401,19 +406,70 @@ impl InternalsPartitionReader {
let mut ranges = Vec::with_capacity(1 + self.mc_top_shards_end_lts.len());

let mut fully_read = true;

let (last_to_lts, processed_offset, last_range_block_seqno) =
last_range_reader_info_opt.unwrap_or_default();

let range_seqno = match range_max_messages {
None => self.block_seqno,
Some(max_messages) => {
let mut current_block_seqno = last_range_block_seqno + 1;
let mut messages_count = 0;

while current_block_seqno < self.block_seqno {
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
.ok_or(anyhow!(
"cannot get diff for block {}:{}",
self.for_shard_id,
current_block_seqno
))?;

messages_count += diff
.statistics()
.get_messages_amount_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
}

current_block_seqno += 1;
}
current_block_seqno
}
};

// let range_seqno = match range_max_blocks {
// Some(max) if self.block_seqno > last_range_block_seqno + max => {
// last_range_block_seqno + max
// }
// _ => self.block_seqno,
// };

for (shard_id, end_lt) in all_end_lts {
let last_to_lt_opt = last_range_reader_shards_and_offset_opt
.as_ref()
.and_then(|(s_r, _)| s_r.get(&shard_id).map(|s| s.to));
let last_to_lt_opt = last_to_lts.get(&shard_id).map(|s| s.to);
let shard_range_from =
last_to_lt_opt.map_or_else(|| QueueKey::min_for_lt(0), QueueKey::max_for_lt);

let to_lt = if shard_id == self.for_shard_id {
self.prev_state_gen_lt
let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff = self
.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or(anyhow!(
"cannot get diff for block {}:{}",
shard_id,
range_seqno
))?;

*diff.max_message()
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
} else {
end_lt
QueueKey::max_for_lt(end_lt)
};
let shard_range_to = QueueKey::max_for_lt(to_lt);

if shard_range_from != shard_range_to {
fully_read = false;
Expand All @@ -432,10 +488,6 @@ impl InternalsPartitionReader {
});
}

let processed_offset = last_range_reader_shards_and_offset_opt
.map(|(_, processed_offset)| processed_offset)
.unwrap_or_default();

let mut range_reader_state = InternalsRangeReaderState {
buffer: Default::default(),

Expand All @@ -453,7 +505,7 @@ impl InternalsPartitionReader {
let reader = InternalsRangeReader {
partition_id: self.partition_id,
for_shard_id: self.for_shard_id,
seqno,
seqno: range_seqno,
kind: InternalsRangeReaderKind::Next,
buffer_limits: self.target_limits,
reader_state: range_reader_state,
Expand Down Expand Up @@ -628,19 +680,19 @@ impl InternalsPartitionReader {
// if open ranges limit not reached
if !self.open_ranges_limit_reached() {
if read_mode == GetNextMessageGroupMode::Continue {
let last_range_reader_shards_and_offset_opt = self
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
reader.seqno,
))
})
.unwrap_or_default();
self.create_append_next_range_reader(
last_range_reader_shards_and_offset_opt,
)?;
ranges_seqno.push_back(self.block_seqno);
let range_seqno =
self.create_append_next_range_reader(last_range_reader_info_opt)?;
ranges_seqno.push_back(range_seqno);
} else {
// do not create next range reader on refill
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -734,19 +786,20 @@ impl InternalsPartitionReader {

// if last range is not from current block then create and check next range
if last_seqno < self.block_seqno {
let last_range_reader_shards_and_offset_opt = self
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
reader.seqno,
))
})
.unwrap_or_default();
let mut range_reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
)?;
// we should look through the whole range to check for pending messages,
// so we do not pass `range_max_blocks`, forcing use of the prev block end lt
let mut range_reader =
self.create_next_internals_range_reader(last_range_reader_info_opt, None, None)?;
if !range_reader.fully_read {
range_reader.init()?;

Expand Down
18 changes: 18 additions & 0 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ where
) -> Result<QueueStatistics>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;
/// Get diff for the given block from committed and uncommitted state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -485,4 +487,20 @@ where

result
}

/// Looks up the queue diff for `(shard_ident, seqno)`, checking the
/// uncommitted diffs first and falling back to the committed ones.
/// Returns a clone of the stored diff, or `None` if neither state has it.
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff> {
    // Uncommitted state takes precedence over committed state.
    if let Some(shard_diffs) = self.uncommitted_diffs.get(&shard_ident) {
        if let Some(found) = shard_diffs.get(&seqno) {
            return Some(found.clone());
        }
    }

    self.committed_diffs
        .get(&shard_ident)
        .and_then(|shard_diffs| shard_diffs.get(&seqno).cloned())
}
}
25 changes: 23 additions & 2 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,20 +394,30 @@ impl DiffStatistics {
pub fn partition(&self, partition: QueuePartitionIdx) -> Option<&FastHashMap<IntAddr, u64>> {
self.inner.statistics.get(&partition)
}

/// Returns the number of messages in this diff destined for `shard_ident`,
/// or zero when no messages were recorded for that shard.
pub fn get_messages_amount_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
    match self.inner.shards_messages.get(shard_ident) {
        Some(count) => *count,
        None => 0,
    }
}
}
#[derive(Debug, Clone)]
struct DiffStatisticsInner {
    // Shard that produced this diff.
    shard_ident: ShardIdent,
    // Smallest message key in the diff (defaulted when the diff is empty).
    min_message: QueueKey,
    // Largest message key in the diff (defaulted when the diff is empty).
    max_message: QueueKey,
    // Per-partition message counts, keyed by destination address.
    statistics: FastHashMap<QueuePartitionIdx, FastHashMap<IntAddr, u64>>,
    // Message counts keyed by destination shard; currently only the
    // masterchain and the single full workchain-0 shard are tracked.
    shards_messages: FastHashMap<ShardIdent, u64>,
}

impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for DiffStatistics {
fn from(value: (&QueueDiffWithMessages<V>, ShardIdent)) -> Self {
let (diff, shard_ident) = value;
let min_message = diff.messages.keys().next().cloned().unwrap_or_default();
let max_message = diff.messages.keys().last().cloned().unwrap_or_default();
let mut shards_messages = FastHashMap::default();

let mut statistics = FastHashMap::default();

Expand All @@ -423,6 +433,18 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
.or_insert(FastHashMap::default())
.entry(destination.clone())
.or_insert(0) += 1;

// TODO after split/merge implementation we should use detailed counter for 256 shards
let dest_shard = if message.destination().is_masterchain() {
ShardIdent::MASTERCHAIN
} else {
ShardIdent::new_full(0)
};

shards_messages
.entry(dest_shard)
.and_modify(|count| *count += 1)
.or_insert(1);
}

Self {
Expand All @@ -431,6 +453,7 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
min_message,
max_message,
statistics,
shards_messages,
}),
}
}
Expand All @@ -440,8 +463,6 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
mod tests {
use std::collections::{BTreeMap, BTreeSet};

use tycho_util::FastHashSet;

use super::*;

#[test]
Expand Down
7 changes: 6 additions & 1 deletion collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ where
fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;

/// Get diff for the given block from committed and uncommitted state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
/// Returns the number of diffs in cache for the given shard
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
}
Expand Down Expand Up @@ -209,6 +210,10 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
self.queue.get_diffs(blocks)
}

/// Delegates to the underlying queue's `get_diff` lookup for a single
/// `(shard_ident, seqno)` pair.
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff> {
    self.queue.get_diff(shard_ident, seqno)
}

/// Delegates to the underlying queue to count cached diffs for `shard_ident`.
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
    self.queue.get_diffs_count_by_shard(shard_ident)
}
Expand Down
2 changes: 1 addition & 1 deletion collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ fn test_statistics_check_statistics(
assert_eq!(*addr_1_stat, 10000);
assert_eq!(*addr_2_stat, 5000);

// check second diff
// check second diff, we have 0..35000 lt in low partition
let statistics_low_priority_partition = queue.load_statistics(1, &[QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: QueueKey {
Expand Down

0 comments on commit 1a9e9ae

Please sign in to comment.