Skip to content

Commit

Permalink
fix(collator) large ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Jan 30, 2025
1 parent 5084f30 commit dfff5ee
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 23 deletions.
61 changes: 39 additions & 22 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use everscale_types::models::{IntAddr, MsgInfo, MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};

Expand Down Expand Up @@ -364,12 +364,11 @@ impl InternalsParitionReader {
last_range_reader_shards_and_offset_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
BlockSeqno,
)>,
) -> Result<()> {
let reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
)?;
let reader =
self.create_next_internals_range_reader(last_range_reader_shards_and_offset_opt)?;
if self
.range_readers
.insert(self.block_seqno, reader)
Expand All @@ -390,31 +389,53 @@ impl InternalsParitionReader {
last_range_reader_shards_and_offset_opt: Option<(
BTreeMap<ShardIdent, ShardReaderState>,
u32,
BlockSeqno,
)>,
seqno: BlockSeqno,
) -> Result<InternalsRangeReader> {
let mut shard_reader_states = BTreeMap::new();

const MAX_RANGE_BLOCKS: u32 = 10;

let all_end_lts = [(ShardIdent::MASTERCHAIN, self.mc_state_gen_lt)]
.into_iter()
.chain(self.mc_top_shards_end_lts.iter().cloned());

let mut ranges = Vec::with_capacity(1 + self.mc_top_shards_end_lts.len());

let mut fully_read = true;

let (last_to_lt_opt, processed_offset, last_range_block_seqno) =
last_range_reader_shards_and_offset_opt.unwrap_or_default();

let range_seqno = if self.block_seqno > last_range_block_seqno + MAX_RANGE_BLOCKS {
last_range_block_seqno + MAX_RANGE_BLOCKS
} else {
self.block_seqno
};

for (shard_id, end_lt) in all_end_lts {
let last_to_lt_opt = last_range_reader_shards_and_offset_opt
.as_ref()
.and_then(|(s_r, _)| s_r.get(&shard_id).map(|s| s.to));
let last_to_lt_opt = last_to_lt_opt.get(&shard_id).map(|s| s.to);
let shard_range_from =
last_to_lt_opt.map_or_else(|| QueueKey::min_for_lt(0), QueueKey::max_for_lt);

let to_lt = if shard_id == self.for_shard_id {
self.prev_state_gen_lt
let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff = self
.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or(anyhow!(
"cannot get diff for block {}:{}",
shard_id,
range_seqno
))?;

*diff.max_message()
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
} else {
end_lt
QueueKey::max_for_lt(end_lt)
};
let shard_range_to = QueueKey::max_for_lt(to_lt);

if shard_range_from != shard_range_to {
fully_read = false;
Expand All @@ -433,10 +454,6 @@ impl InternalsParitionReader {
});
}

let processed_offset = last_range_reader_shards_and_offset_opt
.map(|(_, processed_offset)| processed_offset)
.unwrap_or_default();

let mut range_reader_state = InternalsRangeReaderState {
buffer: Default::default(),

Expand All @@ -454,7 +471,7 @@ impl InternalsParitionReader {
let reader = InternalsRangeReader {
partition_id: self.partition_id,
for_shard_id: self.for_shard_id,
seqno,
seqno: range_seqno,
kind: InternalsRangeReaderKind::Next,
buffer_limits: self.target_limits,
reader_state: range_reader_state,
Expand Down Expand Up @@ -635,6 +652,7 @@ impl InternalsParitionReader {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
reader.seqno,
))
})
.unwrap_or_default();
Expand Down Expand Up @@ -741,13 +759,12 @@ impl InternalsParitionReader {
Some((
reader.reader_state.shards.clone(),
reader.reader_state.processed_offset,
reader.seqno,
))
})
.unwrap_or_default();
let mut range_reader = self.create_next_internals_range_reader(
last_range_reader_shards_and_offset_opt,
self.block_seqno,
)?;
let mut range_reader =
self.create_next_internals_range_reader(last_range_reader_shards_and_offset_opt)?;
if !range_reader.fully_read {
range_reader.init()?;

Expand Down
18 changes: 18 additions & 0 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ where
) -> Result<QueueStatistics>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;
/// Get diff for the given blocks from committed and uncommitted state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -485,4 +487,20 @@ where

result
}

fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff> {
if let Some(shard_diffs) = self.uncommitted_diffs.get(&shard_ident) {
if let Some(diff) = shard_diffs.get(&seqno) {
return Some(diff.clone());
}
}

if let Some(shard_diffs) = self.committed_diffs.get(&shard_ident) {
if let Some(diff) = shard_diffs.get(&seqno) {
return Some(diff.clone());
}
}

None
}
}
7 changes: 6 additions & 1 deletion collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ where
fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>;
/// Get diffs for the given blocks from committed and uncommitted state
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;

/// Get diff for the given block from committed and uncommitted state
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
/// Returns the number of diffs in cache for the given shard
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
}
Expand Down Expand Up @@ -209,6 +210,10 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
self.queue.get_diffs(blocks)
}

fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff> {
self.queue.get_diff(shard_ident, seqno)
}

fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
self.queue.get_diffs_count_by_shard(shard_ident)
}
Expand Down

0 comments on commit dfff5ee

Please sign in to comment.