refactor(collator): fix build and simplify tuple args
Rexagon committed Feb 10, 2025
1 parent 07dd245 commit 6de3b2c
Showing 7 changed files with 50 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -138,7 +138,7 @@ tycho-storage = { path = "./storage", version = "0.2.2" }
tycho-util = { path = "./util", version = "0.2.2" }

[patch.crates-io]
- everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "c9d1617c61701efff59d130ebf8cc4032475c365" }
+ everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "adada6e6539abd798dbbc5a7fc1d17cc31fa7eff" }

[workspace.lints.rust]
future_incompatible = "warn"
72 changes: 37 additions & 35 deletions collator/src/collator/messages_reader/internals_reader.rs
@@ -360,11 +360,7 @@ impl InternalsPartitionReader {

fn create_append_next_range_reader(
&mut self,
- last_range_reader_info_opt: Option<(
-     BTreeMap<ShardIdent, ShardReaderState>,
-     u32,
-     BlockSeqno,
- )>,
+ last_range_reader_info_opt: Option<InternalsRangeReaderInfo>,
) -> Result<BlockSeqno> {
let range_max_messages = if self.msgs_exec_params.range_messages_limit == 0 {
10_000
@@ -392,11 +388,7 @@ impl InternalsPartitionReader {
#[tracing::instrument(skip_all)]
fn create_next_internals_range_reader(
&self,
- last_range_reader_info_opt: Option<(
-     BTreeMap<ShardIdent, ShardReaderState>,
-     u32,
-     BlockSeqno,
- )>,
+ last_range_reader_info_opt: Option<InternalsRangeReaderInfo>,
range_max_messages: Option<u32>,
) -> Result<InternalsRangeReader> {
let mut shard_reader_states = BTreeMap::new();
@@ -409,8 +401,11 @@

let mut fully_read = true;

- let (last_to_lts, processed_offset, last_range_block_seqno) =
-     last_range_reader_info_opt.unwrap_or_default();
+ let InternalsRangeReaderInfo {
+     last_to_lts,
+     processed_offset,
+     last_range_block_seqno,
+ } = last_range_reader_info_opt.unwrap_or_default();

let range_seqno = match range_max_messages {
None => self.block_seqno,
@@ -422,11 +417,13 @@
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
- .ok_or(anyhow!(
-     "cannot get diff for block {}:{}",
-     self.for_shard_id,
-     current_block_seqno
- ))?;
+ .ok_or_else(|| {
+     anyhow!(
+         "cannot get diff for block {}:{}",
+         self.for_shard_id,
+         current_block_seqno
+     )
+ })?;

messages_count += diff
.statistics()
@@ -449,14 +446,12 @@

let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
- let diff = self
-     .mq_adapter
-     .get_diff(shard_id, range_seqno)
-     .ok_or(anyhow!(
-         "cannot get diff for block {}:{}",
-         shard_id,
-         range_seqno
-     ))?;
+ let diff =
+     self.mq_adapter
+         .get_diff(shard_id, range_seqno)
+         .ok_or_else(|| {
+             anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
+         })?;

*diff.max_message()
} else {
@@ -678,11 +673,11 @@ impl InternalsPartitionReader {
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
- Some((
-     reader.reader_state.shards.clone(),
-     reader.reader_state.processed_offset,
-     reader.seqno,
- ))
+ Some(InternalsRangeReaderInfo {
+     last_to_lts: reader.reader_state.shards.clone(),
+     processed_offset: reader.reader_state.processed_offset,
+     last_range_block_seqno: reader.seqno,
+ })
})
.unwrap_or_default();
let range_seqno =
@@ -784,11 +779,11 @@ impl InternalsPartitionReader {
let last_range_reader_info_opt = self
.get_last_range_reader()
.map(|(_, reader)| {
- Some((
-     reader.reader_state.shards.clone(),
-     reader.reader_state.processed_offset,
-     reader.seqno,
- ))
+ Some(InternalsRangeReaderInfo {
+     last_to_lts: reader.reader_state.shards.clone(),
+     processed_offset: reader.reader_state.processed_offset,
+     last_range_block_seqno: reader.seqno,
+ })
})
.unwrap_or_default();
// we should look thru the whole range to check for pending messages
@@ -881,6 +876,13 @@ impl InternalsPartitionReader {
}
}

+ #[derive(Debug, Default)]
+ struct InternalsRangeReaderInfo {
+     last_to_lts: BTreeMap<ShardIdent, ShardReaderState>,
+     processed_offset: u32,
+     last_range_block_seqno: BlockSeqno,
+ }

#[derive(Default)]
pub(super) struct CollectInternalsResult {
pub metrics: MessagesReaderMetrics,
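The heart of the change above replaces the positional (BTreeMap<ShardIdent, ShardReaderState>, u32, BlockSeqno) tuple with the named InternalsRangeReaderInfo struct, so call sites destructure fields by name rather than by position. A minimal, self-contained sketch of that pattern; the type aliases are simplified stand-ins, not the crate's real definitions:

use std::collections::BTreeMap;

// Simplified stand-ins for the crate's real types (illustration only).
type ShardIdent = u64;
type ShardReaderState = u64;
type BlockSeqno = u32;

// Named fields replace an anonymous tuple, so `processed_offset` and
// `last_range_block_seqno` can no longer be swapped silently.
#[derive(Debug, Default)]
struct InternalsRangeReaderInfo {
    last_to_lts: BTreeMap<ShardIdent, ShardReaderState>,
    processed_offset: u32,
    last_range_block_seqno: BlockSeqno,
}

fn next_range_seqno(info_opt: Option<InternalsRangeReaderInfo>) -> BlockSeqno {
    // Destructure by field name, as the refactored reader does.
    let InternalsRangeReaderInfo {
        last_to_lts,
        processed_offset,
        last_range_block_seqno,
    } = info_opt.unwrap_or_default();

    let _ = (last_to_lts, processed_offset);
    last_range_block_seqno + 1
}

fn main() {
    assert_eq!(next_range_seqno(None), 1);
}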
2 changes: 1 addition & 1 deletion collator/src/manager/mod.rs
@@ -1631,7 +1631,7 @@ where
hash_map::Entry::Vacant(entry) => {
let (subset, hash_short) = full_validators_set
.compute_mc_subset(current_session_seqno, collation_config.shuffle_mc_validators)
- .ok_or(anyhow!(
+ .ok_or_else(|| anyhow!(
"Error calculating subset of validators for session (shard_id = {}, seqno = {})",
ShardIdent::MASTERCHAIN,
current_session_seqno,
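The ok_or → ok_or_else swaps throughout this commit share one rationale: ok_or builds its error argument eagerly even when the Option is Some, while ok_or_else defers the closure, and therefore the anyhow! formatting, to the None path. A stand-alone illustration with a hypothetical lookup function, not code from this repository:

use anyhow::{anyhow, Result};

// Hypothetical lookup used only for illustration.
fn find_diff(seqno: u32) -> Option<u32> {
    (seqno % 2 == 0).then_some(seqno * 10)
}

fn max_message(seqno: u32) -> Result<u32> {
    find_diff(seqno)
        // The closure runs only when `find_diff` returns `None`,
        // so the message formatting is skipped on the happy path.
        .ok_or_else(|| anyhow!("cannot get diff for block {seqno}"))
}

fn main() -> Result<()> {
    assert_eq!(max_message(4)?, 40);
    assert!(max_message(3).is_err());
    Ok(())
}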
4 changes: 2 additions & 2 deletions collator/src/mempool/impls/std_impl.rs
@@ -6,7 +6,7 @@ mod parser
use std::sync::Arc;
use std::time::Duration;

- use anyhow::{anyhow, bail, Result};
+ use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use everscale_crypto::ed25519::KeyPair;
@@ -89,7 +89,7 @@ impl MempoolAdapterStdImpl {
let last_state_update = config_guard
.state_update_ctx
.as_ref()
- .ok_or(anyhow!("last state update context is not set"))?;
+ .context("last state update context is not set")?;
let mempool_config = config_guard.builder.build()?;

// TODO support config change; payload size is bound to mempool rounds
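Several call sites also trade .ok_or(anyhow!("..."))? for .context("...")?. anyhow's Context trait is implemented for Option as well as Result, so a static message needs no macro and only becomes an error when the value is actually None. A small sketch under that assumption, with a made-up ConfigGuard type:

use anyhow::{Context, Result};

// Made-up stand-in for the adapter's guarded config state.
struct ConfigGuard {
    state_update_ctx: Option<String>,
}

fn last_state_update(guard: &ConfigGuard) -> Result<&String> {
    guard
        .state_update_ctx
        .as_ref()
        // `Context` works on `Option`: `None` becomes an error carrying this message.
        .context("last state update context is not set")
}

fn main() {
    let guard = ConfigGuard { state_update_ctx: None };
    assert!(last_state_update(&guard).is_err());

    let guard = ConfigGuard { state_update_ctx: Some("ctx".to_owned()) };
    assert_eq!(last_state_update(&guard).unwrap(), "ctx");
}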
4 changes: 1 addition & 3 deletions consensus/src/effects/store.rs
@@ -490,9 +490,7 @@ impl MempoolStoreImpl for MempoolStorage {
.points_status
.iterator(IteratorMode::End)
.next()
- .ok_or(anyhow::anyhow!(
-     "db is empty, at least last genesis must be provided"
- ))??;
+ .context("db is empty, at least last genesis must be provided")??;

let mut bytes = [0_u8; 4];
bytes.copy_from_slice(&last_key[..4]);
14 changes: 6 additions & 8 deletions consensus/src/engine/mempool_config.rs
@@ -1,7 +1,7 @@
use std::num::NonZeroU16;
use std::sync::OnceLock;

- use anyhow::{anyhow, ensure, Result};
+ use anyhow::{ensure, Context, Result};
use everscale_crypto::ed25519::{KeyPair, SecretKey};
use everscale_types::models::{ConsensusConfig, GenesisInfo};
use serde::{Deserialize, Serialize};
@@ -118,17 +118,15 @@ impl MempoolConfigBuilder {
let genesis_data = *self
.genesis_info
.as_ref()
- .ok_or(anyhow!("mempool genesis data for config is not known"))?;
+ .context("mempool genesis data for config is not known")?;
let consensus_config = self
.consensus_config
- .as_ref()
- .ok_or(anyhow!("mempool consensus config is not known"))?
- .clone();
+ .clone()
+ .context("mempool consensus config is not known")?;
let node_config = self
.node_config
- .as_ref()
- .ok_or(anyhow!("mempool node config is not known"))?
- .clone();
+ .clone()
+ .context("mempool node config is not known")?;

let point_max_bytes = Point::max_byte_size(consensus_config.payload_batch_bytes as usize);

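In the config builder the chain order changes as well: instead of borrowing with .as_ref(), erroring with .ok_or(anyhow!(...))?, and cloning afterwards, the new code clones the Option first and attaches context last, dropping the extra .as_ref() hop. Roughly, with simplified placeholder types:

use anyhow::{Context, Result};

// Simplified placeholders; only `payload_batch_bytes` mirrors the real config field.
#[derive(Clone, Debug, PartialEq)]
struct ConsensusConfig {
    payload_batch_bytes: u32,
}

#[derive(Default)]
struct MempoolConfigBuilder {
    consensus_config: Option<ConsensusConfig>,
}

impl MempoolConfigBuilder {
    fn build_consensus_config(&self) -> Result<ConsensusConfig> {
        self.consensus_config
            // Clone the whole Option<ConsensusConfig> first...
            .clone()
            // ...then turn a possible `None` into an error with a message.
            .context("mempool consensus config is not known")
    }
}

fn main() -> Result<()> {
    let mut builder = MempoolConfigBuilder::default();
    assert!(builder.build_consensus_config().is_err());

    builder.consensus_config = Some(ConsensusConfig { payload_batch_bytes: 768 * 1024 });
    assert_eq!(
        builder.build_consensus_config()?,
        ConsensusConfig { payload_batch_bytes: 768 * 1024 }
    );
    Ok(())
}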
