Skip to content

net: be more aggresive wrt removing from known peers DB #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions app/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub enum Error {
"Unable to verify existence of CUSF mainchain service(s) at {0}: {1}"
)]
VerifyMainchainServices(url::Url, tonic::Status),
#[error("failed to connect to CUSF mainchain enforcer at {0}")]
ConnectMainchain(url::Url, #[source] tonic::Status),
#[error("io error")]
Io(#[from] std::io::Error),
#[error("miner error")]
Expand Down Expand Up @@ -210,11 +212,21 @@ impl App {
.connect_lazy();
let (cusf_mainchain, cusf_mainchain_wallet) = if runtime
.block_on(Self::check_proto_support(transport.clone()))
.map_err(|err| {
Error::VerifyMainchainServices(
.map_err(|err| match err {
status
// Kind of crude, but I'm unable to match this on a std::io::Error...
if status.code() == tonic::Code::Unavailable
&& status.message().contains("tcp connect error") =>
{
Error::ConnectMainchain(
config.mainchain_grpc_address.clone(),
status,
)
}
_ => Error::VerifyMainchainServices(
config.mainchain_grpc_address.clone(),
err,
)
),
})? {
(
mainchain::ValidatorClient::new(transport.clone()),
Expand Down
55 changes: 37 additions & 18 deletions lib/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,25 @@ impl Net {
}
}

pub fn remove_active_peer(&self, addr: SocketAddr) {
tracing::trace!(%addr, "remove active peer: starting");
/// Removes a peer from the active peers list.
#[instrument(skip_all, fields(addr))]
pub fn remove_active_peer(
&self,
env: &heed::Env,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a RwTxn

addr: SocketAddr,
) -> Result<(), Error> {
let mut active_peers_write = self.active_peers.write();
if let Some(peer_connection) = active_peers_write.remove(&addr) {
drop(peer_connection);
tracing::info!(%addr, "remove active peer: disconnected");
}

let mut rwtxn = env.write_txn()?;
self.known_peers.delete(&mut rwtxn, &addr)?;
rwtxn.commit()?;
tracing::info!(%addr, "remove active peer: removed from known peers");

Ok(())
}

// TODO: This should have more context.
Expand Down Expand Up @@ -317,16 +329,8 @@ impl Net {
match env.open_database(&rwtxn, Some("known_peers"))? {
Some(known_peers) => known_peers,
None => {
let known_peers =
env.create_database(&mut rwtxn, Some("known_peers"))?;
const SEED_NODE_ADDR: SocketAddr = SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(
172, 105, 148, 135,
)),
4000 + THIS_SIDECHAIN as u16,
);
known_peers.put(&mut rwtxn, &SEED_NODE_ADDR, &())?;
known_peers
tracing::info!("creating known peers database");
env.create_database(&mut rwtxn, Some("known_peers"))?
}
};
rwtxn.commit()?;
Expand All @@ -339,16 +343,33 @@ impl Net {
peer_info_tx,
known_peers,
};
#[allow(clippy::let_and_return)]
let known_peers: Vec<_> = {
let rotxn = env.read_txn()?;
let mut txn = env.write_txn()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Prefer rotxn/rwtxn to clearly indicate DB locks, and to disambiguate w.r.t. network/block txs


// important: we need to always have at least one peer. we
// might end up with this peer misbehaving, causing us to
// disconnect and delete it from the database. always try
// and stick it back in, if it's missing!
if net.known_peers.is_empty(&txn)? {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why start a new db txn? This could happen in the same db txn that creates/opens the DB

const SEED_NODE_ADDR: SocketAddr = SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(
172, 105, 148, 135,
)),
4000 + THIS_SIDECHAIN as u16,
);

tracing::info!("empty list of known peers, adding seed node {SEED_NODE_ADDR}");
net.known_peers.put(&mut txn, &SEED_NODE_ADDR, &())?;
}

let known_peers = net
.known_peers
.iter(&rotxn)?
.iter(&txn)?
.transpose_into_fallible()
.collect()?;
known_peers
};

let () = known_peers.into_iter().try_for_each(|(peer_addr, _)| {
tracing::trace!(
"new net: connecting to already known peer at {peer_addr}"
Expand All @@ -360,9 +381,7 @@ impl Net {
tracing::warn!(
%addr, "new net: known peer with invalid remote address, removing"
);
let mut tx = env.write_txn()?;
net.known_peers.delete(&mut tx, &peer_addr)?;
tx.commit()?;
net.remove_active_peer(env, peer_addr)?;

tracing::info!(
%addr,
Expand Down
26 changes: 17 additions & 9 deletions lib/node/net_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ where
if header.hash() != block_hash {
// Invalid response
tracing::warn!(%addr, ?req, ?resp,"Invalid response from peer; unexpected block hash");
let () = ctxt.net.remove_active_peer(addr);
let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
}
{
Expand Down Expand Up @@ -567,13 +567,13 @@ where
// check that the end header is as requested
let Some(end_header) = headers.last() else {
tracing::warn!(%addr, ?req, "Invalid response from peer; missing end header");
let () = ctxt.net.remove_active_peer(addr);
let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
};
let end_header_hash = end_header.hash();
if end_header_hash != end {
tracing::warn!(%addr, ?req, ?end_header,"Invalid response from peer; unexpected end header");
let () = ctxt.net.remove_active_peer(addr);
let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
}
// Must be at least one header due to previous check
Expand All @@ -583,7 +583,7 @@ where
&& !start.contains(&start_hash)
{
tracing::warn!(%addr, ?req, %start_hash, "Invalid response from peer; invalid start hash");
let () = ctxt.net.remove_active_peer(addr);
let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
}
// check that the end header height is as expected
Expand All @@ -602,7 +602,8 @@ where
};
if end_height != height {
tracing::warn!(%addr, ?req, ?start_hash, "Invalid response from peer; invalid end height");
let () = ctxt.net.remove_active_peer(addr);
let () =
ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
}
}
Expand All @@ -611,7 +612,8 @@ where
for header in &headers {
if header.prev_side_hash != prev_side_hash {
tracing::warn!(%addr, ?req, ?headers,"Invalid response from peer; non-sequential headers");
let () = ctxt.net.remove_active_peer(addr);
let () =
ctxt.net.remove_active_peer(&ctxt.env, addr)?;
return Ok(());
}
prev_side_hash = Some(header.hash());
Expand Down Expand Up @@ -669,7 +671,7 @@ where
) => {
// Invalid response
tracing::warn!(%addr, ?req, ?resp,"Invalid response from peer");
let () = ctxt.net.remove_active_peer(addr);
let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?;
Ok(())
}
}
Expand Down Expand Up @@ -856,7 +858,10 @@ where
MailboxItem::PeerInfo(Some((addr, None))) => {
// peer connection is closed, remove it
tracing::warn!(%addr, "Connection to peer closed");
let () = self.ctxt.net.remove_active_peer(addr);
let () = self
.ctxt
.net
.remove_active_peer(&self.ctxt.env, addr)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If remove_active_peer also removes from known peers, then this seems wrong. A disconnected peer should still be in known peers

continue;
}
MailboxItem::PeerInfo(Some((addr, Some(peer_info)))) => {
Expand All @@ -865,7 +870,10 @@ where
PeerConnectionInfo::Error(err) => {
let err = anyhow::anyhow!(err);
tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error");
let () = self.ctxt.net.remove_active_peer(addr);
let () = self
.ctxt
.net
.remove_active_peer(&self.ctxt.env, addr)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A peer connection error could have many causes, not sure if it is correct to remove from known peers here

}
PeerConnectionInfo::NeedBmmVerification {
main_hash,
Expand Down
Loading