Skip to content

Commit

Permalink
Various block sync fixes (#62)
Browse files Browse the repository at this point in the history
* Check current syncing state before blocks-first sync attempt

* Display syncing progress percentage

* Use blocks-first directly when the chain is higher than the last checkpoint

* Check max get blocks size separately

* Only process the messages from current sync peer

* Adjust the block data request size based on the current height

* Increate stall timeout

* Set syncing state to idle if synced to the majority of peers

* Extract `truncate_and_prepare_block_data_request()`

* Switch to idle when blocks-first is complete

Handle the new blocks broadcasted via inv message.

* Fix clippy

* Nit

* Update download state on block sync completion

* Reduce one needless allocation
  • Loading branch information
liuchengxu authored Oct 13, 2024
1 parent ceb96c9 commit 2710198
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 86 deletions.
32 changes: 13 additions & 19 deletions crates/pallet-bitcoin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,25 +205,19 @@ impl<T: Config> Pallet<T> {

let height = frame_system::Pallet::<T>::current_block_number();

let new_coins = tx
.output
.into_iter()
.enumerate()
.map(|(index, txout)| {
let out_point = OutPoint {
txid,
vout: index as u32,
};
let coin = Coin {
is_coinbase,
amount: txout.value.to_sat(),
script_pubkey: txout.script_pubkey.into_bytes(),
height: height.saturated_into(),
};

(out_point, coin)
})
.collect::<Vec<_>>();
let new_coins = tx.output.into_iter().enumerate().map(|(index, txout)| {
let out_point = OutPoint {
txid,
vout: index as u32,
};
let coin = Coin {
is_coinbase,
amount: txout.value.to_sat(),
script_pubkey: txout.script_pubkey.into_bytes(),
height: height.saturated_into(),
};
(out_point, coin)
});

if is_coinbase {
for (out_point, coin) in new_coins {
Expand Down
27 changes: 17 additions & 10 deletions crates/subcoin-informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bitcoin::hashes::Hash;
use bitcoin::BlockHash;
use console::style;
use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Saturating, Zero};
use sp_runtime::SaturatedConversion;
use std::fmt::{self, Display};
use std::time::Instant;
use subcoin_network::{NetworkStatus, SyncStatus};
Expand Down Expand Up @@ -106,16 +107,22 @@ impl<B: BlockT> InformantDisplay<B> {

let (level, status, target) = match net_status.sync_status {
SyncStatus::Idle => ("💤", "Idle".into(), "".into()),
SyncStatus::Downloading { target, .. } => (
"⚙️ ",
format!("Syncing{speed}"),
format!(", target=#{target}"),
),
SyncStatus::Importing { target, .. } => (
"⚙️ ",
format!("Preparing{speed}"),
format!(", target=#{target}"),
),
SyncStatus::Downloading { target, .. } => {
let progress = best_number.saturated_into::<u32>() as f64 * 100.0 / target as f64;
(
"⚙️ ",
format!("Syncing{speed}"),
format!(", target=#{target} ({progress:.2}%)"),
)
}
SyncStatus::Importing { target, .. } => {
let progress = best_number.saturated_into::<u32>() as f64 * 100.0 / target as f64;
(
"⚙️ ",
format!("Preparing{speed}"),
format!(", target=#{target} ({progress:.2}%)"),
)
}
};

let finalized_hash = info.chain.finalized_hash;
Expand Down
25 changes: 18 additions & 7 deletions crates/subcoin-network/src/block_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,25 @@ impl BlockDownloadManager {
}
}

// Determine if the downloader is stalled based on the time elapsed since the last progress.
/// Determine if the downloader is stalled based on the time elapsed since the last progress
/// update.
///
/// The stall detection is influenced by the size of the blockchain and the time since the last
/// successful block processing. As the chain grows, the following factors contribute to the need
/// for an extended timeout:
///
/// - **Increased Local Block Execution Time**: Larger chain state lead to longer execution times
/// when processing blocks (primarily due to the state root computation).
/// - **Higher Average Block Size**: As the blockchain grows, the average size of blocks typically
/// increases, resulting in longer network response times for block retrieval.
///
/// The timeout values are configurated arbitrarily.
fn is_stalled(&self) -> bool {
// The timeout (in seconds) is extended when the chain exceeds a certain size, as block
// execution times increase significantly with chain growth.
let stall_timeout = if self.best_queued_number > 300_000 {
120 // Extended timeout for larger chains
} else {
60 // Standard timeout for smaller chains
let stall_timeout = match self.best_queued_number {
0..300_000 => 60, // Standard timeout, 1 minute
300_000..600_000 => 120, // Extended timeout, 2 minutes
600_000..800_000 => 180, // Extended timeout, 3 minutes
_ => 300,
};

self.last_progress_time.elapsed().as_secs() > stall_timeout
Expand Down
96 changes: 80 additions & 16 deletions crates/subcoin-network/src/block_downloader/blocks_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub struct BlocksFirstDownloader<Block, Client> {
download_state: DownloadState,
download_manager: BlockDownloadManager,
last_locator_start: u32,
pending_block_requests: Vec<Inventory>,
/// Number of blocks' data requested but not yet received.
requested_blocks_count: usize,
_phantom: PhantomData<Block>,
}

Expand All @@ -64,6 +67,8 @@ where
download_state: DownloadState::Idle,
download_manager: BlockDownloadManager::new(),
last_locator_start: 0u32,
pending_block_requests: Vec::new(),
requested_blocks_count: 0,
_phantom: Default::default(),
};

Expand Down Expand Up @@ -115,6 +120,16 @@ where
}
}

if !self.pending_block_requests.is_empty() && self.requested_blocks_count == 0 {
let block_data_request = std::mem::take(&mut self.pending_block_requests);
return self.truncate_and_prepare_block_data_request(block_data_request);
}

if self.client.best_number() == self.target_block_number {
self.download_state = DownloadState::Completed;
return SyncAction::SwitchToIdle;
}

if self.download_manager.is_stalled() {
return SyncAction::RestartSyncWithStalledPeer(self.peer_id);
}
Expand All @@ -128,32 +143,39 @@ where
self.last_locator_start = 0u32;
self.download_manager.reset();
self.download_state = DownloadState::Restarting;
self.pending_block_requests.clear();
self.requested_blocks_count = 0;
}

// Handle `inv` message.
//
// NOTE: `inv` can be received unsolicited as an announcement of a new block,
// or in reply to `getblocks`.
pub(crate) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
// TODO: only handle the data from self.peer_id?
if from != self.peer_id {
tracing::debug!(?from, current_sync_peer = ?self.peer_id, "Recv unexpected {} inventories", inventories.len());
return SyncAction::None;
}

if inventories
.iter()
.filter(|inv| matches!(inv, Inventory::Block(_)))
.count()
> MAX_GET_BLOCKS_RESPONSE as usize
{
tracing::warn!(
?from,
"Received inv with more than {MAX_GET_BLOCKS_RESPONSE} block entries"
);
self.download_state = DownloadState::Disconnecting;
return SyncAction::Disconnect(self.peer_id, Error::InvHasTooManyBlockItems);
}

let mut block_data_request = Vec::new();
let mut block_inventories = 0;

for inv in inventories {
match inv {
Inventory::Block(block_hash) => {
block_inventories += 1;

if block_inventories > MAX_GET_BLOCKS_RESPONSE {
tracing::warn!(
?from,
"Received inv with more than {MAX_GET_BLOCKS_RESPONSE} block entries"
);
self.download_state = DownloadState::Disconnecting;
return SyncAction::Disconnect(self.peer_id, Error::TooManyBlockEntries);
}

if !self.client.block_exists(block_hash)
&& self.download_manager.is_unknown_block(block_hash)
{
Expand All @@ -172,16 +194,37 @@ where
return SyncAction::None;
}

self.truncate_and_prepare_block_data_request(block_data_request)
}

fn truncate_and_prepare_block_data_request(
&mut self,
mut block_data_request: Vec<Inventory>,
) -> SyncAction {
let max_request_size = self.max_block_data_request_size();

if block_data_request.len() > max_request_size {
self.pending_block_requests = block_data_request.split_off(max_request_size);
}

self.requested_blocks_count = block_data_request.len();

tracing::debug!(
from = ?self.peer_id,
requested_blocks_count = self.download_manager.requested_blocks.len(),
"📦 Downloading {} blocks", block_data_request.len(),
pending_block_data_request = self.pending_block_requests.len(),
"📦 Downloading {} blocks",
self.requested_blocks_count,
);

SyncAction::Request(SyncRequest::Data(block_data_request, self.peer_id))
}

pub(crate) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction {
if from != self.peer_id {
tracing::debug!(?from, current_sync_peer = ?self.peer_id, "Recv unexpected block #{}", block.block_hash());
return SyncAction::None;
}

let last_get_blocks_target = match &self.download_state {
DownloadState::DownloadingNew(range) => range.end - 1,
state => {
Expand Down Expand Up @@ -222,6 +265,8 @@ where
self.download_manager
.add_block(block_number, block_hash, block);

self.requested_blocks_count -= 1;

match block_number.cmp(&last_get_blocks_target) {
CmpOrdering::Less => {
// The last `getblocks` request is not yet finished, waiting for more blocks to come.
Expand All @@ -235,7 +280,7 @@ where
"Received block #{block_number},{block_hash} higher than the target block"
);
self.download_state = DownloadState::Completed;
SyncAction::None
SyncAction::SwitchToIdle
} else {
self.download_state = DownloadState::Disconnecting;
SyncAction::Disconnect(
Expand Down Expand Up @@ -368,6 +413,25 @@ where
self.download_manager.queued_blocks.block_hash(height)
})
}

fn max_block_data_request_size(&self) -> usize {
let best_known = self
.client
.best_number()
.max(self.download_manager.best_queued_number);

match best_known {
0..=99_999 => 500,
100_000..=199_999 => 256,
200_000..=299_999 => 128,
300_000..=399_999 => 64,
400_000..=499_999 => 32,
500_000..=599_999 => 16,
600_000..=699_999 => 8,
700_000..=799_999 => 4,
_ => 2,
}
}
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions crates/subcoin-network/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ pub(crate) fn next_checkpoint(block_number: u32) -> Option<IndexedBlock> {
}
}

pub(crate) fn last_checkpoint_height() -> u32 {
CHECKPOINTS
.last()
.expect("Checkpoints are not empty; qed")
.number
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions crates/subcoin-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ pub enum Error {
ProtocolVersionTooLow,
#[error("Header contains invalid proof-of-block")]
BadProofOfWork(BlockHash),
#[error("Too many block entries in inv message")]
TooManyBlockEntries,
#[error("Too many Inventory::Block items in inv message")]
InvHasTooManyBlockItems,
#[error("Too many entries (> 2000) in headers message")]
TooManyHeaders,
#[error("Entries in headers message are not in ascending order")]
Expand Down
Loading

0 comments on commit 2710198

Please sign in to comment.