Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/collator large ranges #578

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/src/cmd/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ fn make_default_params() -> Result<BlockchainConfigParams> {
par_0_int_msgs_count_limit: 50_000,
par_0_ext_msgs_count_limit: 5_000,
group_slots_fractions,
range_messages_limit: 0,
range_messages_limit: 10_000,
},

wu_used_to_import_next_anchor: 1_200_000_000,
Expand Down
148 changes: 99 additions & 49 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use everscale_types::models::{IntAddr, MsgInfo, MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};

Expand Down Expand Up @@ -360,37 +360,36 @@ impl InternalsPartitionReader {

fn create_append_next_range_reader(
&mut self,
last_range_reader_shards_and_offset_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
)>,
) -> Result<()> {
last_range_reader_info_opt: Option<InternalsRangeReaderInfo>,
) -> Result<BlockSeqno> {
let range_max_messages = if self.msgs_exec_params.range_messages_limit == 0 {
10_000
} else {
self.msgs_exec_params.range_messages_limit
};

let reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
last_range_reader_info_opt,
Some(range_max_messages),
)?;
if self
.range_readers
.insert(self.block_seqno, reader)
.is_some()
{
let reader_seqno = reader.seqno;
// we should add the created range reader under its calculated reader seqno instead of the current block seqno,
// otherwise the next range will exceed the max blocks limit
if self.range_readers.insert(reader_seqno, reader).is_some() {
panic!(
"internals range reader should not already exist (for_shard_id: {}, seqno: {})",
self.for_shard_id, self.block_seqno,
)
};
self.all_ranges_fully_read = false;
Ok(())
Ok(reader_seqno)
}

#[tracing::instrument(skip_all)]
fn create_next_internals_range_reader(
&self,
last_range_reader_shards_and_offset_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
)>,
seqno: BlockSeqno,
last_range_reader_info_opt: Option<InternalsRangeReaderInfo>,
range_max_messages: Option<u32>,
) -> Result<InternalsRangeReader> {
let mut shard_reader_states = BTreeMap::new();

Expand All @@ -401,19 +400,66 @@ impl InternalsPartitionReader {
let mut ranges = Vec::with_capacity(1 + self.mc_top_shards_end_lts.len());

let mut fully_read = true;

let InternalsRangeReaderInfo {
last_to_lts,
processed_offset,
last_range_block_seqno,
} = last_range_reader_info_opt.unwrap_or_default();

let range_seqno = match range_max_messages {
None => self.block_seqno,
Some(max_messages) => {
let mut current_block_seqno = last_range_block_seqno + 1;
let mut messages_count = 0;

while current_block_seqno < self.block_seqno {
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
.ok_or_else(|| {
anyhow!(
"cannot get diff for block {}:{}",
self.for_shard_id,
current_block_seqno
)
})?;

messages_count += diff
.statistics()
.get_messages_count_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
}

current_block_seqno += 1;
}
current_block_seqno
}
};

for (shard_id, end_lt) in all_end_lts {
let last_to_lt_opt = last_range_reader_shards_and_offset_opt
.as_ref()
.and_then(|(s_r, _)| s_r.get(&shard_id).map(|s| s.to));
let last_to_lt_opt = last_to_lts.get(&shard_id).map(|s| s.to);
let shard_range_from =
last_to_lt_opt.map_or_else(|| QueueKey::min_for_lt(0), QueueKey::max_for_lt);

let to_lt = if shard_id == self.for_shard_id {
self.prev_state_gen_lt
let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff =
self.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

*diff.max_message()
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
} else {
end_lt
QueueKey::max_for_lt(end_lt)
};
let shard_range_to = QueueKey::max_for_lt(to_lt);

if shard_range_from != shard_range_to {
fully_read = false;
Expand All @@ -432,10 +478,6 @@ impl InternalsPartitionReader {
});
}

let processed_offset = last_range_reader_shards_and_offset_opt
.map(|(_, processed_offset)| processed_offset)
.unwrap_or_default();

let mut range_reader_state = InternalsRangeReaderState {
buffer: Default::default(),

Expand All @@ -453,7 +495,7 @@ impl InternalsPartitionReader {
let reader = InternalsRangeReader {
partition_id: self.partition_id,
for_shard_id: self.for_shard_id,
seqno,
seqno: range_seqno,
kind: InternalsRangeReaderKind::Next,
buffer_limits: self.target_limits,
reader_state: range_reader_state,
Expand Down Expand Up @@ -628,19 +670,19 @@ impl InternalsPartitionReader {
// if open ranges limit not reached
if !self.open_ranges_limit_reached() {
if read_mode == GetNextMessageGroupMode::Continue {
let last_range_reader_shards_and_offset_opt = self
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
))
Some(InternalsRangeReaderInfo {
last_to_lts: reader.reader_state.shards.clone(),
processed_offset: reader.reader_state.processed_offset,
last_range_block_seqno: reader.seqno,
})
})
.unwrap_or_default();
self.create_append_next_range_reader(
last_range_reader_shards_and_offset_opt,
)?;
ranges_seqno.push_back(self.block_seqno);
let range_seqno =
self.create_append_next_range_reader(last_range_reader_info_opt)?;
ranges_seqno.push_back(range_seqno);
} else {
// do not create next range reader on refill
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -734,19 +776,20 @@ impl InternalsPartitionReader {

// if last range is not from current block then create and check next range
if last_seqno < self.block_seqno {
let last_range_reader_shards_and_offset_opt = self
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
))
Some(InternalsRangeReaderInfo {
last_to_lts: reader.reader_state.shards.clone(),
processed_offset: reader.reader_state.processed_offset,
last_range_block_seqno: reader.seqno,
})
})
.unwrap_or_default();
let mut range_reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
)?;
// we should look through the whole range to check for pending messages,
// so we do not pass `range_max_messages`: this forces the range to end at the prev block end lt
let mut range_reader =
self.create_next_internals_range_reader(last_range_reader_info_opt, None)?;
if !range_reader.fully_read {
range_reader.init()?;

Expand Down Expand Up @@ -833,6 +876,13 @@ impl InternalsPartitionReader {
}
}

