diff --git a/src/config.rs b/src/config.rs index 1834b462c..0d3fafa3b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,9 +1,13 @@ //! A set of configuration parameters to tune the discovery protocol. use crate::{ - handler::MIN_SESSIONS_UNREACHABLE_ENR, kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, - ListenConfig, PermitBanList, RateLimiter, RateLimiterBuilder, + kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, ListenConfig, PermitBanList, RateLimiter, + RateLimiterBuilder, }; +/// The minimum number of unreachable Sessions a node must allow. This enables the network to +/// bootstrap. +const MIN_SESSIONS_UNREACHABLE_ENR: usize = 10; + use std::{ops::RangeInclusive, time::Duration}; /// Configuration parameters that define the performance of the discovery network. @@ -100,7 +104,7 @@ pub struct Discv5Config { /// The max limit for peers with unreachable ENRs. Benevolent examples of such peers are peers /// that are discovering their externally reachable socket, nodes must assist at least one /// such peer in discovering their reachable socket via ip voting, and peers behind symmetric - /// NAT. Default is no limit. Minimum is 1. + /// NAT. Default is no limit. Minimum is 10. 
pub unreachable_enr_limit: Option, /// The unused port range to try and bind to when testing if this node is behind NAT based on diff --git a/src/handler/sessions/crypto/ecdh.rs b/src/handler/crypto/ecdh.rs similarity index 100% rename from src/handler/sessions/crypto/ecdh.rs rename to src/handler/crypto/ecdh.rs diff --git a/src/handler/sessions/crypto/mod.rs b/src/handler/crypto/mod.rs similarity index 100% rename from src/handler/sessions/crypto/mod.rs rename to src/handler/crypto/mod.rs diff --git a/src/handler/mod.rs b/src/handler/mod.rs index bcdf97042..0d108ea25 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -58,20 +58,20 @@ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, trace, warn}; mod active_requests; +mod crypto; mod nat_hole_punch; mod request_call; -mod sessions; +mod session; mod tests; use crate::metrics::METRICS; pub use crate::node_info::{NodeAddress, NodeContact}; -pub use sessions::MIN_SESSIONS_UNREACHABLE_ENR; use crate::{lru_time_cache::LruTimeCache, socket::ListenConfig}; use active_requests::ActiveRequests; -use nat_hole_punch::{Error as NatHolePunchError, HolePunchNat, NatHolePunchUtils}; +use nat_hole_punch::{Error as NatError, HolePunchNat, NatUtils}; use request_call::RequestCall; -use sessions::{Session, Sessions}; +use session::Session; // The time interval to check banned peer timeouts and unban peers when the timeout has elapsed (in // seconds). @@ -220,7 +220,7 @@ pub struct Handler { /// Currently in-progress outbound handshakes (WHOAREYOU packets) with peers. active_challenges: HashMapDelay, /// Established sessions with peers. - sessions: Sessions, + sessions: LruTimeCache, /// Established sessions with peers for a specific request, stored just one per node. one_time_sessions: LruTimeCache, /// The channel to receive messages from the application layer. @@ -234,7 +234,7 @@ pub struct Handler { /// Exit channel to shutdown the handler. 
exit: oneshot::Receiver<()>, /// Types necessary to plug in nat hole punching. - nat_hole_puncher: NatHolePunchUtils, + nat_utils: NatUtils, } type HandlerReturn = ( @@ -317,19 +317,16 @@ impl Handler { // Attempt to bind to the socket before spinning up the send/recv tasks. let socket = Socket::new::

