diff --git a/Cargo.lock b/Cargo.lock index 77d1ddd..b68e91a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6018,6 +6018,7 @@ dependencies = [ "tonic-build", "tracing", "utoipa", + "uuid", ] [[package]] diff --git a/app/app.rs b/app/app.rs index ac1a9c3..837190d 100644 --- a/app/app.rs +++ b/app/app.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{StreamExt, TryFutureExt}; use parking_lot::RwLock; @@ -206,6 +206,7 @@ impl App { format!("{}", config.mainchain_grpc_url), ) .unwrap() + .timeout(Duration::from_secs(10)) .concurrency_limit(256) .connect_lazy(); let (cusf_mainchain, cusf_mainchain_wallet) = if runtime diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 2212e3a..a459528 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -51,6 +51,7 @@ tokio-util = { workspace = true, features = ["rt"] } tonic = { workspace = true } tracing = { workspace = true } utoipa = { workspace = true, features = ["macros", "non_strict_integers"] } +uuid = { workspace = true } [lints] workspace = true diff --git a/lib/net/peer.rs b/lib/net/peer.rs index 08ba6af..4f41dff 100644 --- a/lib/net/peer.rs +++ b/lib/net/peer.rs @@ -139,7 +139,7 @@ impl std::fmt::Display for PeerStateId { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, strum::Display)] pub enum Response { Block { header: Header, @@ -157,7 +157,9 @@ pub enum Response { TransactionRejected(Txid), } -#[derive(BorshSerialize, Clone, Debug, Deserialize, Serialize)] +#[derive( + BorshSerialize, Clone, Debug, Deserialize, Serialize, strum::Display, +)] pub enum Request { Heartbeat(PeerState), GetBlock { diff --git a/lib/node/mainchain_task.rs b/lib/node/mainchain_task.rs index 7c56f4e..144388d 100644 --- a/lib/node/mainchain_task.rs +++ b/lib/node/mainchain_task.rs @@ -20,6 +20,7 @@ use tokio::{ spawn, task::{self, JoinHandle}, }; +use tracing::instrument; use crate::{ archive::{self, Archive}, @@ -166,6 +167,7 @@ where /// up to and including the specified block. /// Mainchain headers for the specified block and all ancestors MUST exist /// in the archive. + #[instrument(skip_all, fields(main_hash), err)] async fn request_bmm_commitments( env: &sneed::Env, archive: &Archive, @@ -196,10 +198,10 @@ where .collect()? }; missing_commitments.reverse(); - tracing::debug!(%main_hash, "requesting ancestor bmm commitments"); + tracing::debug!("requesting ancestor BMM commitments"); for missing_commitment in missing_commitments { - tracing::trace!(%main_hash, - "requesting ancestor bmm commitment: {missing_commitment}" + tracing::trace!( + "requesting ancestor BMM commitment: {missing_commitment}" ); let commitment = match mainchain .get_bmm_hstar_commitments(missing_commitment) @@ -208,8 +210,8 @@ where Ok(commitment) => commitment, Err(block_not_found) => return Ok(Err(block_not_found)), }; - tracing::trace!(%main_hash, - "storing ancestor bmm commitment: {missing_commitment}" + tracing::trace!( + "storing ancestor BMM commitment: {missing_commitment}" ); { let mut rwtxn = env.write_txn().map_err(EnvError::from)?; @@ -220,8 +222,8 @@ where )?; rwtxn.commit().map_err(RwTxnError::from)?; } - tracing::trace!(%main_hash, - "stored ancestor bmm commitment: {missing_commitment}" + tracing::trace!( + "stored ancestor BMM commitment: {missing_commitment}" ); } Ok(Ok(())) diff --git a/lib/node/net_task.rs b/lib/node/net_task.rs index c1c717e..c1f7661 100644 --- a/lib/node/net_task.rs +++ b/lib/node/net_task.rs @@ -20,6 +20,7 @@ use thiserror::Error; use tokio::task::JoinHandle; use tokio_stream::StreamNotifyClose; use tokio_util::task::LocalPoolHandle; +use tracing::Instrument; use super::mainchain_task::{self, MainchainTaskHandle}; use crate::{ @@ -70,7 +71,7 @@ pub enum Error { State(#[from] state::Error), } -async fn connect_tip_( +async fn connect_tip( rwtxn: &mut RwTxn<'_>, archive: &Archive, cusf_mainchain: &mut mainchain::ValidatorClient, @@ -346,7 +347,7 @@ where assert_eq!(tip, common_ancestor); // Apply blocks until new tip is reached for (header, body) in blocks_to_apply.into_iter().rev() { - let () = connect_tip_( + let () = connect_tip( &mut rwtxn, archive, cusf_mainchain, @@ -384,6 +385,245 @@ struct NetTaskContext { type NewTipReadyMessage = (Tip, Option, Option>); +/// State maintained by the mailbox loop +struct MailboxState { + /// Attempt to switch to a descendant tip once a body has been + /// stored, if all other ancestor bodies are available. + /// Each descendant tip maps to the peers that sent that tip. + descendant_tips: HashMap>>, + + /// Map associating mainchain task requests with the peer(s) that + /// caused the request, and the request peer state ID + mainchain_task_request_sources: + HashMap>, + + /// PhantomData to use the MainchainTransport type parameter + _phantom: std::marker::PhantomData, +} + +impl MailboxState +where + MainchainTransport: proto::Transport, +{ + fn new() -> Self { + Self { + descendant_tips: HashMap::new(), + mainchain_task_request_sources: HashMap::new(), + _phantom: std::marker::PhantomData, + } + } + + async fn handle_mailbox_item( + &mut self, + item: MailboxItem, + ctxt: &mut NetTaskContext, + forward_mainchain_task_request_tx: &UnboundedSender<( + mainchain_task::Request, + SocketAddr, + PeerStateId, + )>, + new_tip_ready_tx: &UnboundedSender, + ) -> Result<(), Error> { + match item { + MailboxItem::AcceptConnection(res) => match res { + // We received a connection new incoming network connection, but no peer + // was added + Ok(None) => {} + Ok(Some(addr)) => { + tracing::trace!(%addr, "accepted new incoming connection"); + } + Err(err) => { + tracing::error!(%err, "failed to accept connection"); + } + }, + MailboxItem::ForwardMainchainTaskRequest( + request, + peer, + peer_state_id, + ) => { + self.mainchain_task_request_sources + .entry(request) + .or_default() + .insert((peer, peer_state_id)); + let () = ctxt + .mainchain_task + .request(request) + .map_err(|_| Error::SendMainchainTaskRequest)?; + } + MailboxItem::MainchainTaskResponse(response) => { + let request = (&response).into(); + match response { + mainchain_task::Response::AncestorHeaders( + _block_hash, + res, + ) => { + let Some(sources) = self + .mainchain_task_request_sources + .remove(&request) + else { + return Ok(()); + }; + let res = res.map_err(Arc::new); + for (addr, peer_state_id) in sources { + let message = match res { + Ok(()) => PeerConnectionMessage::MainchainAncestors(peer_state_id), + Err(ref err) => PeerConnectionMessage::MainchainAncestorsError( + anyhow::Error::from(err.clone()) + ) + }; + let () = ctxt + .net + .push_internal_message(message, addr)?; + } + } + mainchain_task::Response::VerifyBmm(_block_hash, res) => { + let Some(sources) = self + .mainchain_task_request_sources + .remove(&request) + else { + return Ok(()); + }; + let res = res.map_err(Arc::new); + for (addr, peer_state_id) in sources { + let message = match res { + Ok(bmm_verification_res) => { + PeerConnectionMessage::BmmVerification { + res: bmm_verification_res, + peer_state_id, + } + } + Err(ref err) => { + PeerConnectionMessage::BmmVerificationError( + anyhow::Error::from(err.clone()), + ) + } + }; + let () = ctxt + .net + .push_internal_message(message, addr)?; + } + } + } + } + MailboxItem::NewTipReady(new_tip, _addr, resp_tx) => { + let reorg_applied = reorg_to_tip( + &ctxt.env, + &ctxt.archive, + &mut ctxt.cusf_mainchain, + &ctxt.mempool, + &ctxt.state, + new_tip, + ) + .await?; + if let Some(resp_tx) = resp_tx { + let () = resp_tx + .send(reorg_applied) + .map_err(|_| Error::SendReorgResultOneshot)?; + } + } + MailboxItem::PeerInfo(None) => return Err(Error::PeerInfoRxClosed), + MailboxItem::PeerInfo(Some((addr, None))) => { + // peer connection is closed, remove it + tracing::warn!(%addr, "Connection to peer closed"); + let () = ctxt.net.remove_active_peer(addr); + } + MailboxItem::PeerInfo(Some((addr, Some(peer_info)))) => { + tracing::trace!(%addr, ?peer_info, "mailbox item: received PeerInfo"); + match peer_info { + PeerConnectionInfo::Error(err) => { + let err = anyhow::anyhow!(err); + tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); + let () = ctxt.net.remove_active_peer(addr); + } + PeerConnectionInfo::NeedBmmVerification { + main_hash, + peer_state_id, + } => { + let request = + mainchain_task::Request::VerifyBmm(main_hash); + let () = forward_mainchain_task_request_tx + .unbounded_send((request, addr, peer_state_id)) + .map_err(|_| Error::ForwardMainchainTaskRequest)?; + } + PeerConnectionInfo::NeedMainchainAncestors { + main_hash, + peer_state_id, + } => { + let request = + mainchain_task::Request::AncestorHeaders(main_hash); + let () = forward_mainchain_task_request_tx + .unbounded_send((request, addr, peer_state_id)) + .map_err(|_| Error::ForwardMainchainTaskRequest)?; + } + PeerConnectionInfo::NewTipReady(new_tip) => { + tracing::debug!( + ?new_tip, + %addr, + "mailbox item: received NewTipReady from peer, sending on channel" + ); + new_tip_ready_tx + .unbounded_send((new_tip, Some(addr), None)) + .map_err(Error::SendNewTipReady)?; + } + PeerConnectionInfo::NewTransaction(mut new_tx) => { + let mut rwtxn = + ctxt.env.write_txn().map_err(EnvError::from)?; + + let () = ctxt.state.regenerate_proof( + &rwtxn, + &mut new_tx.transaction, + )?; + ctxt.mempool.put(&mut rwtxn, &new_tx)?; + rwtxn.commit().map_err(RwTxnError::from)?; + // broadcast + let () = ctxt + .net + .push_tx(HashSet::from_iter([addr]), new_tx); + } + PeerConnectionInfo::Response(boxed) => { + let (resp, req) = *boxed; + tracing::trace!( + resp = format!("{resp:#?}"), + req = format!("{req:#?}"), + "mail box: received PeerConnectionInfo::Response" + ); + let () = + NetTask::::handle_response( + ctxt, + &mut self.descendant_tips, + new_tip_ready_tx, + addr, + resp, + req, + ) + .await?; + } + } + } + } + Ok(()) + } +} + +#[derive(Debug, strum::Display)] +enum MailboxItem { + AcceptConnection(Result, Error>), + // Forward a mainchain task request, along with the peer that + // caused the request, and the peer state ID of the request + ForwardMainchainTaskRequest( + mainchain_task::Request, + SocketAddr, + PeerStateId, + ), + MainchainTaskResponse(mainchain_task::Response), + // Apply new tip from peer or self. + // An optional oneshot sender can be used receive the result of + // attempting to reorg to the new tip, on the corresponding oneshot + // receiver. + NewTipReady(Tip, Option, Option>), + PeerInfo(Option<(SocketAddr, Option)>), +} + struct NetTask { ctxt: NetTaskContext, /// Receive a request to forward to the mainchain task, with the address of @@ -429,7 +669,11 @@ where resp: PeerResponse, req: PeerRequest, ) -> Result<(), Error> { - tracing::debug!(?req, ?resp, "starting response handler"); + tracing::trace!( + req_kind = req.to_string(), + resp_kind = resp.to_string(), + "starting response handler" + ); match (req, resp) { ( req @ PeerRequest::GetBlock { @@ -693,40 +937,25 @@ where async fn run(mut self) -> Result<(), Error> { tracing::debug!("starting net task"); - #[derive(Debug)] - enum MailboxItem { - AcceptConnection(Result, Error>), - // Forward a mainchain task request, along with the peer that - // caused the request, and the peer state ID of the request - ForwardMainchainTaskRequest( - mainchain_task::Request, - SocketAddr, - PeerStateId, - ), - MainchainTaskResponse(mainchain_task::Response), - // Apply new tip from peer or self. - // An optional oneshot sender can be used receive the result of - // attempting to reorg to the new tip, on the corresponding oneshot - // receiver. - NewTipReady(Tip, Option, Option>), - PeerInfo(Option<(SocketAddr, Option)>), - } - let accept_connections = stream::try_unfold((), |()| { + let accept_connections = stream::try_unfold((), { let env = self.ctxt.env.clone(); let net = self.ctxt.net.clone(); - let fut = async move { - let maybe_socket_addr = net.accept_incoming(env).await?; + move |()| { + let env = env.clone(); + let net = net.clone(); + let fut = async move { + let maybe_socket_addr = net.accept_incoming(env).await?; - // / Return: - // - The value to yield (maybe_socket_addr) - // - The state for the next iteration (()) - // Wrapped in Result and Option - Result::, ())>, Error>::Ok(Some(( - maybe_socket_addr, - (), - ))) - }; - Box::pin(fut) + // / Return: + // - The value to yield (maybe_socket_addr) + // - The state for the next iteration (()) + // Wrapped in Result and Option + Result::, ())>, Error>::Ok(Some( + (maybe_socket_addr, ()), + )) + }; + Box::pin(fut) + } }) .map(MailboxItem::AcceptConnection); let forward_request_stream = self @@ -754,211 +983,28 @@ where new_tip_ready_stream.boxed(), peer_info_stream.boxed(), ]); - // Attempt to switch to a descendant tip once a body has been - // stored, if all other ancestor bodies are available. - // Each descendant tip maps to the peers that sent that tip. - let mut descendant_tips = - HashMap::>>::new(); - // Map associating mainchain task requests with the peer(s) that - // caused the request, and the request peer state ID - let mut mainchain_task_request_sources = HashMap::< - mainchain_task::Request, - HashSet<(SocketAddr, PeerStateId)>, - >::new(); + + let mut mailbox_state = MailboxState::new(); + while let Some(mailbox_item) = mailbox_stream.next().await { - tracing::trace!(?mailbox_item, "received new mailbox item"); - match mailbox_item { - MailboxItem::AcceptConnection(res) => match res { - // We received a connection new incoming network connection, but no peer - // was added - Ok(None) => { - continue; - } - Ok(Some(addr)) => { - tracing::trace!(%addr, "accepted new incoming connection"); - } - Err(err) => { - tracing::error!(%err, "failed to accept connection"); - } - }, - MailboxItem::ForwardMainchainTaskRequest( - request, - peer, - peer_state_id, - ) => { - mainchain_task_request_sources - .entry(request) - .or_default() - .insert((peer, peer_state_id)); - let () = self - .ctxt - .mainchain_task - .request(request) - .map_err(|_| Error::SendMainchainTaskRequest)?; - } - MailboxItem::MainchainTaskResponse(response) => { - let request = (&response).into(); - match response { - mainchain_task::Response::AncestorHeaders( - _block_hash, - res, - ) => { - let Some(sources) = - mainchain_task_request_sources.remove(&request) - else { - continue; - }; - let res = res.map_err(Arc::new); - for (addr, peer_state_id) in sources { - let message = match res { - Ok(()) => PeerConnectionMessage::MainchainAncestors( - peer_state_id, - ), - Err(ref err) => PeerConnectionMessage::MainchainAncestorsError( - anyhow::Error::from(err.clone()) - ) - }; - let () = self - .ctxt - .net - .push_internal_message(message, addr)?; - } - } - mainchain_task::Response::VerifyBmm( - _block_hash, - res, - ) => { - let Some(sources) = - mainchain_task_request_sources.remove(&request) - else { - continue; - }; - let res = res.map_err(Arc::new); - for (addr, peer_state_id) in sources { - let message = match res { - Ok(bmm_verification_res) => PeerConnectionMessage::BmmVerification { - res: bmm_verification_res, - peer_state_id, - }, - Err(ref err) => PeerConnectionMessage::BmmVerificationError(anyhow::Error::from(err.clone())) - }; - let () = self - .ctxt - .net - .push_internal_message(message, addr)?; - } - } - } - } - MailboxItem::NewTipReady(new_tip, _addr, resp_tx) => { - let reorg_applied = reorg_to_tip( - &self.ctxt.env, - &self.ctxt.archive, - &mut self.ctxt.cusf_mainchain, - &self.ctxt.mempool, - &self.ctxt.state, - new_tip, - ) - .await?; - if let Some(resp_tx) = resp_tx { - let () = resp_tx - .send(reorg_applied) - .map_err(|_| Error::SendReorgResultOneshot)?; - } - } - MailboxItem::PeerInfo(None) => { - return Err(Error::PeerInfoRxClosed) - } - MailboxItem::PeerInfo(Some((addr, None))) => { - // peer connection is closed, remove it - tracing::warn!(%addr, "Connection to peer closed"); - let () = self.ctxt.net.remove_active_peer(addr); - continue; - } - MailboxItem::PeerInfo(Some((addr, Some(peer_info)))) => { - tracing::trace!(%addr, ?peer_info, "mailbox item: received PeerInfo"); - match peer_info { - PeerConnectionInfo::Error(err) => { - let err = anyhow::anyhow!(err); - tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); - let () = self.ctxt.net.remove_active_peer(addr); - } - PeerConnectionInfo::NeedBmmVerification { - main_hash, - peer_state_id, - } => { - let request = - mainchain_task::Request::VerifyBmm(main_hash); - let () = self - .forward_mainchain_task_request_tx - .unbounded_send((request, addr, peer_state_id)) - .map_err(|_| { - Error::ForwardMainchainTaskRequest - })?; - } - PeerConnectionInfo::NeedMainchainAncestors { - main_hash, - peer_state_id, - } => { - let request = - mainchain_task::Request::AncestorHeaders( - main_hash, - ); - let () = self - .forward_mainchain_task_request_tx - .unbounded_send((request, addr, peer_state_id)) - .map_err(|_| { - Error::ForwardMainchainTaskRequest - })?; - } - PeerConnectionInfo::NewTipReady(new_tip) => { - tracing::debug!( - ?new_tip, - %addr, - "mailbox item: received NewTipReady from peer, sending on channel" - ); - self.new_tip_ready_tx - .unbounded_send((new_tip, Some(addr), None)) - .map_err(Error::SendNewTipReady)?; - } - PeerConnectionInfo::NewTransaction(mut new_tx) => { - let mut rwtxn = self - .ctxt - .env - .write_txn() - .map_err(EnvError::from)?; - let () = self.ctxt.state.regenerate_proof( - &rwtxn, - &mut new_tx.transaction, - )?; - self.ctxt.mempool.put(&mut rwtxn, &new_tx)?; - rwtxn.commit().map_err(RwTxnError::from)?; - // broadcast - let () = self - .ctxt - .net - .push_tx(HashSet::from_iter([addr]), new_tx); - } - PeerConnectionInfo::Response(boxed) => { - let (resp, req) = *boxed; - tracing::trace!( - resp = format!("{resp:#?}"), - req = format!("{req:#?}"), - "mail box: received PeerConnectionInfo::Response" - ); - let () = Self::handle_response( - &self.ctxt, - &mut descendant_tips, - &self.new_tip_ready_tx, - addr, - resp, - req, - ) - .await?; - } - } - } - } + let item_id = uuid::Uuid::new_v4().simple(); + let item_kind = mailbox_item.to_string(); + + tracing::trace!(?item_id, "received new mailbox item"); + mailbox_state + .handle_mailbox_item( + mailbox_item, + &mut self.ctxt, + &self.forward_mainchain_task_request_tx, + &self.new_tip_ready_tx, + ) + .instrument(tracing::span!( + tracing::Level::DEBUG, + "handle_mailbox_item", + item_kind, + item_id = item_id.to_string() + )) + .await?; } Ok(()) } diff --git a/lib/types/mod.rs b/lib/types/mod.rs index c10f022..72f6d59 100644 --- a/lib/types/mod.rs +++ b/lib/types/mod.rs @@ -378,7 +378,7 @@ impl Accumulator { .0 .modify(&insertions, &deletions) .map_err(UtreexoError)?; - tracing::debug!(accumulator = %self.0, "Applied diff"); + tracing::trace!(accumulator = %self.0, "Applied diff"); Ok(()) }