/// Snapshot of the last existing range reader, used as the starting point
/// when the next internals range reader is created.
///
/// The `Default` value (empty map, zero offset, zero seqno) is what the
/// reader-creation code falls back to when no previous range reader exists.
#[derive(Debug, Default)]
struct InternalsRangeReaderInfo {
/// Per-shard reader states copied from the last range reader; the `to`
/// value of each state is used as the lower bound of the next range.
last_to_lts: BTreeMap<ShardIdent, ShardReaderState>,
/// Processed offset copied from the last range reader's state.
processed_offset: u32,
/// Seqno of the last range reader; the message-count scan for the next
/// range starts from the block following this one.
last_range_block_seqno: BlockSeqno,
}

#[derive(Default)]
pub(super) struct CollectInternalsResult {
pub metrics: MessagesReaderMetrics,
Expand Down
10 changes: 8 additions & 2 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,19 @@ impl MessagesReader {

// metrics: accounts count in isolated partitions
{
for (par_id, count) in queue_diff_with_msgs.partition_router.partitions_stats() {
let partitions_stats = queue_diff_with_msgs.partition_router.partitions_stats();
for par_id in self
.internals_partition_readers
.keys()
.filter(|&&par_id| par_id > 0)
{
let count = partitions_stats.get(par_id).copied().unwrap_or_default();
let labels = [
("workchain", self.for_shard_id.workchain().to_string()),
("par_id", par_id.to_string()),
];
metrics::gauge!("tycho_do_collate_accounts_count_in_partitions", &labels)
.set(*count as f64);
.set(count as f64);
}
}

Expand Down
30 changes: 30 additions & 0 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ where
) -> Result<QueueStatistics>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;
/// Get the diff for the given block (shard and seqno) from the uncommitted or committed state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
/// Check if diff exists in the cache
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -485,4 +489,30 @@ where

result
}

/// Returns a clone of the queue diff stored for block `seqno` of `shard_ident`.
///
/// Uncommitted diffs are consulted first and committed diffs second;
/// `None` is returned when neither holds an entry for that block.
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff> {
    self.uncommitted_diffs
        .get(&shard_ident)
        .and_then(|shard_diffs| shard_diffs.get(&seqno).cloned())
        .or_else(|| {
            // Not found among uncommitted diffs: fall back to the committed ones.
            self.committed_diffs
                .get(&shard_ident)
                .and_then(|shard_diffs| shard_diffs.get(&seqno).cloned())
        })
}

/// Reports whether a diff for the given block is present in either the
/// uncommitted or the committed diffs.
///
/// The uncommitted diffs are checked first; the committed diffs are only
/// looked at when the block was not found there.
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool {
    let shard = &block_id_short.shard;
    let seqno = &block_id_short.seqno;

    let in_uncommitted = self
        .uncommitted_diffs
        .get(shard)
        .is_some_and(|diffs| diffs.contains_key(seqno));
    if in_uncommitted {
        return true;
    }

    self.committed_diffs
        .get(shard)
        .is_some_and(|diffs| diffs.contains_key(seqno))
}
}
24 changes: 23 additions & 1 deletion collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,20 +394,30 @@ impl DiffStatistics {
pub fn partition(&self, partition: QueuePartitionIdx) -> Option<&FastHashMap<IntAddr, u64>> {
self.inner.statistics.get(&partition)
}

/// Returns the number of messages recorded in these statistics for the
/// destination shard `shard_ident`, or `0` when there is no entry for it.
pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
    match self.inner.shards_messages_count.get(shard_ident) {
        Some(&count) => count,
        None => 0,
    }
}
}
/// Data behind `DiffStatistics`, built from a `QueueDiffWithMessages`
/// together with a `ShardIdent`.
#[derive(Debug, Clone)]
struct DiffStatisticsInner {
// NOTE(review): presumably the shard that produced the diff — confirm against callers.
shard_ident: ShardIdent,
// First key of the diff's messages; the default key when the diff has no messages.
min_message: QueueKey,
// Last key of the diff's messages; the default key when the diff has no messages.
max_message: QueueKey,
// Per-partition message counters keyed by destination address.
statistics: FastHashMap<QueuePartitionIdx, FastHashMap<IntAddr, u64>>,
// Message counters keyed by destination shard. For now messages are only split
// between the masterchain and the single full base shard.
shards_messages_count: FastHashMap<ShardIdent, u64>,
}

impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for DiffStatistics {
fn from(value: (&QueueDiffWithMessages<V>, ShardIdent)) -> Self {
let (diff, shard_ident) = value;
let min_message = diff.messages.keys().next().cloned().unwrap_or_default();
let max_message = diff.messages.keys().last().cloned().unwrap_or_default();
let mut shards_messages_count = FastHashMap::default();

let mut statistics = FastHashMap::default();

Expand All @@ -423,6 +433,18 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
.or_insert(FastHashMap::default())
.entry(destination.clone())
.or_insert(0) += 1;

// TODO after split/merge implementation we should use detailed counter for 256 shards
let dest_shard = if message.destination().is_masterchain() {
ShardIdent::MASTERCHAIN
} else {
ShardIdent::new_full(0)
};

shards_messages_count
.entry(dest_shard)
.and_modify(|count| *count += 1)
.or_insert(1);
}

Self {
Expand All @@ -431,6 +453,7 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
min_message,
max_message,
statistics,
shards_messages_count,
}),
}
}
Expand All @@ -443,7 +466,6 @@ mod tests {
use tycho_util::FastHashSet;

use super::*;

#[test]
fn test_partition_router_from_btreemap() {
let addr1 = RouterAddr {
Expand Down
Loading