(socket_config).await?; - let sessions = Sessions::new( - session_cache_capacity, - session_timeout, - unreachable_enr_limit, - ); + let sessions = LruTimeCache::new(session_timeout, Some(session_cache_capacity)); - let nat_hole_puncher = NatHolePunchUtils::new( - listen_sockets.iter(), + let nat_utils = NatUtils::new( + &listen_sockets, &enr.read(), ip_mode, unused_port_range, ban_duration, session_cache_capacity, + unreachable_enr_limit, ); executor @@ -353,7 +350,7 @@ impl Handler { service_send, listen_sockets, socket, - nat_hole_puncher, + nat_utils, exit, }; debug!("Handler Starting"); @@ -402,7 +399,7 @@ impl Handler { // inserted into its peers' kbuckets before the session they // already had expires. Session duration, in this impl defaults to // 24 hours. - self.sessions.cache.clear(); + self.sessions.clear(); if let Err(e) = self .service_send .send(HandlerOut::PingAllPeers) @@ -411,7 +408,7 @@ impl Handler { warn!("Failed to inform that request failed {}", e); } } - self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); + self.nat_utils.set_is_behind_nat(&self.listen_sockets, Some(ip), Some(port)); } } } @@ -426,7 +423,12 @@ impl Handler { // challenge. We process them here self.send_next_request::

(node_address).await; } - Some(peer_socket) = self.nat_hole_puncher.next() => { + Some(Ok(peer_socket)) = self.nat_utils.hole_punch_tracker.next() => { + if self.nat_utils.is_behind_nat == Some(false) { + // Until ip voting is done and an observed public address is finalised, all nodes act as + // if they are behind a NAT. + return; + } if let Err(e) = self.on_hole_punch_expired(peer_socket).await { warn!("Failed to keep hole punched for peer, error: {}", e); } @@ -537,7 +539,7 @@ impl Handler { if request_call.retries() >= self.request_retries { trace!("Request timed out with {}", node_address); if let Some(relay) = self - .nat_hole_puncher + .nat_utils .new_peer_latest_relay_cache .pop(&node_address.node_id) { @@ -611,7 +613,7 @@ impl Handler { } let (packet, initiating_session) = { - if let Some(session) = self.sessions.cache.get_mut(&node_address) { + if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send let request = match &request_id { HandlerReqId::Internal(id) | HandlerReqId::External(id) => Request { @@ -658,7 +660,7 @@ impl Handler { response: Response, ) { // Check for an established session - let packet = if let Some(session) = self.sessions.cache.get_mut(&node_address) { + let packet = if let Some(session) = self.sessions.get_mut(&node_address) { session.encrypt_session_message::

(self.node_id, &response.encode()) } else if let Some(mut session) = self.remove_one_time_session(&node_address, &response.id) { @@ -818,6 +820,10 @@ impl Handler { // All sent requests must have an associated node_id. Therefore the following // must not panic. let node_address = request_call.contact().node_address(); + + // Keep track if the ENR is reachable. In the case we don't know the ENR, we assume its + // fine. + let mut enr_not_reachable = false; match request_call.contact().enr() { Some(enr) => { // NOTE: Here we decide if the session is outgoing or ingoing. The condition for an @@ -830,6 +836,8 @@ impl Handler { ConnectionDirection::Incoming }; + enr_not_reachable = NatUtils::is_enr_reachable(&enr); + // We already know the ENR. Send the handshake response packet trace!("Sending Authentication response to node: {}", node_address); request_call.update_packet(auth_packet.clone()); @@ -867,7 +875,7 @@ impl Handler { } } } - self.new_session(node_address, session); + self.new_session(node_address, session, enr_not_reachable); } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also @@ -907,20 +915,37 @@ impl Handler { ); if let Some(challenge) = self.active_challenges.remove(&node_address) { - let session_limiter = self.sessions.limiter.as_mut(); + // Find the most recent ENR, a known ENR or one they sent in their challenge. + let Challenge { data, remote_enr } = challenge; + let Ok(most_recent_enr) = most_recent_enr(enr_record, remote_enr) else { + warn!( + "Peer did not respond with their ENR. Session could not be established. 
Node: {}",node_address + ); + self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) + .await; + return; + }; + + // Keep count of the unreachable Sessions we are tracking + // Peer is reachable + let enr_not_reachable = !NatUtils::is_enr_reachable(&most_recent_enr); + + // Decide whether to establish this connection based on our appetite for unreachable + if enr_not_reachable + && Some(self.sessions.tagged()) >= self.nat_utils.unreachable_enr_limit + { + debug!("Reached limit of unreachable ENR sessions. Avoiding a new connection. Limit: {}", self.sessions.tagged()); + return; + } match Session::establish_from_challenge( self.key.clone(), &self.node_id, - challenge, + &node_address.node_id, + data, id_nonce_sig, ephem_pubkey, - enr_record, - &node_address, - |node_address, enr| { - session_limiter - .map(|limiter| limiter.track_sessions_unreachable_enr(node_address, enr)) - }, + most_recent_enr, ) { Ok((mut session, enr)) => { // Receiving an AuthResponse must give us an up-to-date view of the node ENR. @@ -936,8 +961,8 @@ impl Handler { ConnectionDirection::Incoming, ) .await; - self.new_session(node_address.clone(), session); - self.nat_hole_puncher + self.new_session(node_address.clone(), session, enr_not_reachable); + self.nat_utils .new_peer_latest_relay_cache .pop(&node_address.node_id); self.handle_message::

( @@ -1002,9 +1027,6 @@ impl Handler { // insert back the challenge self.active_challenges.insert(node_address, challenge); } - Err(Discv5Error::LimitSessionsUnreachableEnr) => { - warn!("Limit reached for sessions with unreachable ENRs. Dropping session."); - } Err(e) => { warn!( "Invalid Authentication header. Dropping session. Error: {:?}", @@ -1073,7 +1095,7 @@ impl Handler { authenticated_data: &[u8], ) { // check if we have an available session - let Some(session) = self.sessions.cache.get_mut(&node_address) else { + let Some(session) = self.sessions.get_mut(&node_address) else { warn!( "Dropping message. Error: {}, {}", Discv5Error::SessionNotEstablished, @@ -1109,30 +1131,27 @@ impl Handler { match message { Message::Response(response) => self.handle_response::

(node_address, response).await, - Message::RelayInitNotification(notif) => { - let inr_node_id = notif.initiator_enr().node_id(); - if inr_node_id != node_address.node_id { - warn!("peer {node_address} tried to initiate hole punch attempt for another node {inr_node_id}, banning peer {node_address}"); + Message::RelayInitNotification(notification) => { + let initiator_node_id = notification.initiator_enr().node_id(); + if initiator_node_id != node_address.node_id { + warn!("peer {node_address} tried to initiate hole punch attempt for another node {initiator_node_id}, banning peer {node_address}"); self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) .await; - let ban_timeout = self - .nat_hole_puncher - .ban_duration - .map(|v| Instant::now() + v); + let ban_timeout = self.nat_utils.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } else if let Err(e) = self.on_relay_init(notif).await { + } else if let Err(e) = self.on_relay_init(notification).await { warn!("failed handling notification to relay for {node_address}, {e}"); } } - Message::RelayMsgNotification(notif) => { - match self.nat_hole_puncher.is_behind_nat { + Message::RelayMsgNotification(notification) => { + match self.nat_utils.is_behind_nat { Some(false) => { // inr may not be malicious and initiated a hole punch attempt when // a request to this node timed out for another reason debug!("peer {node_address} relayed a hole punch notification but we are not behind nat"); } _ => { - if let Err(e) = self.on_relay_msg::

(notif).await { + if let Err(e) = self.on_relay_msg::

(notification).await { warn!("failed handling notification relayed from {node_address}, {e}"); } } @@ -1157,7 +1176,7 @@ impl Handler { authenticated_data: &[u8], ) { // check if we have an available session - if let Some(session) = self.sessions.cache.get_mut(&node_address) { + if let Some(session) = self.sessions.get_mut(&node_address) { // attempt to decrypt and process the message. let message = match session.decrypt_message(message_nonce, message, authenticated_data) { @@ -1253,7 +1272,7 @@ impl Handler { // Sessions could be awaiting an ENR response. Check if this response matches // this // check if we have an available session - let Some(session) = self.sessions.cache.get_mut(&node_address) else { + let Some(session) = self.sessions.get_mut(&node_address) else { warn!( "Dropping response. Error: {}, {}", Discv5Error::SessionNotEstablished, @@ -1314,16 +1333,14 @@ impl Handler { // extra responses if let ResponseBody::Nodes { total, ref nodes } = response.body { for node in nodes { - if let Some(socket_addr) = - self.nat_hole_puncher.ip_mode.get_contactable_addr(node) - { + if let Some(socket_addr) = self.nat_utils.ip_mode.get_contactable_addr(node) { let node_id = node.node_id(); let new_peer_node_address = NodeAddress { socket_addr, node_id, }; - if self.sessions.cache.peek(&new_peer_node_address).is_none() { - self.nat_hole_puncher + if self.sessions.peek(&new_peer_node_address).is_none() { + self.nat_utils .new_peer_latest_relay_cache .put(node_id, node_address.clone()); } @@ -1395,14 +1412,21 @@ impl Handler { self.active_requests.insert(node_address, request_call); } - fn new_session(&mut self, node_address: NodeAddress, session: Session) { - if let Some(current_session) = self.sessions.cache.get_mut(&node_address) { + /// Updates the session cache for a new session. 
+ fn new_session( + &mut self, + node_address: NodeAddress, + session: Session, + enr_not_reachable: bool, + ) { + if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); } else { - self.sessions.cache.insert(node_address, session); + self.sessions + .insert_raw(node_address, session, enr_not_reachable); METRICS .active_sessions - .store(self.sessions.cache.len(), Ordering::Relaxed); + .store(self.sessions.len(), Ordering::Relaxed); } } @@ -1448,7 +1472,7 @@ impl Handler { } } let node_address = request_call.contact().node_address(); - self.nat_hole_puncher + self.nat_utils .new_peer_latest_relay_cache .pop(&node_address.node_id); self.fail_session(&node_address, error, remove_session) @@ -1463,16 +1487,12 @@ impl Handler { remove_session: bool, ) { if remove_session { - self.sessions.cache.remove(node_address); + self.sessions.remove(node_address); METRICS .active_sessions - .store(self.sessions.cache.len(), Ordering::Relaxed); + .store(self.sessions.len(), Ordering::Relaxed); // stop keeping hole punched for peer - self.nat_hole_puncher.untrack(&node_address.socket_addr); - // update unreachable enr session limiter - if let Some(ref mut limiter) = self.sessions.limiter { - limiter.untrack_session(node_address) - } + self.nat_utils.untrack(&node_address.socket_addr); } if let Some(to_remove) = self.pending_requests.remove(node_address) { for PendingRequest { request_id, .. } in to_remove { @@ -1509,7 +1529,7 @@ impl Handler { if let Err(e) = self.socket.send.send(packet).await { warn!("Failed to send outbound packet {}", e) } - self.nat_hole_puncher.track(dst); + self.nat_utils.track(dst); } /// Check if any banned nodes have served their time and unban them. @@ -1543,6 +1563,23 @@ impl Handler { } } +/// Given two optional ENRs, find the most recent one based on the sequence number. +/// This function will error if both inputs are None. 
+fn most_recent_enr(first: Option, second: Option) -> Result { + match (first, second) { + (Some(first_enr), Some(second_enr)) => { + if first_enr.seq() > second_enr.seq() { + Ok(first_enr) + } else { + Ok(second_enr) + } + } + (Some(first), None) => Ok(first), + (None, Some(second)) => Ok(second), + (None, None) => Err(()), // No ENR provided + } +} + #[async_trait::async_trait] impl HolePunchNat for Handler { async fn on_request_time_out( @@ -1551,12 +1588,12 @@ impl HolePunchNat for Handler { local_enr: Enr, // initiator-enr timed_out_nonce: MessageNonce, target_node_address: NodeAddress, - ) -> Result<(), NatHolePunchError> { + ) -> Result<(), NatError> { // Another hole punch process with this target may have just completed. - if self.sessions.cache.get(&target_node_address).is_some() { + if self.sessions.get(&target_node_address).is_some() { return Ok(()); } - if let Some(session) = self.sessions.cache.get_mut(&relay) { + if let Some(session) = self.sessions.get_mut(&relay) { let relay_init_notif = RelayInitNotification::new(local_enr, target_node_address.node_id, timed_out_nonce); trace!( @@ -1570,7 +1607,7 @@ impl HolePunchNat for Handler { { Ok(packet) => packet, Err(e) => { - return Err(NatHolePunchError::Initiator(e)); + return Err(NatError::Initiator(e)); } }; self.send(relay, packet).await; @@ -1587,17 +1624,14 @@ impl HolePunchNat for Handler { Ok(()) } - async fn on_relay_init( - &mut self, - relay_init: RelayInitNotification, - ) -> Result<(), NatHolePunchError> { + async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), NatError> { // Check for target peer in our kbuckets otherwise drop notification. 
if let Err(e) = self .service_send .send(HandlerOut::FindHolePunchEnr(relay_init)) .await { - return Err(NatHolePunchError::Relay(e.into())); + return Err(NatError::Relay(e.into())); } Ok(()) } @@ -1605,16 +1639,16 @@ impl HolePunchNat for Handler { async fn on_relay_msg( &mut self, relay_msg: RelayMsgNotification, - ) -> Result<(), NatHolePunchError> { + ) -> Result<(), NatError> { let (inr_enr, timed_out_msg_nonce) = relay_msg.into(); let initiator_node_address = - match NodeContact::try_from_enr(inr_enr, self.nat_hole_puncher.ip_mode) { + match NodeContact::try_from_enr(inr_enr, self.nat_utils.ip_mode) { Ok(contact) => contact.node_address(), - Err(e) => return Err(NatHolePunchError::Target(e.into())), + Err(e) => return Err(NatError::Target(e.into())), }; // A session may already have been established. - if self.sessions.cache.get(&initiator_node_address).is_some() { + if self.sessions.get(&initiator_node_address).is_some() { trace!("Session already established with initiator: {initiator_node_address}"); return Ok(()); } @@ -1640,13 +1674,12 @@ impl HolePunchNat for Handler { &mut self, tgt_enr: Enr, relay_msg_notif: RelayMsgNotification, - ) -> Result<(), NatHolePunchError> { - let tgt_node_address = - match NodeContact::try_from_enr(tgt_enr, self.nat_hole_puncher.ip_mode) { - Ok(contact) => contact.node_address(), - Err(e) => return Err(NatHolePunchError::Relay(e.into())), - }; - if let Some(session) = self.sessions.cache.get_mut(&tgt_node_address) { + ) -> Result<(), NatError> { + let tgt_node_address = match NodeContact::try_from_enr(tgt_enr, self.nat_utils.ip_mode) { + Ok(contact) => contact.node_address(), + Err(e) => return Err(NatError::Relay(e.into())), + }; + if let Some(session) = self.sessions.get_mut(&tgt_node_address) { trace!( "Sending notif to target {}. 
relay msg: {}", tgt_node_address.node_id, @@ -1658,7 +1691,7 @@ impl HolePunchNat for Handler { { Ok(packet) => packet, Err(e) => { - return Err(NatHolePunchError::Relay(e)); + return Err(NatError::Relay(e)); } }; self.send(tgt_node_address, packet).await; @@ -1669,12 +1702,12 @@ impl HolePunchNat for Handler { // time out of the udp entrypoint for the target peer in the initiator's NAT, set by // the original timed out FINDNODE request from the initiator, as the initiator may // also be behind a NAT. - Err(NatHolePunchError::Relay(Discv5Error::SessionNotEstablished)) + Err(NatError::Relay(Discv5Error::SessionNotEstablished)) } } #[inline] - async fn on_hole_punch_expired(&mut self, peer: SocketAddr) -> Result<(), NatHolePunchError> { + async fn on_hole_punch_expired(&mut self, peer: SocketAddr) -> Result<(), NatError> { self.send_outbound(peer.into()).await; Ok(()) } diff --git a/src/handler/nat_hole_punch/mod.rs b/src/handler/nat_hole_punch/mod.rs index 1d1d26a14..e5b03a6fb 100644 --- a/src/handler/nat_hole_punch/mod.rs +++ b/src/handler/nat_hole_punch/mod.rs @@ -11,7 +11,7 @@ mod error; mod utils; pub use error::Error; -pub use utils::NatHolePunchUtils; +pub use utils::NatUtils; #[async_trait::async_trait] pub trait HolePunchNat { diff --git a/src/handler/nat_hole_punch/utils.rs b/src/handler/nat_hole_punch/utils.rs index 165e3add8..5674fb238 100644 --- a/src/handler/nat_hole_punch/utils.rs +++ b/src/handler/nat_hole_punch/utils.rs @@ -1,18 +1,15 @@ use std::{ net::{IpAddr, SocketAddr, UdpSocket}, ops::RangeInclusive, - pin::Pin, - task::{Context, Poll}, time::Duration, }; -use derive_more::{Deref, DerefMut}; +use delay_map::HashSetDelay; use enr::NodeId; -use futures::{channel::mpsc, Stream, StreamExt}; use lru::LruCache; use rand::Rng; -use crate::{lru_time_cache::LruTimeCache, node_info::NodeAddress, Enr, IpMode}; +use crate::{node_info::NodeAddress, Enr, IpMode}; /// The expected shortest lifetime in most NAT configurations of a punched hole in seconds. 
pub const DEFAULT_HOLE_PUNCH_LIFETIME: u64 = 20; @@ -22,7 +19,7 @@ pub const PORT_BIND_TRIES: usize = 4; pub const USER_AND_DYNAMIC_PORTS: RangeInclusive = 1025..=u16::MAX; /// Aggregates types necessary to implement nat hole punching for [`crate::handler::Handler`]. -pub struct NatHolePunchUtils { +pub struct NatUtils { /// Ip mode as set in config. pub ip_mode: IpMode, /// This node has been observed to be behind a NAT. @@ -36,29 +33,33 @@ pub struct NatHolePunchUtils { pub new_peer_latest_relay_cache: LruCache, /// Keeps track if this node needs to send a packet to a peer in order to keep a hole punched /// for it in its NAT. - hole_punch_tracker: NatHolePunchTracker, + pub hole_punch_tracker: HashSetDelay, /// Ports to trie to bind to check if this node is behind NAT. pub unused_port_range: Option>, /// If the filter is enabled this sets the default timeout for bans enacted by the filter. pub ban_duration: Option, + /// The number of unreachable ENRs we store at most in our session cache. + pub unreachable_enr_limit: Option, } -impl NatHolePunchUtils { - pub fn new<'a>( - listen_sockets: impl Iterator, +impl NatUtils { + pub fn new( + listen_sockets: &[SocketAddr], local_enr: &Enr, ip_mode: IpMode, unused_port_range: Option>, ban_duration: Option, session_cache_capacity: usize, + unreachable_enr_limit: Option, ) -> Self { - let mut nat_hole_puncher = NatHolePunchUtils { + let mut nat_hole_puncher = NatUtils { ip_mode, is_behind_nat: None, new_peer_latest_relay_cache: LruCache::new(session_cache_capacity), - hole_punch_tracker: NatHolePunchTracker::new(session_cache_capacity), + hole_punch_tracker: HashSetDelay::new(Duration::from_secs(DEFAULT_HOLE_PUNCH_LIFETIME)), unused_port_range, ban_duration, + unreachable_enr_limit, }; // Optimistically only test one advertised socket, ipv4 has precedence. If it is // reachable, assumption is made that also the other ip version socket is reachable. 
@@ -86,7 +87,7 @@ impl NatHolePunchUtils { if self.is_behind_nat == Some(false) { return; } - self.hole_punch_tracker.insert(peer_socket, ()); + self.hole_punch_tracker.insert(peer_socket); } pub fn untrack(&mut self, peer_socket: &SocketAddr) { @@ -95,13 +96,16 @@ impl NatHolePunchUtils { /// Called when a new observed address is reported at start up or after a /// [`crate::Discv5Event::SocketUpdated`]. - pub fn set_is_behind_nat<'a>( + pub fn set_is_behind_nat( &mut self, - mut listen_sockets: impl Iterator, + listen_sockets: &[SocketAddr], observed_ip: Option, observed_port: Option, ) { - if !listen_sockets.any(|listen_socket| Some(listen_socket.port()) == observed_port) { + if !listen_sockets + .iter() + .any(|listen_socket| Some(listen_socket.port()) == observed_port) + { self.is_behind_nat = Some(true); return; } @@ -121,40 +125,10 @@ impl NatHolePunchUtils { } }); } -} - -impl Stream for NatHolePunchUtils { - type Item = SocketAddr; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Until ip voting is done and an observed public address is finalised, all nodes act as - // if they are behind a NAT. - if self.is_behind_nat == Some(false) || self.hole_punch_tracker.len() == 0 { - return Poll::Pending; - } - self.hole_punch_tracker.expired_entries.poll_next_unpin(cx) - } -} - -#[derive(Deref, DerefMut)] -struct NatHolePunchTracker { - #[deref] - #[deref_mut] - cache: LruTimeCache, - expired_entries: mpsc::Receiver, -} - -impl NatHolePunchTracker { - fn new(session_cache_capacity: usize) -> Self { - let (tx, rx) = futures::channel::mpsc::channel::(session_cache_capacity); - Self { - cache: LruTimeCache::new_with_expiry_feedback( - Duration::from_secs(DEFAULT_HOLE_PUNCH_LIFETIME), - Some(session_cache_capacity), - tx, - ), - expired_entries: rx, - } + /// Determines if an ENR is reachable or not based on its assigned keys. 
+ pub fn is_enr_reachable(enr: &Enr) -> bool { + enr.udp4_socket().is_some() || enr.udp6_socket().is_some() } } diff --git a/src/handler/sessions/session.rs b/src/handler/session.rs similarity index 75% rename from src/handler/sessions/session.rs rename to src/handler/session.rs index 209d551b2..d0f53f305 100644 --- a/src/handler/sessions/session.rs +++ b/src/handler/session.rs @@ -10,17 +10,12 @@ use crate::{ Discv5Error, Enr, }; -// If the message nonce length is ever set below 4 bytes this will explode. The packet -// size constants shouldn't be modified. -const _: () = assert!(MESSAGE_NONCE_LENGTH > 4); - use enr::{CombinedKey, NodeId}; use parking_lot::RwLock; use std::sync::Arc; -use tracing::warn; use zeroize::Zeroize; -#[derive(Zeroize, PartialEq, Clone, Copy)] +#[derive(Zeroize, PartialEq)] pub(crate) struct Keys { /// The encryption key. encryption_key: [u8; 16], @@ -39,7 +34,6 @@ impl From<([u8; 16], [u8; 16])> for Keys { /// A Session containing the encryption/decryption keys. These are kept individually for a given /// node. -#[derive(Clone)] pub(crate) struct Session { /// The current keys used to encrypt/decrypt messages. 
keys: Keys, @@ -174,60 +168,23 @@ impl Session { pub(crate) fn establish_from_challenge( local_key: Arc>, local_id: &NodeId, - challenge: Challenge, + remote_id: &NodeId, + challenge_data: ChallengeData, id_nonce_sig: &[u8], ephem_pubkey: &[u8], - enr_record: Option, - node_address: &NodeAddress, - session_limiter: impl FnOnce(&NodeAddress, &Enr) -> Option>, + session_enr: Enr, ) -> Result<(Session, Enr), Discv5Error> { - // check and verify a potential ENR update - - let Challenge { data, remote_enr } = challenge; - - // Avoid cloning an ENR - let session_enr = match (enr_record, remote_enr) { - (Some(new_enr), Some(known_enr)) => { - if new_enr.seq() > known_enr.seq() { - MostRecentEnr::Handshake { - enr: new_enr, - challenge_enr: Some(known_enr), - } - } else { - MostRecentEnr::Challenge { enr: known_enr } - } - } - (Some(new_enr), None) => MostRecentEnr::Handshake { - enr: new_enr, - challenge_enr: None, - }, - (None, Some(known_enr)) => MostRecentEnr::Challenge { enr: known_enr }, - (None, None) => { - warn!( - "Peer did not respond with their ENR. Session could not be established. Node: {}", - node_address.node_id - ); - return Err(Discv5Error::SessionNotEstablished); - } - }; - - // Avoid unnecessary key derivation computation, first verify session candidate against - // current sessions state. 
- session_limiter(node_address, session_enr.enr()).transpose()?; - - let remote_public_key = session_enr.enr().public_key(); - // verify the auth header nonce if !crypto::verify_authentication_nonce( - &remote_public_key, + &session_enr.public_key(), ephem_pubkey, - &data, + &challenge_data, local_id, id_nonce_sig, ) { let challenge = Challenge { - data, - remote_enr: session_enr.into_challenge_enr(), + data: challenge_data, + remote_enr: Some(session_enr), }; return Err(Discv5Error::InvalidChallengeSignature(challenge)); } @@ -239,8 +196,8 @@ impl Session { let (decryption_key, encryption_key) = crypto::derive_keys_from_pubkey( &local_key.read(), local_id, - &node_address.node_id, - &data, + remote_id, + &challenge_data, ephem_pubkey, )?; @@ -249,7 +206,6 @@ impl Session { decryption_key, }; - let session_enr = session_enr.into_most_recent_enr(); Ok((Session::new(keys), session_enr)) } @@ -307,41 +263,6 @@ impl Session { } } -enum MostRecentEnr { - Challenge { - enr: Enr, - }, - Handshake { - enr: Enr, - challenge_enr: Option, - }, -} - -impl MostRecentEnr { - fn enr(&self) -> &Enr { - match self { - Self::Challenge { enr } => enr, - Self::Handshake { enr, .. } => enr, - } - } - - fn into_most_recent_enr(self) -> Enr { - match self { - MostRecentEnr::Challenge { enr } => enr, - MostRecentEnr::Handshake { enr, .. 
} => enr, - } - } - fn into_challenge_enr(self) -> Option { - match self { - MostRecentEnr::Challenge { enr } => Some(enr), - MostRecentEnr::Handshake { - enr: _, - challenge_enr, - } => challenge_enr, - } - } -} - #[cfg(test)] pub(crate) fn build_dummy_session() -> Session { Session::new(Keys { diff --git a/src/handler/sessions/limiter.rs b/src/handler/sessions/limiter.rs deleted file mode 100644 index 70db681b0..000000000 --- a/src/handler/sessions/limiter.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::{node_info::NodeAddress, Discv5Error, Enr}; -use std::collections::HashSet; - -/// The minimum number of peers to accept sessions with that have an unreachable ENR, i.e. cater -/// requests for, at a time. Benevolent peers of this type could for example be symmetrically -/// NAT:ed nodes or nodes that have recently joined the network and are still unaware of their -/// externally reachable socket, relying on their peers to discover it. -pub const MIN_SESSIONS_UNREACHABLE_ENR: usize = 1; - -pub(crate) struct SessionLimiter { - /// Keeps track of the sessions held for peers with unreachable ENRs. These could be peers yet - /// to discover their externally reachable socket or symmetrically NAT:ed peers that, - /// naturally, will never discover one externally reachable socket. - sessions_unreachable_enr_tracker: HashSet, - /// Receiver of expired sessions. - rx_expired_sessions: futures::channel::mpsc::Receiver, - /// The max number of sessions to peers with unreachable ENRs at a time. - limit: usize, -} - -impl SessionLimiter { - pub fn new( - rx_expired_sessions: futures::channel::mpsc::Receiver, - limit: usize, - ) -> Self { - SessionLimiter { - sessions_unreachable_enr_tracker: Default::default(), - rx_expired_sessions, - limit, - } - } - - /// Drains buffer of expired sessions, and untracks any which belong to unreachable ENRs. 
- fn drain_expired_sessions_buffer(&mut self) { - while let Ok(Some(session_node_address)) = self.rx_expired_sessions.try_next() { - self.sessions_unreachable_enr_tracker - .remove(&session_node_address); - } - } - - /// Checks if a session with this peer should be allowed at this given time. Called after - /// connection establishment, before session key derivation. As a side effect this drains the - /// expired entries buffer. - pub fn track_sessions_unreachable_enr( - &mut self, - node_address: &NodeAddress, - enr: &Enr, - ) -> Result<(), Discv5Error> { - self.drain_expired_sessions_buffer(); - - // Peer is reachable - if enr.udp4_socket().is_some() || enr.udp6_socket().is_some() { - return Ok(()); - } - // Peer is unreachable. - if self.sessions_unreachable_enr_tracker.len() >= self.limit { - return Err(Discv5Error::LimitSessionsUnreachableEnr); - } - - self.sessions_unreachable_enr_tracker - .insert(node_address.clone()); - Ok(()) - } - - /// Untracks the given session if it has an unreachable ENR. - pub fn untrack_session(&mut self, node_address: &NodeAddress) { - self.sessions_unreachable_enr_tracker.remove(node_address); - } -} diff --git a/src/handler/sessions/mod.rs b/src/handler/sessions/mod.rs deleted file mode 100644 index be4aa2c1c..000000000 --- a/src/handler/sessions/mod.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::{lru_time_cache::LruTimeCache, node_info::NodeAddress}; -use std::time::Duration; - -mod crypto; -mod limiter; -pub(crate) mod session; - -use limiter::SessionLimiter; -pub use limiter::MIN_SESSIONS_UNREACHABLE_ENR; -pub(crate) use session::Session; - -pub struct Sessions { - /// Stores established sessions. - pub cache: LruTimeCache, - /// Limits the number of sessions to peers with unreachable ENRs. Limiter is queried upon - /// session establishment. 
- pub limiter: Option, -} - -impl Sessions { - pub fn new( - cache_capacity: usize, - entry_ttl: Duration, - unreachable_enr_limit: Option, - ) -> Self { - let (cache, limiter) = match unreachable_enr_limit { - Some(limit) => { - let (tx, rx) = futures::channel::mpsc::channel::(cache_capacity); - let limiter = SessionLimiter::new(rx, limit); - let cache = - LruTimeCache::new_with_expiry_feedback(entry_ttl, Some(cache_capacity), tx); - (cache, Some(limiter)) - } - None => { - let cache = LruTimeCache::new(entry_ttl, Some(cache_capacity)); - (cache, None) - } - }; - - Sessions { cache, limiter } - } -} - -#[cfg(test)] -mod test { - use super::*; - use enr::{CombinedKey, EnrBuilder}; - - #[tokio::test] - async fn test_limiter() { - let max_nodes_unreachable_enr = 2; - let session_time_out = Duration::from_secs(1); - let mut sessions = Sessions::new(3, session_time_out, Some(max_nodes_unreachable_enr)); - - // first node - let first_key = CombinedKey::generate_secp256k1(); - let mut builder = EnrBuilder::new("v4"); - let first_enr = builder.build(&first_key).unwrap(); - - let first_unreachable_node = NodeAddress { - socket_addr: "0.0.0.0:10010".parse().unwrap(), - node_id: first_enr.node_id(), - }; - // second node - let second_key = CombinedKey::generate_secp256k1(); - builder = EnrBuilder::new("v4"); - let second_enr = builder.build(&second_key).unwrap(); - - let second_unreachable_node = NodeAddress { - socket_addr: "0.0.0.0:10011".parse().unwrap(), - node_id: second_enr.node_id(), - }; - // third node - let third_key = CombinedKey::generate_secp256k1(); - builder = EnrBuilder::new("v4"); - let third_enr = builder.build(&third_key).unwrap(); - - let third_unreachable_node = NodeAddress { - socket_addr: "0.0.0.0:10012".parse().unwrap(), - node_id: third_enr.node_id(), - }; - // check if space for first node - let res = sessions.limiter.as_mut().map(|limiter| { - limiter.track_sessions_unreachable_enr(&first_unreachable_node, &first_enr) - }); - res.unwrap() - 
.expect("should be space for first unreachable node"); - // insert first node - let first_session = Session::new(([0u8; 16], [0u8; 16]).into()); - sessions.cache.insert(first_unreachable_node, first_session); - // check if space for second node - let second_res = sessions.limiter.as_mut().map(|limiter| { - limiter.track_sessions_unreachable_enr(&second_unreachable_node, &second_enr) - }); - second_res - .unwrap() - .expect("should be space for second unreachable node"); - // insert second node - let second_session = Session::new(([0u8; 16], [0u8; 16]).into()); - sessions - .cache - .insert(second_unreachable_node, second_session); - - // check if space for third node, should fail - let third_res = sessions.limiter.as_mut().map(|limiter| { - limiter.track_sessions_unreachable_enr(&third_unreachable_node, &third_enr) - }); - assert!(third_res.unwrap().is_err()); - // let sessions expire - tokio::time::sleep(session_time_out).await; - // calling `len` removes expired entries - sessions.cache.len(); - // retry check if space for third node, should be successful - let third_res = sessions.limiter.as_mut().map(|limiter| { - limiter.track_sessions_unreachable_enr(&third_unreachable_node, &third_enr) - }); - third_res - .unwrap() - .expect("should be space for third unreachable node"); - } -} diff --git a/src/handler/tests.rs b/src/handler/tests.rs index bc38ad694..b82099010 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -2,7 +2,7 @@ use super::*; use crate::{ - handler::sessions::session::build_dummy_session, + handler::session::build_dummy_session, packet::{DefaultProtocolId, PacketHeader, MAX_PACKET_SIZE, MESSAGE_NONCE_LENGTH}, return_if_ipv6_is_not_supported, rpc::{Request, Response}, @@ -75,13 +75,14 @@ async fn build_handler_with_listen_config( let (service_send, handler_recv) = mpsc::channel(50); let (exit_tx, exit) = oneshot::channel(); - let nat_hole_puncher = NatHolePunchUtils::new( - listen_sockets.iter(), + let nat_utils = NatUtils::new( + 
&listen_sockets, &enr, config.listen_config.ip_mode(), None, None, config.session_cache_capacity, + None, ); ( @@ -93,7 +94,10 @@ async fn build_handler_with_listen_config( active_requests: ActiveRequests::new(config.request_timeout), pending_requests: HashMap::new(), filter_expected_responses, - sessions: Sessions::new(config.session_cache_capacity, config.session_timeout, None), + sessions: LruTimeCache::new( + config.session_timeout, + Some(config.session_cache_capacity), + ), one_time_sessions: LruTimeCache::new( Duration::from_secs(ONE_TIME_SESSION_TIMEOUT), Some(ONE_TIME_SESSION_CACHE_CAPACITY), @@ -103,7 +107,7 @@ async fn build_handler_with_listen_config( service_send, listen_sockets, socket, - nat_hole_puncher, + nat_utils, exit, }, MockService { @@ -494,7 +498,6 @@ async fn nat_hole_punch_relay() { build_handler_with_listen_config::(listen_config).await; let relay_addr = handler.enr.read().udp4_socket().unwrap().into(); let relay_node_id = handler.enr.read().node_id(); - let mut dummy_session = build_dummy_session(); // Initiator let inr_enr = { @@ -508,11 +511,10 @@ async fn nat_hole_punch_relay() { let inr_addr = inr_enr.udp4_socket().unwrap().into(); let inr_node_id = inr_enr.node_id(); - let inr_node_address = NodeAddress::new(inr_addr, inr_enr.node_id()); + let initr_node_address = NodeAddress::new(inr_addr, inr_enr.node_id()); handler .sessions - .cache - .insert(inr_node_address, dummy_session.clone()); + .insert(initr_node_address, build_dummy_session()); let inr_socket = UdpSocket::bind(inr_addr) .await @@ -533,8 +535,7 @@ async fn nat_hole_punch_relay() { let tgt_node_address = NodeAddress::new(tgt_addr, tgt_enr.node_id()); handler .sessions - .cache - .insert(tgt_node_address, dummy_session.clone()); + .insert(tgt_node_address, build_dummy_session()); let tgt_socket = UdpSocket::bind(tgt_addr) .await @@ -618,7 +619,7 @@ async fn nat_hole_punch_relay() { kind ); - let decrypted_message = dummy_session + let decrypted_message = 
build_dummy_session() .decrypt_message(message_nonce, &message, &aad) .expect("should decrypt message"); match Message::decode(&decrypted_message).expect("should decode message") { @@ -640,8 +641,7 @@ async fn nat_hole_punch_target() { build_handler_with_listen_config::(listen_config).await; let tgt_addr = handler.enr.read().udp4_socket().unwrap().into(); let tgt_node_id = handler.enr.read().node_id(); - let dummy_session = build_dummy_session(); - handler.nat_hole_puncher.is_behind_nat = Some(true); + handler.nat_utils.is_behind_nat = Some(true); // Relay let relay_enr = { @@ -658,8 +658,7 @@ async fn nat_hole_punch_target() { let relay_node_address = NodeAddress::new(relay_addr, relay_node_id); handler .sessions - .cache - .insert(relay_node_address, dummy_session.clone()); + .insert(relay_node_address, build_dummy_session()); let relay_socket = UdpSocket::bind(relay_addr) .await diff --git a/src/lru_time_cache.rs b/src/lru_time_cache.rs index 79e8a9756..b90336ab2 100644 --- a/src/lru_time_cache.rs +++ b/src/lru_time_cache.rs @@ -1,19 +1,20 @@ -use futures::channel::mpsc; use hashlink::LinkedHashMap; use std::{ hash::Hash, time::{Duration, Instant}, }; -use tracing::warn; pub struct LruTimeCache { - map: LinkedHashMap, + /// The main map storing the internal values. It stores the time the value was inserted and an + /// optional tag to keep track of individual values. + map: LinkedHashMap, /// The time elements remain in the cache. ttl: Duration, /// The max size of the cache. capacity: usize, - /// Channel to send expiry notifications on. - tx: Option>, + /// Optional count of specific tagged elements. This is used in discv5 for tracking + /// the number of unreachable sessions currently held. + tagged_count: usize, } impl LruTimeCache { @@ -27,42 +28,54 @@ impl LruTimeCache { map: LinkedHashMap::new(), ttl, capacity, - tx: None, + tagged_count: 0, } } - /// Queues expired entries on buffer. If enabled, ensure that buffer is emptied regularly. 
- pub fn new_with_expiry_feedback( - ttl: Duration, - capacity: Option, - tx: mpsc::Sender, - ) -> LruTimeCache { - let capacity = if let Some(cap) = capacity { - cap - } else { - usize::MAX - }; - LruTimeCache { - map: LinkedHashMap::new(), - ttl, - capacity, - tx: Some(tx), - } + /// Returns the number of elements that are currently tagged in the cache. + pub fn tagged(&self) -> usize { + self.tagged_count } - /// Inserts a key-value pair into the cache. + // Insert an untagged key-value pair into the cache. pub fn insert(&mut self, key: K, value: V) { + self.insert_raw(key, value, false); + } + + // Insert a tagged key-value pair into the cache. + #[cfg(test)] + pub fn insert_tagged(&mut self, key: K, value: V) { + self.insert_raw(key, value, true); + } + + /// Inserts a key-value pair into the cache. + pub fn insert_raw(&mut self, key: K, value: V, tagged: bool) { let now = Instant::now(); - self.map.insert(key, (value, now)); + if let Some(old_value) = self.map.insert(key, (value, now, tagged)) { + // If the old value was tagged but the new one isn't, we reduce our count + if !tagged && old_value.2 { + self.tagged_count = self.tagged_count.saturating_sub(1); + } else if tagged && !old_value.2 { + // Else if the new value is tagged and the old wasn't tagged increment the count + self.tagged_count += 1; + } + } else if tagged { + // No previous value, increment the tagged count + self.tagged_count += 1; + } if self.map.len() > self.capacity { - self.map.pop_front(); + if let Some((_, value)) = self.map.pop_front() { + if value.2 { + // We have removed a tagged element + self.tagged_count = self.tagged_count.saturating_sub(1); + } + } } } /// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist. /// Also removes expired elements and updates the time. 
- #[allow(dead_code)] pub fn get(&mut self, key: &K) -> Option<&V> { self.get_mut(key).map(|value| &*value) } @@ -85,16 +98,14 @@ impl LruTimeCache { /// Returns a reference to the value with the given `key`, if present and not expired, without /// updating the timestamp. - #[allow(dead_code)] pub fn peek(&self, key: &K) -> Option<&V> { - if let Some((value, time)) = self.map.get(key) { + if let Some((value, time, _)) = self.map.get(key) { return if *time + self.ttl >= Instant::now() { Some(value) } else { None }; } - None } @@ -107,28 +118,31 @@ impl LruTimeCache { /// Removes a key-value pair from the cache, returning the value at the key if the key /// was previously in the map. pub fn remove(&mut self, key: &K) -> Option { - self.map.remove(key).map(|v| v.0) + let value = self.map.remove(key)?; + + // This element was tagged, reduce the count + if value.2 { + self.tagged_count = self.tagged_count.saturating_sub(1); + } + Some(value.0) } /// Removes expired items from the cache. fn remove_expired_values(&mut self, now: Instant) { let mut expired_keys = vec![]; - for (key, (_, time)) in self.map.iter_mut() { + for (key, (_, time, _)) in self.map.iter_mut() { if *time + self.ttl >= now { break; } expired_keys.push(key.clone()); } - for k in expired_keys.iter() { - self.map.remove(k); - } - - if let Some(ref mut tx) = self.tx { - for k in expired_keys { - if let Err(e) = tx.try_send(k) { - warn!("Failed removing expired key, {}", e); + for k in expired_keys { + if let Some(v) = self.map.remove(&k) { + // This key was tagged, reduce the count + if v.2 { + self.tagged_count = self.tagged_count.saturating_sub(1); } } } @@ -171,6 +185,30 @@ mod tests { assert_eq!(Some(&30), cache.get(&3)); } + #[test] + fn tagging() { + let mut cache = LruTimeCache::new(Duration::from_secs(10), Some(2)); + + cache.insert_tagged(1, 10); + cache.insert(2, 20); + assert_eq!(2, cache.len()); + assert_eq!(1, cache.tagged()); + + cache.insert_tagged(3, 30); + assert_eq!(2, cache.len()); + 
assert_eq!(1, cache.tagged()); + assert_eq!(Some(&20), cache.get(&2)); + assert_eq!(Some(&30), cache.get(&3)); + + cache.insert_tagged(2, 30); + assert_eq!(2, cache.tagged()); + + cache.insert(4, 30); + assert_eq!(1, cache.tagged()); + cache.insert(5, 30); + assert_eq!(0, cache.tagged()); + } + #[test] fn get() { let mut cache = LruTimeCache::new(Duration::from_secs(10), Some(2)); diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 3207ea4b3..c59e72e08 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -24,7 +24,7 @@ pub const IV_LENGTH: usize = 16; /// The length of the static header. (6 byte protocol id, 2 bytes version, 1 byte kind, 12 byte /// message nonce and a 2 byte authdata-size). pub const STATIC_HEADER_LENGTH: usize = 23; -/// The message nonce length (in bytes). +/// The message nonce length (in bytes). This must be at least 4 bytes. pub const MESSAGE_NONCE_LENGTH: usize = 12; /// The Id nonce length (in bytes). pub const ID_NONCE_LENGTH: usize = 16;