From 29b5267c27e493760978201c25583219c91eab69 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Mon, 3 Jul 2023 17:57:38 -0400 Subject: [PATCH 01/44] Initial support for concurrent requests --- src/handler/active_requests.rs | 117 ++++++++++++++++------- src/handler/mod.rs | 165 ++++++++++----------------------- src/handler/tests.rs | 1 - 3 files changed, 135 insertions(+), 148 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index e46ccee83..6663f1303 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -1,10 +1,9 @@ use super::*; use delay_map::HashMapDelay; -use more_asserts::debug_unreachable; pub(super) struct ActiveRequests { /// A list of raw messages we are awaiting a response from the remote. - active_requests_mapping: HashMapDelay, + active_requests_mapping: HashMapDelay>, // WHOAREYOU messages do not include the source node id. We therefore maintain another // mapping of active_requests via message_nonce. This allows us to match WHOAREYOU // requests with active requests sent. @@ -20,49 +19,97 @@ impl ActiveRequests { } } + // Insert a new request into the active requests mapping. pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) { let nonce = *request_call.packet().message_nonce(); + let mut request_calls = self + .active_requests_mapping + .remove(&node_address) + .unwrap_or_default(); + request_calls.push(request_call); self.active_requests_mapping - .insert(node_address.clone(), request_call); + .insert(node_address.clone(), request_calls); self.active_requests_nonce_mapping .insert(nonce, node_address); } - pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> { - self.active_requests_mapping.get(node_address) - } - + // Remove a single request identified by its nonce. + // Returns None if the nonce is not found. 
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { - match self.active_requests_nonce_mapping.remove(nonce) { - Some(node_address) => match self.active_requests_mapping.remove(&node_address) { - Some(request_call) => Some((node_address, request_call)), - None => { - debug_unreachable!("A matching request call doesn't exist"); - error!("A matching request call doesn't exist"); - None - } + let node_address = match self.active_requests_nonce_mapping.remove(nonce) { + Some(node_address) => node_address, + None => return None, + }; + let mut requests = match self.active_requests_mapping.remove(&node_address) { + Some(requests) => match requests.len() { + 0 => return None, + _ => requests, }, - None => None, + None => return None, + }; + let index = match requests + .iter() + .position(|req| req.packet().message_nonce() == nonce) + { + Some(index) => index, + None => { + return None; + } + }; + let req = requests.remove(index); + self.active_requests_mapping + .insert(node_address.clone(), requests); + Some((node_address, req)) + } + + // Remove all requests associated with a node. + // Returns None if the node is not found. + pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option> { + let requests = self + .active_requests_mapping + .remove(&node_address) + .unwrap_or_default(); + for req in &requests { + self.active_requests_nonce_mapping + .remove(req.packet().message_nonce()); } + Some(requests) } - pub fn remove(&mut self, node_address: &NodeAddress) -> Option { - match self.active_requests_mapping.remove(node_address) { - Some(request_call) => { + // Remove a single request identified by its id. + // Returns None if the node is not found. 
+ pub fn remove_request( + &mut self, + node_address: &NodeAddress, + id: &RequestId, + ) -> Option { + let mut reqs = match self.active_requests_mapping.remove(node_address) { + Some(reqs) => reqs, + None => return None, + }; + match reqs.len() { + 0 => None, + _ => { + let index = reqs.iter().position(|req| { + let req_id = match req.id() { + HandlerReqId::Internal(id) | HandlerReqId::External(id) => id, + }; + req_id == id + }); + let index = match index { + Some(index) => index, + None => return None, + }; + let req = reqs.remove(index); // Remove the associated nonce mapping. match self .active_requests_nonce_mapping - .remove(request_call.packet().message_nonce()) + .remove(req.packet().message_nonce()) { - Some(_) => Some(request_call), - None => { - debug_unreachable!("A matching nonce mapping doesn't exist"); - error!("A matching nonce mapping doesn't exist"); - None - } + Some(_) => Some(req), + None => None, } } - None => None, } } @@ -81,9 +128,11 @@ impl ActiveRequests { } for (address, request) in self.active_requests_mapping.iter() { - let nonce = request.packet().message_nonce(); - if !self.active_requests_nonce_mapping.contains_key(nonce) { - panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce); + for req in request { + let nonce = req.packet().message_nonce(); + if !self.active_requests_nonce_mapping.contains_key(nonce) { + panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce); + } } } } @@ -92,11 +141,13 @@ impl ActiveRequests { impl Stream for ActiveRequests { type Item = Result<(NodeAddress, RequestCall), String>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // should we move timeout management to active_requests_nonce_mapping? 
match self.active_requests_mapping.poll_next_unpin(cx) { - Poll::Ready(Some(Ok((node_address, request_call)))) => { - // Remove the associated nonce mapping. + Poll::Ready(Some(Ok((node_address, mut request_calls)))) => { + let request_call = request_calls.remove(0); self.active_requests_nonce_mapping - .remove(request_call.packet().message_nonce()); + .remove(request_call.packet().message_nonce()) + .expect("fuck"); Poll::Ready(Some(Ok((node_address, request_call)))) } Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 6df6ef835..b833e1eff 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -169,13 +169,6 @@ enum HandlerReqId { External(RequestId), } -/// A request queued for sending. -struct PendingRequest { - contact: NodeContact, - request_id: HandlerReqId, - request: RequestBody, -} - /// Process to handle handshakes and sessions established from raw RPC communications between nodes. pub struct Handler { /// Configuration for the discv5 service. @@ -187,12 +180,10 @@ pub struct Handler { enr: Arc>, /// The key to sign the ENR and set up encrypted communication with peers. key: Arc>, - /// Pending raw requests. + /// Active requests that are awaiting a response. active_requests: ActiveRequests, /// The expected responses by SocketAddr which allows packets to pass the underlying filter. filter_expected_responses: Arc>>, - /// Requests awaiting a handshake completion. - pending_requests: HashMap>, /// Currently in-progress outbound handshakes (WHOAREYOU packets) with peers. active_challenges: HashMapDelay, /// Established sessions with peers. @@ -283,7 +274,6 @@ impl Handler { enr, key, active_requests: ActiveRequests::new(config.request_timeout), - pending_requests: HashMap::new(), filter_expected_responses, sessions: LruTimeCache::new( config.session_timeout, @@ -331,13 +321,13 @@ impl Handler { Some(inbound_packet) = self.socket.recv.recv() => { self.process_inbound_packet::

(inbound_packet).await; } - Some(Ok((node_address, pending_request))) = self.active_requests.next() => { - self.handle_request_timeout(node_address, pending_request).await; + Some(Ok((node_address, active_request))) = self.active_requests.next() => { + self.handle_request_timeout(node_address, active_request).await; } Some(Ok((node_address, _challenge))) = self.active_challenges.next() => { // A challenge has expired. There could be pending requests awaiting this // challenge. We process them here - self.send_next_request::

(node_address).await; + self.replay_active_requests::

(&node_address).await; } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout _ = &mut self.exit => { @@ -464,22 +454,6 @@ impl Handler { return Err(RequestError::SelfRequest); } - // If there is already an active request or an active challenge (WHOAREYOU sent) for this node, add to pending requests - if self.active_requests.get(&node_address).is_some() - || self.active_challenges.get(&node_address).is_some() - { - trace!("Request queued for node: {}", node_address); - self.pending_requests - .entry(node_address) - .or_insert_with(Vec::new) - .push(PendingRequest { - contact, - request_id, - request, - }); - return Ok(()); - } - let (packet, initiating_session) = { if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send @@ -823,7 +797,7 @@ impl Handler { .await; // We could have pending messages that were awaiting this session to be // established. If so process them. - self.send_next_request::

(node_address).await; + self.replay_active_requests::

(&node_address).await; } else { // IP's or NodeAddress don't match. Drop the session. warn!( @@ -832,8 +806,7 @@ impl Handler { enr.udp6_socket(), node_address ); - self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) - .await; + self.fail_session(&node_address, true).await; // Respond to PING request even if the ENR or NodeAddress don't match // so that the source node can notice its external IP address has been changed. @@ -881,8 +854,7 @@ impl Handler { "Invalid Authentication header. Dropping session. Error: {:?}", e ); - self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) - .await; + self.fail_session(&node_address, true).await; } } } else { @@ -893,41 +865,36 @@ impl Handler { } } - async fn send_next_request(&mut self, node_address: NodeAddress) { - // ensure we are not over writing any existing requests - if self.active_requests.get(&node_address).is_none() { - if let std::collections::hash_map::Entry::Occupied(mut entry) = - self.pending_requests.entry(node_address) + async fn replay_active_requests(&mut self, node_address: &NodeAddress) { + let active_requests = self + .active_requests + .remove_requests(node_address) + .unwrap_or_default(); + for req in active_requests { + let request_id = RequestId::random(); // or reuse old one + let request_id = HandlerReqId::Internal(request_id); //? + if let Err(request_error) = self + .send_request::

( + req.contact().clone(), + request_id.clone(), + req.body().clone(), + ) + .await { - // If it exists, there must be a request here - let PendingRequest { - contact, - request_id, - request, - } = entry.get_mut().remove(0); - if entry.get().is_empty() { - entry.remove(); - } - trace!("Sending next awaiting message. Node: {}", contact); - if let Err(request_error) = self - .send_request::

(contact, request_id.clone(), request) - .await - { - warn!("Failed to send next awaiting request {}", request_error); - // Inform the service that the request failed - match request_id { - HandlerReqId::Internal(_) => { - // An internal request could not be sent. For now we do nothing about - // this. - } - HandlerReqId::External(id) => { - if let Err(e) = self - .service_send - .send(HandlerOut::RequestFailed(id, request_error)) - .await - { - warn!("Failed to inform that request failed {}", e); - } + warn!("Failed to send next awaiting request {}", request_error); + // Inform the service that the request failed + match request_id { + HandlerReqId::Internal(_) => { + // An internal request could not be sent. For now we do nothing about + // this. + } + HandlerReqId::External(id) => { + if let Err(e) = self + .service_send + .send(HandlerOut::RequestFailed(id, request_error)) + .await + { + warn!("Failed to inform that request failed {}", e); } } } @@ -966,8 +933,7 @@ impl Handler { "Message from node: {} is not encrypted with known session keys.", node_address ); - self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) - .await; + self.fail_session(&node_address, true).await; // If we haven't already sent a WhoAreYou, // spawn a WHOAREYOU event to check for highest known ENR if self.active_challenges.get(&node_address).is_none() { @@ -1033,8 +999,7 @@ impl Handler { _ => {} } debug!("Session failed invalid ENR response"); - self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) - .await; + self.fail_session(&node_address, true).await; return; } } @@ -1069,22 +1034,17 @@ impl Handler { response: Response, ) { // Find a matching request, if any - if let Some(mut request_call) = self.active_requests.remove(&node_address) { - let id = match request_call.id() { - HandlerReqId::Internal(id) | HandlerReqId::External(id) => id, - }; - if id != &response.id { - trace!( - "Received an RPC Response to an unknown request. 
Likely late response. {}", - node_address - ); - // add the request back and reset the timer - self.active_requests.insert(node_address, request_call); - return; - } + if let Some(mut request_call) = self + .active_requests + .remove_request(&node_address, &response.id) + { + // handle this case + //trace!( + //"Received an RPC Response to an unknown request. Likely late response. {}", + //node_address + //); // The response matches a request - // Check to see if this is a Nodes response, in which case we may require to wait for // extra responses if let ResponseBody::Nodes { total, .. } = response.body { @@ -1138,7 +1098,8 @@ impl Handler { { warn!("Failed to inform of response {}", e) } - self.send_next_request::

(node_address).await; + // idk ... + self.replay_active_requests::

(&node_address).await; } else { // This is likely a late response and we have already failed the request. These get // dropped here. @@ -1208,41 +1169,17 @@ impl Handler { } let node_address = request_call.contact().node_address(); - self.fail_session(&node_address, error, remove_session) - .await; + self.fail_session(&node_address, remove_session).await; } /// Removes a session and updates associated metrics and fields. - async fn fail_session( - &mut self, - node_address: &NodeAddress, - error: RequestError, - remove_session: bool, - ) { + async fn fail_session(&mut self, node_address: &NodeAddress, remove_session: bool) { if remove_session { self.sessions.remove(node_address); METRICS .active_sessions .store(self.sessions.len(), Ordering::Relaxed); } - if let Some(to_remove) = self.pending_requests.remove(node_address) { - for PendingRequest { request_id, .. } in to_remove { - match request_id { - HandlerReqId::Internal(_) => { - // Do not report failures on requests belonging to the handler. - } - HandlerReqId::External(id) => { - if let Err(e) = self - .service_send - .send(HandlerOut::RequestFailed(id, error.clone())) - .await - { - warn!("Failed to inform request failure {}", e) - } - } - } - } - } } /// Sends a packet to the send handler to be encoded and sent. 
diff --git a/src/handler/tests.rs b/src/handler/tests.rs index f46803659..f326b4a53 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -70,7 +70,6 @@ async fn build_handler( enr: Arc::new(RwLock::new(enr)), key: Arc::new(RwLock::new(key)), active_requests: ActiveRequests::new(config.request_timeout), - pending_requests: HashMap::new(), filter_expected_responses, sessions: LruTimeCache::new(config.session_timeout, Some(config.session_cache_capacity)), one_time_sessions: LruTimeCache::new( From a46961e4de23a2105ab49cad8305cbef97c1a75a Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 6 Jul 2023 10:44:38 -0400 Subject: [PATCH 02/44] Fail all concurrent requests for a session if any one req fails --- src/handler/active_requests.rs | 122 +++++++++++++++------------ src/handler/mod.rs | 78 +++++++++++------ src/handler/tests.rs | 148 ++++++++++++++++++++++++++++----- 3 files changed, 250 insertions(+), 98 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 6663f1303..5056c36e5 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -1,5 +1,23 @@ use super::*; use delay_map::HashMapDelay; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ActiveRequestsError { + InvalidState, +} + +impl fmt::Display for ActiveRequestsError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ActiveRequestsError::InvalidState => { + write!(f, "Invalid state: active requests mappings are not in sync") + } + } + } +} + +impl std::error::Error for ActiveRequestsError {} pub(super) struct ActiveRequests { /// A list of raw messages we are awaiting a response from the remote. @@ -34,83 +52,78 @@ impl ActiveRequests { } // Remove a single request identified by its nonce. - // Returns None if the nonce is not found. 
- pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { - let node_address = match self.active_requests_nonce_mapping.remove(nonce) { - Some(node_address) => node_address, - None => return None, - }; - let mut requests = match self.active_requests_mapping.remove(&node_address) { - Some(requests) => match requests.len() { - 0 => return None, - _ => requests, - }, - None => return None, - }; + pub fn remove_by_nonce( + &mut self, + nonce: &MessageNonce, + ) -> Result<(NodeAddress, RequestCall), ActiveRequestsError> { + let node_address = self + .active_requests_nonce_mapping + .remove(nonce) + .ok_or_else(|| ActiveRequestsError::InvalidState)?; + let mut requests = self + .active_requests_mapping + .remove(&node_address) + .ok_or_else(|| ActiveRequestsError::InvalidState)?; let index = match requests .iter() .position(|req| req.packet().message_nonce() == nonce) { Some(index) => index, None => { - return None; + // if nonce req is missing, reinsert remaining requests into mapping + self.active_requests_mapping + .insert(node_address.clone(), requests); + return Err(ActiveRequestsError::InvalidState); } }; let req = requests.remove(index); self.active_requests_mapping .insert(node_address.clone(), requests); - Some((node_address, req)) + Ok((node_address, req)) } // Remove all requests associated with a node. - // Returns None if the node is not found. - pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option> { + pub fn remove_requests( + &mut self, + node_address: &NodeAddress, + ) -> Result, ActiveRequestsError> { let requests = self .active_requests_mapping .remove(&node_address) - .unwrap_or_default(); + .ok_or_else(|| ActiveRequestsError::InvalidState)?; for req in &requests { self.active_requests_nonce_mapping .remove(req.packet().message_nonce()); } - Some(requests) + Ok(requests) } // Remove a single request identified by its id. - // Returns None if the node is not found. 
pub fn remove_request( &mut self, node_address: &NodeAddress, id: &RequestId, - ) -> Option { - let mut reqs = match self.active_requests_mapping.remove(node_address) { - Some(reqs) => reqs, - None => return None, - }; - match reqs.len() { - 0 => None, - _ => { - let index = reqs.iter().position(|req| { - let req_id = match req.id() { - HandlerReqId::Internal(id) | HandlerReqId::External(id) => id, - }; - req_id == id - }); - let index = match index { - Some(index) => index, - None => return None, - }; - let req = reqs.remove(index); - // Remove the associated nonce mapping. - match self - .active_requests_nonce_mapping - .remove(req.packet().message_nonce()) - { - Some(_) => Some(req), - None => None, - } - } - } + ) -> Result { + let reqs = self + .active_requests_mapping + .get(node_address) + .ok_or_else(|| ActiveRequestsError::InvalidState)?; + let index = reqs + .iter() + .position(|req| { + let req_id: RequestId = req.id().into(); + &req_id == id + }) + .ok_or_else(|| ActiveRequestsError::InvalidState)?; + let nonce = reqs + .get(index) + .ok_or_else(|| ActiveRequestsError::InvalidState)? + .packet() + .message_nonce() + .clone(); + // Remove the associated nonce mapping. + let (_, request_call) = self.remove_by_nonce(&nonce)?; + Ok(request_call) } /// Checks that `active_requests_mapping` and `active_requests_nonce_mapping` are in sync. 
@@ -127,8 +140,8 @@ impl ActiveRequests { } } - for (address, request) in self.active_requests_mapping.iter() { - for req in request { + for (address, requests) in self.active_requests_mapping.iter() { + for req in requests { let nonce = req.packet().message_nonce(); if !self.active_requests_nonce_mapping.contains_key(nonce) { panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce); @@ -141,13 +154,16 @@ impl ActiveRequests { impl Stream for ActiveRequests { type Item = Result<(NodeAddress, RequestCall), String>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // should we move timeout management to active_requests_nonce_mapping? match self.active_requests_mapping.poll_next_unpin(cx) { Poll::Ready(Some(Ok((node_address, mut request_calls)))) => { let request_call = request_calls.remove(0); + // reinsert remaining requests into mapping + self.active_requests_mapping + .insert(node_address.clone(), request_calls); + // remove the nonce mapping self.active_requests_nonce_mapping .remove(request_call.packet().message_nonce()) - .expect("fuck"); + .expect("Invariant violated: nonce mapping does not exist for request"); Poll::Ready(Some(Ok((node_address, request_call)))) } Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), diff --git a/src/handler/mod.rs b/src/handler/mod.rs index b833e1eff..49c0d1f35 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -169,6 +169,15 @@ enum HandlerReqId { External(RequestId), } +impl Into for &HandlerReqId { + fn into(self) -> RequestId { + match self { + HandlerReqId::Internal(id) => id.clone(), + HandlerReqId::External(id) => id.clone(), + } + } +} + /// Process to handle handshakes and sessions established from raw RPC communications between nodes. pub struct Handler { /// Configuration for the discv5 service. 
@@ -325,8 +334,8 @@ impl Handler { self.handle_request_timeout(node_address, active_request).await; } Some(Ok((node_address, _challenge))) = self.active_challenges.next() => { - // A challenge has expired. There could be pending requests awaiting this - // challenge. We process them here + // A challenge has expired. There could be active requests impacted by this + // challenge. We replay them here self.replay_active_requests::

(&node_address).await; } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout @@ -578,7 +587,7 @@ impl Handler { // Check that this challenge matches a known active request. // If this message passes all the requisite checks, a request call is returned. let mut request_call = match self.active_requests.remove_by_nonce(&request_nonce) { - Some((node_address, request_call)) => { + Ok((node_address, request_call)) => { // Verify that the src_addresses match if node_address.socket_addr != src_address { debug!("Received a WHOAREYOU packet for a message with a non-expected source. Source {}, expected_source: {} message_nonce {}", src_address, node_address.socket_addr, hex::encode(request_nonce)); @@ -588,7 +597,7 @@ impl Handler { } request_call } - None => { + Err(_) => { trace!("Received a WHOAREYOU packet that references an unknown or expired request. Source {}, message_nonce {}", src_address, hex::encode(request_nonce)); return; } @@ -806,7 +815,8 @@ impl Handler { enr.udp6_socket(), node_address ); - self.fail_session(&node_address, true).await; + self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) + .await; // Respond to PING request even if the ENR or NodeAddress don't match // so that the source node can notice its external IP address has been changed. @@ -854,7 +864,8 @@ impl Handler { "Invalid Authentication header. Dropping session. Error: {:?}", e ); - self.fail_session(&node_address, true).await; + self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) + .await; } } } else { @@ -871,8 +882,7 @@ impl Handler { .remove_requests(node_address) .unwrap_or_default(); for req in active_requests { - let request_id = RequestId::random(); // or reuse old one - let request_id = HandlerReqId::Internal(request_id); //? + let request_id = req.id().clone(); if let Err(request_error) = self .send_request::

( req.contact().clone(), @@ -881,7 +891,7 @@ impl Handler { ) .await { - warn!("Failed to send next awaiting request {}", request_error); + warn!("Failed to send next awaiting request {request_error}"); // Inform the service that the request failed match request_id { HandlerReqId::Internal(_) => { @@ -894,7 +904,7 @@ impl Handler { .send(HandlerOut::RequestFailed(id, request_error)) .await { - warn!("Failed to inform that request failed {}", e); + warn!("Failed to inform that request failed {e}"); } } } @@ -933,7 +943,8 @@ impl Handler { "Message from node: {} is not encrypted with known session keys.", node_address ); - self.fail_session(&node_address, true).await; + self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) + .await; // If we haven't already sent a WhoAreYou, // spawn a WHOAREYOU event to check for highest known ENR if self.active_challenges.get(&node_address).is_none() { @@ -999,7 +1010,8 @@ impl Handler { _ => {} } debug!("Session failed invalid ENR response"); - self.fail_session(&node_address, true).await; + self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) + .await; return; } } @@ -1034,16 +1046,10 @@ impl Handler { response: Response, ) { // Find a matching request, if any - if let Some(mut request_call) = self + if let Ok(mut request_call) = self .active_requests .remove_request(&node_address, &response.id) { - // handle this case - //trace!( - //"Received an RPC Response to an unknown request. Likely late response. {}", - //node_address - //); - // The response matches a request // Check to see if this is a Nodes response, in which case we may require to wait for // extra responses @@ -1098,8 +1104,6 @@ impl Handler { { warn!("Failed to inform of response {}", e) } - // idk ... - self.replay_active_requests::

(&node_address).await; } else { // This is likely a late response and we have already failed the request. These get // dropped here. @@ -1169,17 +1173,43 @@ impl Handler { } let node_address = request_call.contact().node_address(); - self.fail_session(&node_address, remove_session).await; + self.fail_session(&node_address, error, remove_session) + .await; } - /// Removes a session and updates associated metrics and fields. - async fn fail_session(&mut self, node_address: &NodeAddress, remove_session: bool) { + /// Removes a session, fails all of that session's active requests, and updates associated metrics and fields. + async fn fail_session( + &mut self, + node_address: &NodeAddress, + error: RequestError, + remove_session: bool, + ) { if remove_session { self.sessions.remove(node_address); METRICS .active_sessions .store(self.sessions.len(), Ordering::Relaxed); } + for req in self + .active_requests + .remove_requests(node_address) + .unwrap_or_default() + { + match req.id() { + HandlerReqId::Internal(_) => { + // Do not report failures on requests belonging to the handler. + } + HandlerReqId::External(id) => { + if let Err(e) = self + .service_send + .send(HandlerOut::RequestFailed(id.clone(), error.clone())) + .await + { + warn!("Failed to inform request failure {e}") + } + } + } + } } /// Sends a packet to the send handler to be encoded and sent. 
diff --git a/src/handler/tests.rs b/src/handler/tests.rs index f326b4a53..3d64ccf1e 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -330,39 +330,145 @@ async fn multiple_messages() { } } -#[tokio::test] -async fn test_active_requests_insert() { - const EXPIRY: Duration = Duration::from_secs(5); - let mut active_requests = ActiveRequests::new(EXPIRY); - - // Create the test values needed - let port = 5000; - let ip = "127.0.0.1".parse().unwrap(); - +fn create_node() -> Enr { let key = CombinedKey::generate_secp256k1(); - - let enr = EnrBuilder::new("v4") + let ip = "127.0.0.1".parse().unwrap(); + let port = 8080 + rand::random::() % 1000; + EnrBuilder::new("v4") .ip4(ip) .udp4(port) .build(&key) - .unwrap(); - let node_id = enr.node_id(); - - let contact: NodeContact = enr.into(); - let node_address = contact.node_address(); + .unwrap() +} - let packet = Packet::new_random(&node_id).unwrap(); +fn create_req_call(node: &Enr) -> (RequestCall, NodeAddress) { + let node_contact: NodeContact = node.clone().into(); + let packet = Packet::new_random(&node.node_id()).unwrap(); let id = HandlerReqId::Internal(RequestId::random()); let request = RequestBody::Ping { enr_seq: 1 }; let initiating_session = true; - let request_call = RequestCall::new(contact, packet, id, request, initiating_session); + let node_addr = node_contact.node_address(); + let req = RequestCall::new(node_contact, packet, id, request, initiating_session); + (req, node_addr) +} + +#[tokio::test] +async fn test_active_requests_insert() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); // insert the pair and verify the mapping remains in sync - let nonce = *request_call.packet().message_nonce(); - 
active_requests.insert(node_address, request_call); + active_requests.insert(req_1_addr, req_1); + active_requests.check_invariant(); + active_requests.insert(req_2_addr, req_2); + active_requests.check_invariant(); + active_requests.insert(req_3_addr, req_3); + active_requests.check_invariant(); +} + +#[tokio::test] +async fn test_active_requests_remove_requests() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); + active_requests.insert(req_1_addr.clone(), req_1); + active_requests.insert(req_2_addr.clone(), req_2); + active_requests.insert(req_3_addr.clone(), req_3); + active_requests.check_invariant(); + let reqs = active_requests.remove_requests(&req_1_addr).unwrap(); + assert_eq!(reqs.len(), 1); + active_requests.check_invariant(); + let reqs = active_requests.remove_requests(&req_2_addr).unwrap(); + assert_eq!(reqs.len(), 2); + active_requests.check_invariant(); + assert!(active_requests.remove_requests(&req_3_addr).is_err()); +} + +#[tokio::test] +async fn test_active_requests_remove_request() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); + let req_1_id = req_1.id().into(); + let req_2_id = req_2.id().into(); + let req_3_id = req_3.id().into(); + + active_requests.insert(req_1_addr.clone(), req_1); + active_requests.insert(req_2_addr.clone(), req_2); + active_requests.insert(req_3_addr.clone(), req_3); + active_requests.check_invariant(); + let req_id: RequestId = active_requests + 
.remove_request(&req_1_addr, &req_1_id) + .unwrap() + .id() + .into(); + assert_eq!(req_id, req_1_id); + active_requests.check_invariant(); + let req_id: RequestId = active_requests + .remove_request(&req_2_addr, &req_2_id) + .unwrap() + .id() + .into(); + assert_eq!(req_id, req_2_id); + active_requests.check_invariant(); + let req_id: RequestId = active_requests + .remove_request(&req_3_addr, &req_3_id) + .unwrap() + .id() + .into(); + assert_eq!(req_id, req_3_id); + active_requests.check_invariant(); + assert!(active_requests + .remove_request(&req_3_addr, &req_3_id) + .is_err()); +} + +#[tokio::test] +async fn test_active_requests_remove_by_nonce() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); + let req_1_nonce = req_1.packet().message_nonce().clone(); + let req_2_nonce = req_2.packet().message_nonce().clone(); + let req_3_nonce = req_3.packet().message_nonce().clone(); + + active_requests.insert(req_1_addr.clone(), req_1); + active_requests.insert(req_2_addr.clone(), req_2); + active_requests.insert(req_3_addr.clone(), req_3); + active_requests.check_invariant(); + + let req = active_requests.remove_by_nonce(&req_1_nonce).unwrap(); + assert_eq!(req.0, req_1_addr); + active_requests.check_invariant(); + let req = active_requests.remove_by_nonce(&req_2_nonce).unwrap(); + assert_eq!(req.0, req_2_addr); active_requests.check_invariant(); - active_requests.remove_by_nonce(&nonce); + let req = active_requests.remove_by_nonce(&req_3_nonce).unwrap(); + assert_eq!(req.0, req_3_addr); active_requests.check_invariant(); + let random_nonce = rand::random(); + assert!(active_requests.remove_by_nonce(&random_nonce).is_err()); } #[tokio::test] From 7c1c8f87c123e89f87a9ce1de859bc2bbfcf9aea Mon 
Sep 17 00:00:00 2001 From: njgheorghita Date: Mon, 10 Jul 2023 15:58:00 -0400 Subject: [PATCH 03/44] move hashmapdelay to nonce mapping --- src/handler/active_requests.rs | 52 +++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 5056c36e5..e35002daf 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -21,19 +21,19 @@ impl std::error::Error for ActiveRequestsError {} pub(super) struct ActiveRequests { /// A list of raw messages we are awaiting a response from the remote. - active_requests_mapping: HashMapDelay>, + active_requests_mapping: HashMap>, // WHOAREYOU messages do not include the source node id. We therefore maintain another // mapping of active_requests via message_nonce. This allows us to match WHOAREYOU // requests with active requests sent. - /// A mapping of all pending active raw requests message nonces to their NodeAddress. - active_requests_nonce_mapping: HashMap, + /// A mapping of all active raw requests message nonces to their NodeAddress. 
+ active_requests_nonce_mapping: HashMapDelay, } impl ActiveRequests { pub fn new(request_timeout: Duration) -> Self { ActiveRequests { - active_requests_mapping: HashMapDelay::new(request_timeout), - active_requests_nonce_mapping: HashMap::new(), + active_requests_mapping: HashMap::new(), + active_requests_nonce_mapping: HashMapDelay::new(request_timeout), } } @@ -71,14 +71,18 @@ impl ActiveRequests { Some(index) => index, None => { // if nonce req is missing, reinsert remaining requests into mapping - self.active_requests_mapping - .insert(node_address.clone(), requests); + if !requests.is_empty() { + self.active_requests_mapping + .insert(node_address.clone(), requests); + } return Err(ActiveRequestsError::InvalidState); } }; let req = requests.remove(index); - self.active_requests_mapping - .insert(node_address.clone(), requests); + if !requests.is_empty() { + self.active_requests_mapping + .insert(node_address.clone(), requests); + } Ok((node_address, req)) } @@ -154,17 +158,25 @@ impl ActiveRequests { impl Stream for ActiveRequests { type Item = Result<(NodeAddress, RequestCall), String>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.active_requests_mapping.poll_next_unpin(cx) { - Poll::Ready(Some(Ok((node_address, mut request_calls)))) => { - let request_call = request_calls.remove(0); - // reinsert remaining requests into mapping - self.active_requests_mapping - .insert(node_address.clone(), request_calls); - // remove the nonce mapping - self.active_requests_nonce_mapping - .remove(request_call.packet().message_nonce()) - .expect("Invariant violated: nonce mapping does not exist for request"); - Poll::Ready(Some(Ok((node_address, request_call)))) + match self.active_requests_nonce_mapping.poll_next_unpin(cx) { + Poll::Ready(Some(Ok((nonce, node_address)))) => { + // remove the associated mapping + let mut reqs = self + .active_requests_mapping + .remove(&node_address) + .ok_or_else(|| 
ActiveRequestsError::InvalidState) + .unwrap(); + let index = reqs + .iter() + .position(|req| req.packet().message_nonce() == &nonce) + .ok_or_else(|| ActiveRequestsError::InvalidState) + .unwrap(); + let req = reqs.remove(index); + if reqs.len() > 0 { + self.active_requests_mapping + .insert(node_address.clone(), reqs); + } + Poll::Ready(Some(Ok((node_address, req)))) } Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), Poll::Ready(None) => Poll::Ready(None), From d334d20dcfae5439817d6e961c72075ae4448534 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Wed, 19 Jul 2023 16:17:24 -0400 Subject: [PATCH 04/44] Use entry to access/mutate ActiveRequests --- src/handler/active_requests.rs | 174 ++++++++++++++------------------- src/handler/mod.rs | 130 ++++++++++++++++++------ src/handler/request_call.rs | 5 + src/handler/tests.rs | 13 +-- 4 files changed, 186 insertions(+), 136 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index e35002daf..000e5dabf 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -1,23 +1,7 @@ use super::*; use delay_map::HashMapDelay; -use std::fmt; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ActiveRequestsError { - InvalidState, -} - -impl fmt::Display for ActiveRequestsError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ActiveRequestsError::InvalidState => { - write!(f, "Invalid state: active requests mappings are not in sync") - } - } - } -} - -impl std::error::Error for ActiveRequestsError {} +use more_asserts::debug_unreachable; +use std::collections::hash_map::Entry; pub(super) struct ActiveRequests { /// A list of raw messages we are awaiting a response from the remote. @@ -40,66 +24,58 @@ impl ActiveRequests { // Insert a new request into the active requests mapping. 
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) { let nonce = *request_call.packet().message_nonce(); - let mut request_calls = self - .active_requests_mapping - .remove(&node_address) - .unwrap_or_default(); - request_calls.push(request_call); self.active_requests_mapping - .insert(node_address.clone(), request_calls); + .entry(node_address.clone()) + .or_insert_with(Vec::new) + .push(request_call); self.active_requests_nonce_mapping .insert(nonce, node_address); } // Remove a single request identified by its nonce. - pub fn remove_by_nonce( - &mut self, - nonce: &MessageNonce, - ) -> Result<(NodeAddress, RequestCall), ActiveRequestsError> { - let node_address = self - .active_requests_nonce_mapping - .remove(nonce) - .ok_or_else(|| ActiveRequestsError::InvalidState)?; - let mut requests = self - .active_requests_mapping - .remove(&node_address) - .ok_or_else(|| ActiveRequestsError::InvalidState)?; - let index = match requests - .iter() - .position(|req| req.packet().message_nonce() == nonce) - { - Some(index) => index, - None => { - // if nonce req is missing, reinsert remaining requests into mapping - if !requests.is_empty() { - self.active_requests_mapping - .insert(node_address.clone(), requests); - } - return Err(ActiveRequestsError::InvalidState); - } + pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { + let node_address = match self.active_requests_nonce_mapping.remove(nonce) { + Some(val) => val, + None => return None, }; - let req = requests.remove(index); - if !requests.is_empty() { - self.active_requests_mapping - .insert(node_address.clone(), requests); + match self.active_requests_mapping.entry(node_address.clone()) { + Entry::Vacant(_) => { + debug_unreachable!("expected to find node address in active_requests_mapping"); + error!("expected to find node address in active_requests_mapping"); + None + } + Entry::Occupied(mut requests) => { + let index = requests + .get() + 
.iter() + .position(|req| req.packet().message_nonce() == nonce) + .expect("to find request call by nonce"); + Some((node_address, requests.get_mut().remove(index))) + } } - Ok((node_address, req)) } // Remove all requests associated with a node. - pub fn remove_requests( - &mut self, - node_address: &NodeAddress, - ) -> Result, ActiveRequestsError> { - let requests = self - .active_requests_mapping - .remove(&node_address) - .ok_or_else(|| ActiveRequestsError::InvalidState)?; + pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option> { + let requests = match self.active_requests_mapping.remove(node_address) { + Some(val) => val, + None => return None, + }; + // Account for node addresses in `active_requests_nonce_mapping` with an empty list + if requests.is_empty() { + return None; + } for req in &requests { - self.active_requests_nonce_mapping - .remove(req.packet().message_nonce()); + if self + .active_requests_nonce_mapping + .remove(req.packet().message_nonce()) + .is_none() + { + debug_unreachable!("expected to find req with nonce"); + error!("expected to find req with nonce"); + } } - Ok(requests) + Some(requests) } // Remove a single request identified by its id. @@ -107,27 +83,27 @@ impl ActiveRequests { &mut self, node_address: &NodeAddress, id: &RequestId, - ) -> Result { - let reqs = self - .active_requests_mapping - .get(node_address) - .ok_or_else(|| ActiveRequestsError::InvalidState)?; - let index = reqs - .iter() - .position(|req| { - let req_id: RequestId = req.id().into(); - &req_id == id - }) - .ok_or_else(|| ActiveRequestsError::InvalidState)?; - let nonce = reqs - .get(index) - .ok_or_else(|| ActiveRequestsError::InvalidState)? - .packet() - .message_nonce() - .clone(); - // Remove the associated nonce mapping. 
- let (_, request_call) = self.remove_by_nonce(&nonce)?; - Ok(request_call) + ) -> Option { + match self.active_requests_mapping.entry(node_address.to_owned()) { + Entry::Vacant(_) => None, + Entry::Occupied(mut requests) => { + let index = requests.get().iter().position(|req| { + let req_id: RequestId = req.id().into(); + &req_id == id + }); + let index = match index { + Some(index) => index, + // Node address existence in active requests mapping does not guarantee request + // id existence. + None => return None, + }; + let request_call = requests.get_mut().remove(index); + // Remove the associated nonce mapping. + self.active_requests_nonce_mapping + .remove(request_call.packet().message_nonce()); + Some(request_call) + } + } } /// Checks that `active_requests_mapping` and `active_requests_nonce_mapping` are in sync. @@ -160,23 +136,17 @@ impl Stream for ActiveRequests { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.active_requests_nonce_mapping.poll_next_unpin(cx) { Poll::Ready(Some(Ok((nonce, node_address)))) => { - // remove the associated mapping - let mut reqs = self - .active_requests_mapping - .remove(&node_address) - .ok_or_else(|| ActiveRequestsError::InvalidState) - .unwrap(); - let index = reqs - .iter() - .position(|req| req.packet().message_nonce() == &nonce) - .ok_or_else(|| ActiveRequestsError::InvalidState) - .unwrap(); - let req = reqs.remove(index); - if reqs.len() > 0 { - self.active_requests_mapping - .insert(node_address.clone(), reqs); + match self.active_requests_mapping.entry(node_address.clone()) { + Entry::Vacant(_) => panic!("invalid ActiveRequests state"), + Entry::Occupied(mut requests) => { + let index = requests + .get() + .iter() + .position(|req| req.packet().message_nonce() == &nonce) + .expect("to find request call by nonce"); + Poll::Ready(Some(Ok((node_address, requests.get_mut().remove(index))))) + } } - Poll::Ready(Some(Ok((node_address, req)))) } Poll::Ready(Some(Err(err))) => 
Poll::Ready(Some(Err(err))), Poll::Ready(None) => Poll::Ready(None), diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 49c0d1f35..dc31db97d 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -169,9 +169,16 @@ enum HandlerReqId { External(RequestId), } -impl Into for &HandlerReqId { - fn into(self) -> RequestId { - match self { +/// A request queued for sending. +struct PendingRequest { + contact: NodeContact, + request_id: HandlerReqId, + request: RequestBody, +} + +impl From<&HandlerReqId> for RequestId { + fn from(id: &HandlerReqId) -> Self { + match id { HandlerReqId::Internal(id) => id.clone(), HandlerReqId::External(id) => id.clone(), } @@ -193,6 +200,8 @@ pub struct Handler { active_requests: ActiveRequests, /// The expected responses by SocketAddr which allows packets to pass the underlying filter. filter_expected_responses: Arc>>, + /// Requests awaiting a handshake completion. + pending_requests: HashMap>, /// Currently in-progress outbound handshakes (WHOAREYOU packets) with peers. active_challenges: HashMapDelay, /// Established sessions with peers. @@ -283,6 +292,7 @@ impl Handler { enr, key, active_requests: ActiveRequests::new(config.request_timeout), + pending_requests: HashMap::new(), filter_expected_responses, sessions: LruTimeCache::new( config.session_timeout, @@ -391,7 +401,7 @@ impl Handler { socket_addr: inbound_packet.src_address, node_id: src_id, }; - self.handle_message::

( + self.handle_message( node_address, message_nonce, &inbound_packet.message, @@ -463,6 +473,20 @@ impl Handler { return Err(RequestError::SelfRequest); } + // If there is already an active request or an active challenge (WHOAREYOU sent) for this node, add to pending requests + if self.active_challenges.get(&node_address).is_some() { + trace!("Request queued for node: {}", node_address); + self.pending_requests + .entry(node_address) + .or_insert_with(Vec::new) + .push(PendingRequest { + contact, + request_id, + request, + }); + return Ok(()); + } + let (packet, initiating_session) = { if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send @@ -477,14 +501,23 @@ impl Handler { .map_err(|e| RequestError::EncryptionFailed(format!("{e:?}")))?; (packet, false) } else { - // No session exists, start a new handshake + // No session exists, start a new handshake initiating a new session trace!( "Starting session. Sending random packet to: {}", node_address ); let packet = Packet::new_random(&self.node_id).map_err(RequestError::EntropyFailure)?; - // We are initiating a new session + // Queue the request for sending after the handshake completes + self.pending_requests + .entry(node_address.clone()) + .or_insert_with(Vec::new) + .push(PendingRequest { + contact: contact.clone(), + request_id: request_id.clone(), + request: request.clone(), + }); + (packet, true) } }; @@ -587,7 +620,7 @@ impl Handler { // Check that this challenge matches a known active request. // If this message passes all the requisite checks, a request call is returned. let mut request_call = match self.active_requests.remove_by_nonce(&request_nonce) { - Ok((node_address, request_call)) => { + Some((node_address, request_call)) => { // Verify that the src_addresses match if node_address.socket_addr != src_address { debug!("Received a WHOAREYOU packet for a message with a non-expected source. 
Source {}, expected_source: {} message_nonce {}", src_address, node_address.socket_addr, hex::encode(request_nonce)); @@ -597,7 +630,7 @@ impl Handler { } request_call } - Err(_) => { + None => { trace!("Received a WHOAREYOU packet that references an unknown or expired request. Source {}, message_nonce {}", src_address, hex::encode(request_nonce)); return; } @@ -797,7 +830,7 @@ impl Handler { warn!("Failed to inform of established session {}", e) } self.new_session(node_address.clone(), session); - self.handle_message::

( + self.handle_message( node_address.clone(), message_nonce, message, @@ -806,7 +839,7 @@ impl Handler { .await; // We could have pending messages that were awaiting this session to be // established. If so process them. - self.replay_active_requests::

(&node_address).await; + self.send_pending_requests::

(&node_address).await; } else { // IP's or NodeAddress don't match. Drop the session. warn!( @@ -876,24 +909,49 @@ impl Handler { } } + async fn send_pending_requests(&mut self, node_address: &NodeAddress) { + let pending_requests = self + .pending_requests + .remove(node_address) + .unwrap_or_default(); + for req in pending_requests { + if let Err(request_error) = self + .send_request::

(req.contact, req.request_id.clone(), req.request) + .await + { + warn!("Failed to send next pending request {request_error}"); + // Inform the service that the request failed + match req.request_id { + HandlerReqId::Internal(_) => { + // An internal request could not be sent. For now we do nothing about + // this. + } + HandlerReqId::External(id) => { + if let Err(e) = self + .service_send + .send(HandlerOut::RequestFailed(id, request_error)) + .await + { + warn!("Failed to inform that request failed {e}"); + } + } + } + } + } + } + async fn replay_active_requests(&mut self, node_address: &NodeAddress) { let active_requests = self .active_requests .remove_requests(node_address) .unwrap_or_default(); for req in active_requests { - let request_id = req.id().clone(); - if let Err(request_error) = self - .send_request::

( - req.contact().clone(), - request_id.clone(), - req.body().clone(), - ) - .await + let (req_id, contact, body) = req.into_request_parts(); + if let Err(request_error) = self.send_request::

(contact, req_id.clone(), body).await { warn!("Failed to send next awaiting request {request_error}"); // Inform the service that the request failed - match request_id { + match req_id { HandlerReqId::Internal(_) => { // An internal request could not be sent. For now we do nothing about // this. @@ -914,7 +972,7 @@ impl Handler { /// Handle a standard message that does not contain an authentication header. #[allow(clippy::single_match)] - async fn handle_message( + async fn handle_message( &mut self, node_address: NodeAddress, message_nonce: MessageNonce, @@ -1016,7 +1074,7 @@ impl Handler { } } // Handle standard responses - self.handle_response::

(node_address, response).await; + self.handle_response(node_address, response).await; } } } else { @@ -1040,13 +1098,9 @@ impl Handler { /// Handles a response to a request. Re-inserts the request call if the response is a multiple /// Nodes response. - async fn handle_response( - &mut self, - node_address: NodeAddress, - response: Response, - ) { + async fn handle_response(&mut self, node_address: NodeAddress, response: Response) { // Find a matching request, if any - if let Ok(mut request_call) = self + if let Some(mut request_call) = self .active_requests .remove_request(&node_address, &response.id) { @@ -1177,7 +1231,7 @@ impl Handler { .await; } - /// Removes a session, fails all of that session's active requests, and updates associated metrics and fields. + /// Removes a session, fails all of that session's active & pending requests, and updates associated metrics and fields. async fn fail_session( &mut self, node_address: &NodeAddress, @@ -1190,6 +1244,26 @@ impl Handler { .active_sessions .store(self.sessions.len(), Ordering::Relaxed); } + // fail all pending requests + if let Some(to_remove) = self.pending_requests.remove(node_address) { + for PendingRequest { request_id, .. } in to_remove { + match request_id { + HandlerReqId::Internal(_) => { + // Do not report failures on requests belonging to the handler. 
+ } + HandlerReqId::External(id) => { + if let Err(e) = self + .service_send + .send(HandlerOut::RequestFailed(id, error.clone())) + .await + { + warn!("Failed to inform request failure {}", e) + } + } + } + } + } + // fail all active requests for req in self .active_requests .remove_requests(node_address) diff --git a/src/handler/request_call.rs b/src/handler/request_call.rs index b91c7643e..e82e3ee1d 100644 --- a/src/handler/request_call.rs +++ b/src/handler/request_call.rs @@ -119,4 +119,9 @@ impl RequestCall { pub fn remaining_responses_mut(&mut self) -> &mut Option { &mut self.remaining_responses } + + /// Returns the id, contact, and request body for this call. + pub fn into_request_parts(self) -> (HandlerReqId, NodeContact, RequestBody) { + (self.request_id, self.contact, self.request) + } } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 3d64ccf1e..a3c1b26d8 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -70,6 +70,7 @@ async fn build_handler( enr: Arc::new(RwLock::new(enr)), key: Arc::new(RwLock::new(key)), active_requests: ActiveRequests::new(config.request_timeout), + pending_requests: HashMap::new(), filter_expected_responses, sessions: LruTimeCache::new(config.session_timeout, Some(config.session_cache_capacity)), one_time_sessions: LruTimeCache::new( @@ -392,7 +393,7 @@ async fn test_active_requests_remove_requests() { let reqs = active_requests.remove_requests(&req_2_addr).unwrap(); assert_eq!(reqs.len(), 2); active_requests.check_invariant(); - assert!(active_requests.remove_requests(&req_3_addr).is_err()); + assert!(active_requests.remove_requests(&req_3_addr).is_none()); } #[tokio::test] @@ -436,7 +437,7 @@ async fn test_active_requests_remove_request() { active_requests.check_invariant(); assert!(active_requests .remove_request(&req_3_addr, &req_3_id) - .is_err()); + .is_none()); } #[tokio::test] @@ -449,9 +450,9 @@ async fn test_active_requests_remove_by_nonce() { let (req_1, req_1_addr) = create_req_call(&node_1); 
let (req_2, req_2_addr) = create_req_call(&node_2); let (req_3, req_3_addr) = create_req_call(&node_2); - let req_1_nonce = req_1.packet().message_nonce().clone(); - let req_2_nonce = req_2.packet().message_nonce().clone(); - let req_3_nonce = req_3.packet().message_nonce().clone(); + let req_1_nonce = *req_1.packet().message_nonce(); + let req_2_nonce = *req_2.packet().message_nonce(); + let req_3_nonce = *req_3.packet().message_nonce(); active_requests.insert(req_1_addr.clone(), req_1); active_requests.insert(req_2_addr.clone(), req_2); @@ -468,7 +469,7 @@ async fn test_active_requests_remove_by_nonce() { assert_eq!(req.0, req_3_addr); active_requests.check_invariant(); let random_nonce = rand::random(); - assert!(active_requests.remove_by_nonce(&random_nonce).is_err()); + assert!(active_requests.remove_by_nonce(&random_nonce).is_none()); } #[tokio::test] From 3330a6b46155f0ca6c6bdb5e486203f207168cbc Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 27 Jul 2023 15:20:55 -0500 Subject: [PATCH 05/44] Return none rather than panicking for request mapping mismatches --- src/handler/active_requests.rs | 53 ++++++++++++++-------------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 000e5dabf..58d5c801b 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -21,7 +21,7 @@ impl ActiveRequests { } } - // Insert a new request into the active requests mapping. + /// Insert a new request into the active requests mapping. pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) { let nonce = *request_call.packet().message_nonce(); self.active_requests_mapping @@ -32,37 +32,29 @@ impl ActiveRequests { .insert(nonce, node_address); } - // Remove a single request identified by its nonce. + /// Remove a single request identified by its nonce. 
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { - let node_address = match self.active_requests_nonce_mapping.remove(nonce) { - Some(val) => val, - None => return None, - }; + let node_address = self.active_requests_nonce_mapping.remove(nonce)?; match self.active_requests_mapping.entry(node_address.clone()) { Entry::Vacant(_) => { debug_unreachable!("expected to find node address in active_requests_mapping"); error!("expected to find node address in active_requests_mapping"); None } - Entry::Occupied(mut requests) => { - let index = requests - .get() - .iter() - .position(|req| req.packet().message_nonce() == nonce) - .expect("to find request call by nonce"); - Some((node_address, requests.get_mut().remove(index))) - } + Entry::Occupied(mut requests) => requests + .get() + .iter() + .position(|req| req.packet().message_nonce() == nonce) + .map(|index| (node_address, requests.get_mut().remove(index))), } } - // Remove all requests associated with a node. + /// Remove all requests associated with a node. pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option> { - let requests = match self.active_requests_mapping.remove(node_address) { - Some(val) => val, - None => return None, - }; + let requests = self.active_requests_mapping.remove(node_address)?; // Account for node addresses in `active_requests_nonce_mapping` with an empty list if requests.is_empty() { + debug_unreachable!("expected to find requests in active_requests_mapping"); return None; } for req in &requests { @@ -78,7 +70,7 @@ impl ActiveRequests { Some(requests) } - // Remove a single request identified by its id. + /// Remove a single request identified by its id. 
pub fn remove_request( &mut self, node_address: &NodeAddress, @@ -90,13 +82,7 @@ impl ActiveRequests { let index = requests.get().iter().position(|req| { let req_id: RequestId = req.id().into(); &req_id == id - }); - let index = match index { - Some(index) => index, - // Node address existence in active requests mapping does not guarantee request - // id existence. - None => return None, - }; + })?; let request_call = requests.get_mut().remove(index); // Remove the associated nonce mapping. self.active_requests_nonce_mapping @@ -137,14 +123,19 @@ impl Stream for ActiveRequests { match self.active_requests_nonce_mapping.poll_next_unpin(cx) { Poll::Ready(Some(Ok((nonce, node_address)))) => { match self.active_requests_mapping.entry(node_address.clone()) { - Entry::Vacant(_) => panic!("invalid ActiveRequests state"), + Entry::Vacant(_) => Poll::Ready(None), Entry::Occupied(mut requests) => { - let index = requests + match requests .get() .iter() .position(|req| req.packet().message_nonce() == &nonce) - .expect("to find request call by nonce"); - Poll::Ready(Some(Ok((node_address, requests.get_mut().remove(index))))) + { + Some(index) => Poll::Ready(Some(Ok(( + node_address, + requests.get_mut().remove(index), + )))), + None => Poll::Ready(None), + } } } } From fe34db71e445161c5bfb8f26919bd2c71ad4c462 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 3 Aug 2023 14:02:58 -0500 Subject: [PATCH 06/44] Remove expected response upon failed session --- src/handler/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index dc31db97d..12c05b322 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1284,6 +1284,7 @@ impl Handler { } } } + self.remove_expected_response(node_address.socket_addr); } /// Sends a packet to the send handler to be encoded and sent. 
From e9e8b54989663565397555f9f312c34445f73f13 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 3 Aug 2023 14:24:32 -0500 Subject: [PATCH 07/44] Replay active requests if active session re-established --- src/handler/mod.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 12c05b322..328d9d75c 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -508,16 +508,6 @@ impl Handler { ); let packet = Packet::new_random(&self.node_id).map_err(RequestError::EntropyFailure)?; - // Queue the request for sending after the handshake completes - self.pending_requests - .entry(node_address.clone()) - .or_insert_with(Vec::new) - .push(PendingRequest { - contact: contact.clone(), - request_id: request_id.clone(), - request: request.clone(), - }); - (packet, true) } }; @@ -759,7 +749,7 @@ impl Handler { } } } - self.new_session(node_address, session); + self.new_session::

(node_address, session).await; } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also @@ -829,7 +819,7 @@ impl Handler { { warn!("Failed to inform of established session {}", e) } - self.new_session(node_address.clone(), session); + self.new_session::

(node_address.clone(), session).await; self.handle_message( node_address.clone(), message_nonce, @@ -1173,9 +1163,14 @@ impl Handler { self.active_requests.insert(node_address, request_call); } - fn new_session(&mut self, node_address: NodeAddress, session: Session) { + async fn new_session( + &mut self, + node_address: NodeAddress, + session: Session, + ) { if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); + self.replay_active_requests::

(&node_address).await; } else { self.sessions.insert(node_address, session); METRICS From c92a0cb2a6a65f60264af65f25bc1ccf23420f4a Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Fri, 4 Aug 2023 12:38:39 -0500 Subject: [PATCH 08/44] Relocate remove_expected_response --- src/handler/active_requests.rs | 8 +++++++- src/handler/mod.rs | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 58d5c801b..c3199fd71 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -76,9 +76,12 @@ impl ActiveRequests { node_address: &NodeAddress, id: &RequestId, ) -> Option { - match self.active_requests_mapping.entry(node_address.to_owned()) { + match self.active_requests_mapping.entry(node_address.clone()) { Entry::Vacant(_) => None, Entry::Occupied(mut requests) => { + if requests.get().is_empty() { + return None; + } let index = requests.get().iter().position(|req| { let req_id: RequestId = req.id().into(); &req_id == id @@ -125,6 +128,9 @@ impl Stream for ActiveRequests { match self.active_requests_mapping.entry(node_address.clone()) { Entry::Vacant(_) => Poll::Ready(None), Entry::Occupied(mut requests) => { + if requests.get().is_empty() { + return Poll::Ready(None); + } match requests .get() .iter() diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 328d9d75c..e6a2e85ba 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1170,6 +1170,8 @@ impl Handler { ) { if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); + // If a session is re-established, due to a new handshake during an ongoing + // session, we need to replay any active requests from the prior session. self.replay_active_requests::

(&node_address).await; } else { self.sessions.insert(node_address, session); @@ -1278,8 +1280,8 @@ impl Handler { } } } + self.remove_expected_response(node_address.socket_addr); } - self.remove_expected_response(node_address.socket_addr); } /// Sends a packet to the send handler to be encoded and sent. From 395558892ce6954b0973e5c9ff49322b6b7d3d97 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Fri, 4 Aug 2023 13:56:43 -0500 Subject: [PATCH 09/44] Remove empty lists from active requests mapping --- src/handler/active_requests.rs | 36 ++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index c3199fd71..ae38d4a41 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -41,11 +41,17 @@ impl ActiveRequests { error!("expected to find node address in active_requests_mapping"); None } - Entry::Occupied(mut requests) => requests - .get() - .iter() - .position(|req| req.packet().message_nonce() == nonce) - .map(|index| (node_address, requests.get_mut().remove(index))), + Entry::Occupied(mut requests) => { + let result = requests + .get() + .iter() + .position(|req| req.packet().message_nonce() == nonce) + .map(|index| (node_address, requests.get_mut().remove(index))); + if requests.get().is_empty() { + requests.remove(); + } + result + } } } @@ -79,14 +85,14 @@ impl ActiveRequests { match self.active_requests_mapping.entry(node_address.clone()) { Entry::Vacant(_) => None, Entry::Occupied(mut requests) => { - if requests.get().is_empty() { - return None; - } let index = requests.get().iter().position(|req| { let req_id: RequestId = req.id().into(); &req_id == id })?; let request_call = requests.get_mut().remove(index); + if requests.get().is_empty() { + requests.remove(); + } // Remove the associated nonce mapping. 
self.active_requests_nonce_mapping .remove(request_call.packet().message_nonce()); @@ -128,18 +134,18 @@ impl Stream for ActiveRequests { match self.active_requests_mapping.entry(node_address.clone()) { Entry::Vacant(_) => Poll::Ready(None), Entry::Occupied(mut requests) => { - if requests.get().is_empty() { - return Poll::Ready(None); - } match requests .get() .iter() .position(|req| req.packet().message_nonce() == &nonce) { - Some(index) => Poll::Ready(Some(Ok(( - node_address, - requests.get_mut().remove(index), - )))), + Some(index) => { + let result = (node_address, requests.get_mut().remove(index)); + if requests.get().is_empty() { + requests.remove(); + } + Poll::Ready(Some(Ok(result))) + } None => Poll::Ready(None), } } From 5ecbfb195a178b8d35acbc80b6b76ce3ae1fdd3d Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Wed, 16 Aug 2023 12:23:05 -0400 Subject: [PATCH 10/44] Send pending requests rather than replay active requests on outgoing challenge expiration --- src/handler/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index e6a2e85ba..61bad8753 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -344,9 +344,9 @@ impl Handler { self.handle_request_timeout(node_address, active_request).await; } Some(Ok((node_address, _challenge))) = self.active_challenges.next() => { - // A challenge has expired. There could be active requests impacted by this - // challenge. We replay them here - self.replay_active_requests::
<P>
(&node_address).await; + // A challenge has expired. There could be pending requests awaiting this + // challenge. We process them here + self.send_pending_requests::
<P>
(&node_address).await; } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout _ = &mut self.exit => { @@ -473,7 +473,7 @@ impl Handler { return Err(RequestError::SelfRequest); } - // If there is already an active request or an active challenge (WHOAREYOU sent) for this node, add to pending requests + // If there is already an active challenge (WHOAREYOU sent) for this node, add to pending requests if self.active_challenges.get(&node_address).is_some() { trace!("Request queued for node: {}", node_address); self.pending_requests From ee1332c57fd80e47d0fa5fb0fa7e49f199874eed Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Wed, 16 Aug 2023 13:14:53 -0400 Subject: [PATCH 11/44] Filter replaying newly established session handshake req --- src/handler/mod.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 61bad8753..61ce38a2e 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -749,7 +749,8 @@ impl Handler { } } } - self.new_session::
<P>
(node_address, session).await; + self.new_session::
<P>
(node_address, session, Some(request_nonce)) + .await; } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also @@ -819,7 +820,11 @@ impl Handler { { warn!("Failed to inform of established session {}", e) } - self.new_session::
<P>
(node_address.clone(), session).await; + // When (re-)establishing a session from an outgoing challenge, we do not need + // to filter out this request from active requests, so we do not pass + // the message nonce on to `new_session`. + self.new_session::
<P>
(node_address.clone(), session, None) + .await; self.handle_message( node_address.clone(), message_nonce, @@ -930,11 +935,19 @@ impl Handler { } } - async fn replay_active_requests(&mut self, node_address: &NodeAddress) { + async fn replay_active_requests( + &mut self, + node_address: &NodeAddress, + message_nonce: Option, + ) { let active_requests = self .active_requests .remove_requests(node_address) - .unwrap_or_default(); + .unwrap_or_default() + .into_iter() + // Skip the active request that was used to establish the new session, + // as it has already been handled and shouldn't be replayed. + .filter(|req| Some(*req.packet().message_nonce()) != message_nonce); for req in active_requests { let (req_id, contact, body) = req.into_request_parts(); if let Err(request_error) = self.send_request::
<P>
(contact, req_id.clone(), body).await @@ -1167,12 +1180,17 @@ impl Handler { &mut self, node_address: NodeAddress, session: Session, + // Optional message nonce is required to filter out the request that was used in the + // handshake to re-establish a session, if applicable. + message_nonce: Option, ) { if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); // If a session is re-established, due to a new handshake during an ongoing - // session, we need to replay any active requests from the prior session. - self.replay_active_requests::
<P>
(&node_address).await; + // session, we need to replay any active requests from the prior session, excluding + // the request that was used to re-establish the session handshake. + self.replay_active_requests::
<P>
(&node_address, message_nonce) + .await; } else { self.sessions.insert(node_address, session); METRICS From abf83c8aafa7daee7217a2c2d6559fa4e3eefb8c Mon Sep 17 00:00:00 2001 From: ackintosh Date: Wed, 23 Aug 2023 08:01:59 +0900 Subject: [PATCH 12/44] Add test for replay_active_requests --- src/handler/mod.rs | 15 ++- src/handler/tests.rs | 215 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 227 insertions(+), 3 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 61ce38a2e..72a46aa52 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -707,7 +707,7 @@ impl Handler { }; // We already know the ENR. Send the handshake response packet - trace!("Sending Authentication response to node: {}", node_address); + trace!("Sending Authentication response to node: {} ({:?})", node_address, request_call.id()); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); request_call.set_initiating_session(false); @@ -731,7 +731,7 @@ impl Handler { // Send the Auth response let contact = request_call.contact().clone(); - trace!("Sending Authentication response to node: {}", node_address); + trace!("Sending Authentication response to node: {} ({:?})", node_address, request_call.id()); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); // Reinsert the request_call @@ -940,6 +940,11 @@ impl Handler { node_address: &NodeAddress, message_nonce: Option, ) { + trace!( + "Replaying active requests. {}, {:?}", + node_address, + message_nonce + ); let active_requests = self .active_requests .remove_requests(node_address) @@ -950,6 +955,12 @@ impl Handler { .filter(|req| Some(*req.packet().message_nonce()) != message_nonce); for req in active_requests { let (req_id, contact, body) = req.into_request_parts(); + trace!( + "Active request to be replayed. {:?}, {}, {}", + req_id, + contact, + body + ); if let Err(request_error) = self.send_request::
<P>
(contact, req_id.clone(), body).await { warn!("Failed to send next awaiting request {request_error}"); diff --git a/src/handler/tests.rs b/src/handler/tests.rs index a3c1b26d8..6b9af22b6 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -7,7 +7,10 @@ use crate::{ rpc::{Request, Response}, ConfigBuilder, IpMode, }; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::{ + net::{Ipv4Addr, Ipv6Addr}, + ops::Add, +}; use crate::{ handler::{session::build_dummy_session, HandlerOut::RequestFailed}, @@ -593,3 +596,213 @@ async fn remove_one_time_session() { .is_some()); assert_eq!(0, handler.one_time_sessions.len()); } + +// Tests replaying active requests. +// +// In this test, Receiver's session expires and Receiver returns WHOAREYOU. +// Sender then creates a new session and resend active requests. +// +// ```mermaid +// sequenceDiagram +// participant Sender +// participant Receiver +// Note over Sender: Start discv5 server +// Note over Receiver: Start discv5 server +// +// Note over Sender,Receiver: Session established +// +// rect rgb(100, 100, 0) +// Note over Receiver: ** Session expired ** +// end +// +// rect rgb(10, 10, 10) +// Note left of Sender: Sender sends requests
**in parallel**. +// par +// Sender ->> Receiver: PING(id:2) +// and +// Sender -->> Receiver: PING(id:3) +// and +// Sender -->> Receiver: PING(id:4) +// and +// Sender -->> Receiver: PING(id:5) +// end +// end +// +// Note over Receiver: Send WHOAREYOU
since the session has been expired +// Receiver ->> Sender: WHOAREYOU +// +// rect rgb(100, 100, 0) +// Note over Receiver: Drop PING(id:2,3,4,5) request
since WHOAREYOU already sent. +// end +// +// Note over Sender: New session established with Receiver +// +// Sender ->> Receiver: Handshake message (id:2) +// +// Note over Receiver: New session established with Sender +// +// rect rgb(10, 10, 10) +// Note left of Sender: Handler::replay_active_requests() +// Sender ->> Receiver: PING (id:3) +// Sender ->> Receiver: PING (id:4) +// Sender ->> Receiver: PING (id:5) +// end +// +// Receiver ->> Sender: PONG (id:2) +// Receiver ->> Sender: PONG (id:3) +// Receiver ->> Sender: PONG (id:4) +// Receiver ->> Sender: PONG (id:5) +// ``` +#[tokio::test] +async fn test_replay_active_requests() { + init(); + let sender_port = 5004; + let receiver_port = 5005; + let ip = "127.0.0.1".parse().unwrap(); + let key1 = CombinedKey::generate_secp256k1(); + let key2 = CombinedKey::generate_secp256k1(); + + let sender_enr = EnrBuilder::new("v4") + .ip4(ip) + .udp4(sender_port) + .build(&key1) + .unwrap(); + + let receiver_enr = EnrBuilder::new("v4") + .ip4(ip) + .udp4(receiver_port) + .build(&key2) + .unwrap(); + + // Build sender handler + let (sender_exit, sender_send, mut sender_recv, mut handler) = { + let sender_listen_config = ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }; + let sender_config = ConfigBuilder::new(sender_listen_config).build(); + build_handler::(sender_enr.clone(), key1, sender_config).await + }; + let sender = async move { + // Start sender handler. + handler.start::().await; + // After the handler has been terminated test the handler's states. + assert!(handler.pending_requests.is_empty()); + assert_eq!(0, handler.active_requests.count().await); + assert!(handler.active_challenges.is_empty()); + assert!(handler.filter_expected_responses.read().is_empty()); + }; + + // Build receiver handler + // Shorten receiver's timeout to reproduce session expired. 
+ let receiver_session_timeout = Duration::from_secs(1); + let (receiver_exit, receiver_send, mut receiver_recv, mut handler) = { + let receiver_listen_config = ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }; + let receiver_config = ConfigBuilder::new(receiver_listen_config) + .session_timeout(receiver_session_timeout.clone()) + .build(); + build_handler::(receiver_enr.clone(), key2, receiver_config).await + }; + let receiver = async move { + // Start receiver handler. + handler.start::().await; + // After the handler has been terminated test the handler's states. + assert!(handler.pending_requests.is_empty()); + assert_eq!(0, handler.active_requests.count().await); + assert!(handler.active_challenges.is_empty()); + assert!(handler.filter_expected_responses.read().is_empty()); + }; + + // sender to send the first message then await for the session to be established + let _ = sender_send.send(HandlerIn::Request( + receiver_enr.clone().into(), + Box::new(Request { + id: RequestId(vec![1]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + + let messages_to_send = 5usize; + + let sender_ops = async move { + let mut response_count = 0usize; + loop { + match sender_recv.recv().await { + Some(HandlerOut::Established(_, _, _)) => { + // Sleep until receiver's session expired. + tokio::time::sleep(receiver_session_timeout.add(Duration::from_millis(500))) + .await; + // now the session is established, send the rest of the messages + for req_id in 2..=messages_to_send { + let _ = sender_send.send(HandlerIn::Request( + receiver_enr.clone().into(), + Box::new(Request { + id: RequestId(vec![req_id as u8]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + } + } + Some(HandlerOut::Response(_, _)) => { + response_count += 1; + if response_count == messages_to_send { + // Notify the handlers that the message exchange has been completed. 
+ sender_exit.send(()).unwrap(); + receiver_exit.send(()).unwrap(); + return; + } + } + _ => continue, + }; + } + }; + + let receiver_ops = async move { + let mut message_count = 0usize; + loop { + match receiver_recv.recv().await { + Some(HandlerOut::WhoAreYou(wru_ref)) => { + receiver_send + .send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))) + .unwrap(); + } + Some(HandlerOut::Request(addr, request)) => { + assert!(matches!(request.body, RequestBody::Ping { .. })); + let pong_response = Response { + id: request.id, + body: ResponseBody::Pong { + enr_seq: 1, + ip: ip.into(), + port: sender_port, + }, + }; + receiver_send + .send(HandlerIn::Response(addr, Box::new(pong_response))) + .unwrap(); + message_count += 1; + if message_count == messages_to_send { + return; + } + } + _ => { + continue; + } + } + } + }; + + let sleep_future = sleep(Duration::from_secs(5)); + let message_exchange = async move { + let _ = tokio::join!(sender, sender_ops, receiver, receiver_ops); + }; + + tokio::select! { + _ = message_exchange => {} + _ = sleep_future => { + panic!("Test timed out"); + } + } +} From 9c2851f2da76f2570375e779a51372481414f68f Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 14:47:56 +0900 Subject: [PATCH 13/44] Fix duplicated request on replay_active_requests() https://github.com/njgheorghita/discv5/pull/2#issuecomment-1692724323 --- src/handler/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 72a46aa52..b351e5ced 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -694,6 +694,7 @@ impl Handler { // All sent requests must have an associated node_id. Therefore the following // must not panic. let node_address = request_call.contact().node_address(); + let auth_message_nonce = auth_packet.header.message_nonce; match request_call.contact().enr() { Some(enr) => { // NOTE: Here we decide if the session is outgoing or ingoing. 
The condition for an @@ -749,7 +750,7 @@ impl Handler { } } } - self.new_session::
<P>
(node_address, session, Some(request_nonce)) + self.new_session::
<P>
(node_address, session, Some(auth_message_nonce)) .await; } From 0df2d70a6611ea64316bd21ce44b820864345209 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 15:30:37 +0900 Subject: [PATCH 14/44] Fix missing active request The requests filtered out in replay_active_requests() are removed from self.active_requests and don't come back. --- src/handler/active_requests.rs | 33 +++++++++++++++++++++++++++++++++ src/handler/mod.rs | 15 +++++++-------- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index ae38d4a41..5b8f72958 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -76,6 +76,39 @@ impl ActiveRequests { Some(requests) } + /// Remove requests associated with a node, except for the request that has the given message nonce. + pub fn remove_requests_except( + &mut self, + node_address: &NodeAddress, + except: &MessageNonce, + ) -> Option> { + let request_ids = self + .active_requests_mapping + .get(node_address)? + .iter() + .filter(|req| req.packet().message_nonce() != except) + .map(|req| { + match req.id() { + HandlerReqId::Internal(id) => id, + HandlerReqId::External(id) => id, + } + .clone() + }) + .collect::>(); + + let mut requests = vec![]; + for id in request_ids.iter() { + match self.remove_request(node_address, id) { + Some(request_call) => requests.push(request_call), + None => { + debug_unreachable!("expected to find request with id"); + error!("expected to find request with id"); + } + } + } + Some(requests) + } + /// Remove a single request identified by its id. 
pub fn remove_request( &mut self, diff --git a/src/handler/mod.rs b/src/handler/mod.rs index b351e5ced..d56a74f18 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -946,14 +946,13 @@ impl Handler { node_address, message_nonce ); - let active_requests = self - .active_requests - .remove_requests(node_address) - .unwrap_or_default() - .into_iter() - // Skip the active request that was used to establish the new session, - // as it has already been handled and shouldn't be replayed. - .filter(|req| Some(*req.packet().message_nonce()) != message_nonce); + let active_requests = if let Some(nonce) = message_nonce { + self.active_requests + .remove_requests_except(node_address, &nonce) + } else { + self.active_requests.remove_requests(node_address) + } + .unwrap_or_default(); for req in active_requests { let (req_id, contact, body) = req.into_request_parts(); trace!( From e761d9e1788ecefa203d1da73f1b892e1db4ad95 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 15:50:38 +0900 Subject: [PATCH 15/44] Fix wrong expected_response after replaying --- src/handler/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index d56a74f18..0ceae367e 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -961,6 +961,9 @@ impl Handler { contact, body ); + // Remove the request from the packet filter here since the request is added in + // `self.send_request()` again. + self.remove_expected_response(contact.socket_addr()); if let Err(request_error) = self.send_request::
<P>
(contact, req_id.clone(), body).await { warn!("Failed to send next awaiting request {request_error}"); From b2051c175be96b05b84bdaec56dcf5b5fad88f71 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 21:55:23 +0900 Subject: [PATCH 16/44] Fix replaying test Sender should receive `HandlerOut::Established` message outside the loop because established messages happen twice due to session expiration. --- src/handler/tests.rs | 61 ++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 6b9af22b6..af84d5792 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -8,6 +8,7 @@ use crate::{ ConfigBuilder, IpMode, }; use std::{ + collections::HashSet, net::{Ipv4Addr, Ipv6Addr}, ops::Add, }; @@ -716,40 +717,50 @@ async fn test_replay_active_requests() { assert!(handler.filter_expected_responses.read().is_empty()); }; - // sender to send the first message then await for the session to be established - let _ = sender_send.send(HandlerIn::Request( - receiver_enr.clone().into(), - Box::new(Request { - id: RequestId(vec![1]), - body: RequestBody::Ping { enr_seq: 1 }, - }), - )); - let messages_to_send = 5usize; let sender_ops = async move { let mut response_count = 0usize; + let mut expected_request_ids = HashSet::new(); + expected_request_ids.insert(RequestId(vec![1])); + + // sender to send the first message then await for the session to be established + let _ = sender_send.send(HandlerIn::Request( + receiver_enr.clone().into(), + Box::new(Request { + id: RequestId(vec![1]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + + match sender_recv.recv().await { + Some(HandlerOut::Established(_, _, _)) => { + // Sleep until receiver's session expired. 
+ tokio::time::sleep(receiver_session_timeout.add(Duration::from_millis(500))).await; + // send the rest of the messages + for req_id in 2..=messages_to_send { + let request_id = RequestId(vec![req_id as u8]); + expected_request_ids.insert(request_id.clone()); + let _ = sender_send.send(HandlerIn::Request( + receiver_enr.clone().into(), + Box::new(Request { + id: request_id, + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + } + } + handler_out => panic!("Unexpected message: {:?}", handler_out), + } + loop { match sender_recv.recv().await { - Some(HandlerOut::Established(_, _, _)) => { - // Sleep until receiver's session expired. - tokio::time::sleep(receiver_session_timeout.add(Duration::from_millis(500))) - .await; - // now the session is established, send the rest of the messages - for req_id in 2..=messages_to_send { - let _ = sender_send.send(HandlerIn::Request( - receiver_enr.clone().into(), - Box::new(Request { - id: RequestId(vec![req_id as u8]), - body: RequestBody::Ping { enr_seq: 1 }, - }), - )); - } - } - Some(HandlerOut::Response(_, _)) => { + Some(HandlerOut::Response(_, response)) => { + assert!(expected_request_ids.remove(&response.id)); response_count += 1; if response_count == messages_to_send { // Notify the handlers that the message exchange has been completed. + assert!(expected_request_ids.is_empty()); sender_exit.send(()).unwrap(); receiver_exit.send(()).unwrap(); return; From ee3ae518b6495bf0f6851aeb4a54737f3195f085 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 21:58:50 +0900 Subject: [PATCH 17/44] cargo fmt --- src/handler/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 0ceae367e..bee5ccf53 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -708,7 +708,11 @@ impl Handler { }; // We already know the ENR. 
Send the handshake response packet - trace!("Sending Authentication response to node: {} ({:?})", node_address, request_call.id()); + trace!( + "Sending Authentication response to node: {} ({:?})", + node_address, + request_call.id() + ); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); request_call.set_initiating_session(false); @@ -732,7 +736,11 @@ impl Handler { // Send the Auth response let contact = request_call.contact().clone(); - trace!("Sending Authentication response to node: {} ({:?})", node_address, request_call.id()); + trace!( + "Sending Authentication response to node: {} ({:?})", + node_address, + request_call.id() + ); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); // Reinsert the request_call From cd7170393c3acaf469d9e05292e9e1869ec9b50d Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 22:09:16 +0900 Subject: [PATCH 18/44] Add comment --- src/handler/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index bee5ccf53..87c393ee7 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -955,6 +955,8 @@ impl Handler { message_nonce ); let active_requests = if let Some(nonce) = message_nonce { + // Except the active request that was used to establish the new session, as it has + // already been handled and shouldn't be replayed. 
self.active_requests .remove_requests_except(node_address, &nonce) } else { From 54a344a8470f9c7afb9b8853bc0270bd5aedaf06 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 22:24:59 +0900 Subject: [PATCH 19/44] Add test for ActiveRequests::remove_requests_except() --- src/handler/tests.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index af84d5792..fcbddb620 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -400,6 +400,34 @@ async fn test_active_requests_remove_requests() { assert!(active_requests.remove_requests(&req_3_addr).is_none()); } +#[tokio::test] +async fn test_active_requests_remove_requests_except() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); + + let req_2_nonce = req_2.packet().header.message_nonce.clone(); + let req_3_id: RequestId = req_3.id().into(); + + active_requests.insert(req_1_addr, req_1); + active_requests.insert(req_2_addr.clone(), req_2); + active_requests.insert(req_3_addr, req_3); + + let removed_requests = active_requests + .remove_requests_except(&req_2_addr, &req_2_nonce) + .unwrap(); + active_requests.check_invariant(); + + assert_eq!(1, removed_requests.len()); + let removed_request_id: RequestId = removed_requests.first().unwrap().id().into(); + assert_eq!(removed_request_id, req_3_id); +} + #[tokio::test] async fn test_active_requests_remove_request() { const EXPIRY: Duration = Duration::from_secs(5); From d69576db28fd9d71cc65487c55a343f983d07d61 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 26 Aug 2023 22:29:50 +0900 Subject: [PATCH 20/44] Refactor ActiveRequests::remove_requests_except() --- src/handler/active_requests.rs | 8 +------- 
1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 5b8f72958..d4e3124f8 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -87,13 +87,7 @@ impl ActiveRequests { .get(node_address)? .iter() .filter(|req| req.packet().message_nonce() != except) - .map(|req| { - match req.id() { - HandlerReqId::Internal(id) => id, - HandlerReqId::External(id) => id, - } - .clone() - }) + .map(|req| req.id().into()) .collect::>(); let mut requests = vec![]; From cb52be94a18a13e918afde1def5d104d6c3e1dbf Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Sat, 2 Sep 2023 17:17:34 -0400 Subject: [PATCH 21/44] Fix linting errors --- src/handler/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index fcbddb620..1b9c318ba 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -411,7 +411,7 @@ async fn test_active_requests_remove_requests_except() { let (req_2, req_2_addr) = create_req_call(&node_2); let (req_3, req_3_addr) = create_req_call(&node_2); - let req_2_nonce = req_2.packet().header.message_nonce.clone(); + let req_2_nonce = req_2.packet().header.message_nonce; let req_3_id: RequestId = req_3.id().into(); active_requests.insert(req_1_addr, req_1); @@ -731,7 +731,7 @@ async fn test_replay_active_requests() { port: receiver_enr.udp4().unwrap(), }; let receiver_config = ConfigBuilder::new(receiver_listen_config) - .session_timeout(receiver_session_timeout.clone()) + .session_timeout(receiver_session_timeout) .build(); build_handler::(receiver_enr.clone(), key2, receiver_config).await }; From 399288c1cf6ab80805e7336b941afb27ea50d153 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 5 Sep 2023 06:55:35 +0900 Subject: [PATCH 22/44] Fix test: avoid ports used by other tests https://github.com/sigp/discv5/actions/runs/6060572703/job/16457875298?pr=200 --- src/handler/tests.rs | 4 ++-- 1 file changed, 
2 insertions(+), 2 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 1b9c318ba..18cc856ae 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -685,8 +685,8 @@ async fn remove_one_time_session() { #[tokio::test] async fn test_replay_active_requests() { init(); - let sender_port = 5004; - let receiver_port = 5005; + let sender_port = 5006; + let receiver_port = 5007; let ip = "127.0.0.1".parse().unwrap(); let key1 = CombinedKey::generate_secp256k1(); let key2 = CombinedKey::generate_secp256k1(); From 6ef70f2ac599a3cdb29a45317c821dd94bfcd8f8 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 9 Sep 2023 17:20:23 +0900 Subject: [PATCH 23/44] Add a test for sending pending requests --- src/handler/tests.rs | 194 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 18cc856ae..00e16db51 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -845,3 +845,197 @@ async fn test_replay_active_requests() { } } } + +// Tests sending pending requests. +// +// Sender attempts to send multiple requests in parallel, but due to the absence of a session, only +// one of the requests from Sender is sent and others are inserted into `pending_requests`. +// The pending requests are sent once a session is established. +// +// ```mermaid +// sequenceDiagram +// participant Sender +// participant Receiver +// +// Note over Sender: No session with Receiver +// +// rect rgb(10, 10, 10) +// Note left of Sender: Sender attempts to send multiple requests in parallel
but no session with Receiver.
So Sender sends a random packet for the first request,
and the rest of the requests are inserted into pending_requests. +// par +// Sender ->> Receiver: Random packet (id:1) +// Note over Sender: Insert the request into `active_requests` +// and +// Note over Sender: Insert Request(id:2) into *pending_requests* +// and +// Note over Sender: Insert Request(id:3) into *pending_requests* +// end +// end +// +// Receiver ->> Sender: WHOAREYOU (id:1) +// +// Note over Sender: New session established with Receiver +// Sender ->> Receiver: Handshake message (id:1) +// +// +// Note over Receiver: New session established with Sender +// +// rect rgb(0, 100, 0) +// Note over Sender: Send pending requests since a session has been established. +// Sender ->> Receiver: Request (id:2) +// Sender ->> Receiver: Request (id:3) +// end +// +// Receiver ->> Sender: Response (id:1) +// Receiver ->> Sender: Response (id:2) +// Receiver ->> Sender: Response (id:3) +// +// Note over Sender: The request (id:1) completed. +// Note over Sender: The request (id:2) completed. +// Note over Sender: The request (id:3) completed. +// ``` +#[tokio::test] +async fn test_send_pending_request() { + init(); + let sender_port = 5008; + let receiver_port = 5009; + let ip = "127.0.0.1".parse().unwrap(); + let key1 = CombinedKey::generate_secp256k1(); + let key2 = CombinedKey::generate_secp256k1(); + + let sender_enr = EnrBuilder::new("v4") + .ip4(ip) + .udp4(sender_port) + .build(&key1) + .unwrap(); + + let receiver_enr = EnrBuilder::new("v4") + .ip4(ip) + .udp4(receiver_port) + .build(&key2) + .unwrap(); + + // Build sender handler + let (sender_exit, sender_send, mut sender_recv, mut handler) = { + let sender_listen_config = ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }; + let sender_config = ConfigBuilder::new(sender_listen_config).build(); + build_handler::(sender_enr.clone(), key1, sender_config).await + }; + let sender = async move { + // Start sender handler. 
+ handler.start::().await; + // After the handler has been terminated test the handler's states. + assert!(handler.pending_requests.is_empty()); + assert_eq!(0, handler.active_requests.count().await); + assert!(handler.active_challenges.is_empty()); + assert!(handler.filter_expected_responses.read().is_empty()); + }; + + // Build receiver handler + // Shorten receiver's timeout to reproduce session expired. + let receiver_session_timeout = Duration::from_secs(1); + let (receiver_exit, receiver_send, mut receiver_recv, mut handler) = { + let receiver_listen_config = ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }; + let receiver_config = ConfigBuilder::new(receiver_listen_config) + .session_timeout(receiver_session_timeout) + .build(); + build_handler::(receiver_enr.clone(), key2, receiver_config).await + }; + let receiver = async move { + // Start receiver handler. + handler.start::().await; + // After the handler has been terminated test the handler's states. 
+ assert!(handler.pending_requests.is_empty()); + assert_eq!(0, handler.active_requests.count().await); + assert!(handler.active_challenges.is_empty()); + assert!(handler.filter_expected_responses.read().is_empty()); + }; + + let messages_to_send = 3usize; + + let sender_ops = async move { + let mut response_count = 0usize; + let mut expected_request_ids = HashSet::new(); + + // send requests + for req_id in 1..=messages_to_send { + let request_id = RequestId(vec![req_id as u8]); + expected_request_ids.insert(request_id.clone()); + let _ = sender_send.send(HandlerIn::Request( + receiver_enr.clone().into(), + Box::new(Request { + id: request_id, + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + } + + loop { + match sender_recv.recv().await { + Some(HandlerOut::Response(_, response)) => { + assert!(expected_request_ids.remove(&response.id)); + response_count += 1; + if response_count == messages_to_send { + // Notify the handlers that the message exchange has been completed. + assert!(expected_request_ids.is_empty()); + sender_exit.send(()).unwrap(); + receiver_exit.send(()).unwrap(); + return; + } + } + _ => continue, + }; + } + }; + + let receiver_ops = async move { + let mut message_count = 0usize; + loop { + match receiver_recv.recv().await { + Some(HandlerOut::WhoAreYou(wru_ref)) => { + receiver_send + .send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))) + .unwrap(); + } + Some(HandlerOut::Request(addr, request)) => { + assert!(matches!(request.body, RequestBody::Ping { .. 
})); + let pong_response = Response { + id: request.id, + body: ResponseBody::Pong { + enr_seq: 1, + ip: ip.into(), + port: sender_port, + }, + }; + receiver_send + .send(HandlerIn::Response(addr, Box::new(pong_response))) + .unwrap(); + message_count += 1; + if message_count == messages_to_send { + return; + } + } + _ => { + continue; + } + } + } + }; + + let sleep_future = sleep(Duration::from_secs(5)); + let message_exchange = async move { + let _ = tokio::join!(sender, sender_ops, receiver, receiver_ops); + }; + + tokio::select! { + _ = message_exchange => {} + _ = sleep_future => { + panic!("Test timed out"); + } + } +} From 1bc3ceaefb9a3c74cd41e4811a807c9bb54579b9 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 10 Sep 2023 07:10:50 +0900 Subject: [PATCH 24/44] Fix the timeout issue https://github.com/sigp/discv5/pull/200#issuecomment-1710867435 https://github.com/sigp/discv5/pull/200#issuecomment-1712445478 --- src/handler/active_requests.rs | 4 ++++ src/handler/mod.rs | 27 ++++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index d4e3124f8..65641452e 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -32,6 +32,10 @@ impl ActiveRequests { .insert(nonce, node_address); } + pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec> { + self.active_requests_mapping.get(node_address) + } + /// Remove a single request identified by its nonce. 
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> { let node_address = self.active_requests_nonce_mapping.remove(nonce)?; diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 87c393ee7..60f74a7af 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -473,8 +473,11 @@ impl Handler { return Err(RequestError::SelfRequest); } - // If there is already an active challenge (WHOAREYOU sent) for this node, add to pending requests - if self.active_challenges.get(&node_address).is_some() { + // If there is already an active challenge (WHOAREYOU sent) for this node, or if we are + // awaiting a session with this node to be established, add the request to pending requests. + if self.active_challenges.get(&node_address).is_some() + || self.is_awaiting_session_to_be_established(&node_address) + { trace!("Request queued for node: {}", node_address); self.pending_requests .entry(node_address) @@ -758,8 +761,11 @@ impl Handler { } } } - self.new_session::
<P>
(node_address, session, Some(auth_message_nonce)) + self.new_session::
<P>
(node_address.clone(), session, Some(auth_message_nonce)) .await; + // We could have pending messages that were awaiting this session to be + // established. If so process them. + self.send_pending_requests::
<P>
(&node_address).await; } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also @@ -1348,4 +1354,19 @@ impl Handler { .ban_nodes .retain(|_, time| time.is_none() || Some(Instant::now()) < *time); } + + /// Returns whether a session with this node does not exist and a request that initiates + /// a session has been sent. + fn is_awaiting_session_to_be_established(&mut self, node_address: &NodeAddress) -> bool { + if self.sessions.get(node_address).is_some() { + // session exists + return false; + } + + if let Some(requests) = self.active_requests.get(node_address) { + requests.iter().any(|req| req.initiating_session()) + } else { + false + } + } } From 1dca750f963653ef272ef2924fbb11fb0d829c95 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 10 Sep 2023 16:46:20 +0900 Subject: [PATCH 25/44] Add a log entry --- src/handler/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 60f74a7af..daf25765d 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -925,6 +925,11 @@ impl Handler { .remove(node_address) .unwrap_or_default(); for req in pending_requests { + trace!( + "Sending pending request {} to {node_address}. {}", + RequestId::from(&req.request_id), + req.request, + ); if let Err(request_error) = self .send_request::
<P>
(req.contact, req.request_id.clone(), req.request) .await From b81c4dc8cbc24f0511d9de1c6d13d34546d43bd1 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 10 Sep 2023 16:57:30 +0900 Subject: [PATCH 26/44] Tweak log format to be readable --- src/handler/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index daf25765d..acee549dd 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -977,10 +977,8 @@ impl Handler { for req in active_requests { let (req_id, contact, body) = req.into_request_parts(); trace!( - "Active request to be replayed. {:?}, {}, {}", - req_id, - contact, - body + "Active request to be replayed. {}, {contact}, {body}", + RequestId::from(&req_id), ); // Remove the request from the packet filter here since the request is added in // `self.send_request()` again. From 7b169b9c9a2bd94844b05c2404933bfb2c3851d7 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 12 Sep 2023 07:41:47 +0900 Subject: [PATCH 27/44] Move send_pending_requests into new_session --- src/handler/mod.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index acee549dd..8baf523f1 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -763,9 +763,6 @@ impl Handler { } self.new_session::
<P>
(node_address.clone(), session, Some(auth_message_nonce)) .await; - // We could have pending messages that were awaiting this session to be - // established. If so process them. - self.send_pending_requests::
<P>
(&node_address).await; } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also @@ -847,9 +844,6 @@ impl Handler { authenticated_data, ) .await; - // We could have pending messages that were awaiting this session to be - // established. If so process them. - self.send_pending_requests::
<P>
(&node_address).await; } else { // IP's or NodeAddress don't match. Drop the session. warn!( @@ -1225,10 +1219,13 @@ impl Handler { self.replay_active_requests::
<P>
(&node_address, message_nonce) .await; } else { - self.sessions.insert(node_address, session); + self.sessions.insert(node_address.clone(), session); METRICS .active_sessions .store(self.sessions.len(), Ordering::Relaxed); + // We could have pending messages that were awaiting this session to be + // established. If so process them. + self.send_pending_requests::
<P>
(&node_address).await; } } From b525edd5889fea3e81dc71c6f8b8f9eddbad9c85 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 12 Sep 2023 09:03:14 +0900 Subject: [PATCH 28/44] Update mermaid diagram --- src/handler/tests.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 00e16db51..5211e3049 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -874,10 +874,6 @@ async fn test_replay_active_requests() { // Receiver ->> Sender: WHOAREYOU (id:1) // // Note over Sender: New session established with Receiver -// Sender ->> Receiver: Handshake message (id:1) -// -// -// Note over Receiver: New session established with Sender // // rect rgb(0, 100, 0) // Note over Sender: Send pending requests since a session has been established. @@ -885,13 +881,17 @@ async fn test_replay_active_requests() { // Sender ->> Receiver: Request (id:3) // end // -// Receiver ->> Sender: Response (id:1) +// Sender ->> Receiver: Handshake message (id:1) +// +// Note over Receiver: New session established with Sender +// // Receiver ->> Sender: Response (id:2) // Receiver ->> Sender: Response (id:3) +// Receiver ->> Sender: Response (id:1) // -// Note over Sender: The request (id:1) completed. // Note over Sender: The request (id:2) completed. // Note over Sender: The request (id:3) completed. +// Note over Sender: The request (id:1) completed. // ``` #[tokio::test] async fn test_send_pending_request() { From a94fd26d800b103817a2742326649ddf7b9ffa1b Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 24 Sep 2023 07:23:40 +0900 Subject: [PATCH 29/44] Add test --- src/discv5.rs | 2 +- src/discv5/test.rs | 2 +- src/service/test.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 2 deletions(-) diff --git a/src/discv5.rs b/src/discv5.rs index bba43096d..b2a215bb4 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -49,7 +49,7 @@ lazy_static! 
{ RwLock::new(crate::PermitBanList::default()); } -mod test; +pub(crate) mod test; /// Events that can be produced by the `Discv5` event stream. #[derive(Debug)] diff --git a/src/discv5/test.rs b/src/discv5/test.rs index 07d3a9c47..788db35ed 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -124,7 +124,7 @@ async fn build_nodes_from_keypairs_dual_stack( } /// Generate `n` deterministic keypairs from a given seed. -fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec { +pub(crate) fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec { let mut keypairs = Vec::new(); for i in 0..n { let sk = { diff --git a/src/service/test.rs b/src/service/test.rs index b6cdd47ec..10db6f860 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -3,6 +3,7 @@ use super::*; use crate::{ + discv5::test::generate_deterministic_keypair, handler::Handler, kbucket, kbucket::{BucketInsertResult, KBucketsTable, NodeStatus}, @@ -221,3 +222,122 @@ async fn test_connection_direction_on_inject_session_established() { assert!(status.is_connected()); assert_eq!(ConnectionDirection::Outgoing, status.direction); } + +#[tokio::test] +async fn test_handling_concurrent_responses() { + init(); + + // Seed is chosen such that all nodes are in the 256th distance of the first node. 
+ let seed = 1652; + let mut keypairs = generate_deterministic_keypair(5, seed); + + let mut service = { + let enr_key = keypairs.pop().unwrap(); + let enr = EnrBuilder::new("v4") + .ip4("127.0.0.1".parse().unwrap()) + .udp4(10001) + .build(&enr_key) + .unwrap(); + build_service::( + Arc::new(RwLock::new(enr)), + Arc::new(RwLock::new(enr_key)), + false, + ) + .await + }; + + let node_contact: NodeContact = EnrBuilder::new("v4") + .ip4("127.0.0.1".parse().unwrap()) + .udp4(10002) + .build(&keypairs.remove(0)) + .unwrap() + .into(); + let node_address = node_contact.node_address(); + + // Add fake requests + // Request1 + service.active_requests.insert( + RequestId(vec![1]), + ActiveRequest { + contact: node_contact.clone(), + request_body: RequestBody::FindNode { + distances: vec![254, 255, 256], + }, + query_id: Some(QueryId(1)), + callback: None, + }, + ); + // Request2 + service.active_requests.insert( + RequestId(vec![2]), + ActiveRequest { + contact: node_contact, + request_body: RequestBody::FindNode { + distances: vec![254, 255, 256], + }, + query_id: Some(QueryId(2)), + callback: None, + }, + ); + + assert_eq!(3, keypairs.len()); + let mut enrs_for_response = keypairs + .iter() + .enumerate() + .map(|(i, key)| { + EnrBuilder::new("v4") + .ip4("127.0.0.1".parse().unwrap()) + .udp4(10003 + i as u16) + .build(key) + .unwrap() + }) + .collect::>(); + + // Response to `Request1` is sent as two separate messages in total. Handle the first one of the + // messages here. + service.handle_rpc_response( + node_address.clone(), + Response { + id: RequestId(vec![1]), + body: ResponseBody::Nodes { + total: 2, + nodes: vec![enrs_for_response.pop().unwrap()], + }, + }, + ); + // Service has still two active requests since we are waiting for the second NODE response to + // `Request1`. + assert_eq!(2, service.active_requests.len()); + // Service stores the first response to `Request1` into `active_nodes_responses`. 
+ assert!(!service.active_nodes_responses.is_empty()); + + // Second, handle a response to *`Request2`* before the second response to `Request1`. + service.handle_rpc_response( + node_address.clone(), + Response { + id: RequestId(vec![2]), + body: ResponseBody::Nodes { + total: 1, + nodes: vec![enrs_for_response.pop().unwrap()], + }, + }, + ); + // `Request2` is completed so now the number of active requests should be one. + assert_eq!(1, service.active_requests.len()); + // Service still keeps the first response in `active_nodes_responses`. + assert!(!service.active_nodes_responses.is_empty()); + + // Finally, handle the second response to `Request1`. + service.handle_rpc_response( + node_address.clone(), + Response { + id: RequestId(vec![1]), + body: ResponseBody::Nodes { + total: 2, + nodes: vec![enrs_for_response.pop().unwrap()], + }, + }, + ); + assert!(service.active_requests.is_empty()); + assert!(service.active_nodes_responses.is_empty()); +} From 2db8534081188ebf8a92ee5d0f4186468b6d9ba0 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 24 Sep 2023 07:40:26 +0900 Subject: [PATCH 30/44] Change key type of active_nodes_responses to handle concurrent responses --- src/service.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/service.rs b/src/service.rs index c20e55d36..a6f89c088 100644 --- a/src/service.rs +++ b/src/service.rs @@ -187,7 +187,7 @@ pub struct Service { active_requests: FnvHashMap, /// Keeps track of the number of responses received from a NODES response. - active_nodes_responses: HashMap, + active_nodes_responses: HashMap, /// A map of votes nodes have made about our external IP address. We accept the majority. 
ip_votes: Option, @@ -730,7 +730,7 @@ impl Service { if total > 1 { let mut current_response = self .active_nodes_responses - .remove(&node_id) + .remove(&id) .unwrap_or_default(); debug!( @@ -749,7 +749,7 @@ impl Service { current_response.received_nodes.append(&mut nodes); self.active_nodes_responses - .insert(node_id, current_response); + .insert(id.clone(), current_response); self.active_requests.insert(id, active_request); return; } @@ -771,7 +771,7 @@ impl Service { // in a later response sends a response with a total of 1, all previous nodes // will be ignored. // ensure any mapping is removed in this rare case - self.active_nodes_responses.remove(&node_id); + self.active_nodes_responses.remove(&id); self.discovered(&node_id, nodes, active_request.query_id); } @@ -1443,7 +1443,7 @@ impl Service { // if a failed FindNodes request, ensure we haven't partially received packets. If // so, process the partially found nodes RequestBody::FindNode { .. } => { - if let Some(nodes_response) = self.active_nodes_responses.remove(&node_id) { + if let Some(nodes_response) = self.active_nodes_responses.remove(&id) { if !nodes_response.received_nodes.is_empty() { warn!( "NODES Response failed, but was partially processed from: {}", From 18ba66fd6a61e49572ff6bba67bfa9862476885c Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 24 Sep 2023 07:53:48 +0900 Subject: [PATCH 31/44] cargo fmt --- src/service.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/service.rs b/src/service.rs index a6f89c088..f19f4b79f 100644 --- a/src/service.rs +++ b/src/service.rs @@ -728,10 +728,8 @@ impl Service { // handle the case that there is more than one response if total > 1 { - let mut current_response = self - .active_nodes_responses - .remove(&id) - .unwrap_or_default(); + let mut current_response = + self.active_nodes_responses.remove(&id).unwrap_or_default(); debug!( "Nodes Response: {} of {} received", From c053ac25e714342cd275c05fef256b872bb72448 Mon Sep 17 
00:00:00 2001 From: ackintosh Date: Sun, 24 Sep 2023 07:54:55 +0900 Subject: [PATCH 32/44] Fix redundant clone --- src/service/test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/test.rs b/src/service/test.rs index 10db6f860..230c0e438 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -329,7 +329,7 @@ async fn test_handling_concurrent_responses() { // Finally, handle the second response to `Request1`. service.handle_rpc_response( - node_address.clone(), + node_address, Response { id: RequestId(vec![1]), body: ResponseBody::Nodes { From b153b14e0cfbd8b80035631c419beb718cc627a8 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 24 Sep 2023 13:15:40 +0900 Subject: [PATCH 33/44] Tweak udp4 port numbers to avoid `Address already in use` error --- src/service/test.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/service/test.rs b/src/service/test.rs index 230c0e438..0515196a0 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -235,7 +235,7 @@ async fn test_handling_concurrent_responses() { let enr_key = keypairs.pop().unwrap(); let enr = EnrBuilder::new("v4") .ip4("127.0.0.1".parse().unwrap()) - .udp4(10001) + .udp4(10005) .build(&enr_key) .unwrap(); build_service::( @@ -248,7 +248,7 @@ async fn test_handling_concurrent_responses() { let node_contact: NodeContact = EnrBuilder::new("v4") .ip4("127.0.0.1".parse().unwrap()) - .udp4(10002) + .udp4(10006) .build(&keypairs.remove(0)) .unwrap() .into(); @@ -287,7 +287,7 @@ async fn test_handling_concurrent_responses() { .map(|(i, key)| { EnrBuilder::new("v4") .ip4("127.0.0.1".parse().unwrap()) - .udp4(10003 + i as u16) + .udp4(10007 + i as u16) .build(key) .unwrap() }) From 4fdb78013ab247373d1666b65024e4312fe96200 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Wed, 27 Sep 2023 12:15:59 -0400 Subject: [PATCH 34/44] Add docstring comments to new handler methods --- src/handler/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git 
a/src/handler/mod.rs b/src/handler/mod.rs index 8baf523f1..4fe1cde2a 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -913,6 +913,8 @@ impl Handler { } } + /// Send all pending requests corresponding to the given node address, that were waiting for a + /// new session to be established or when an active outgoing challenge has expired. async fn send_pending_requests(&mut self, node_address: &NodeAddress) { let pending_requests = self .pending_requests @@ -949,9 +951,13 @@ impl Handler { } } + /// Replays all active requests for the given node address, in the case that a new session has + /// been established. If an optional message nonce is provided, the corresponding request will + /// be skipped, eg. the request that established the new session. async fn replay_active_requests( &mut self, node_address: &NodeAddress, + // Optional message nonce to filter out the request used to establish the session. message_nonce: Option, ) { trace!( @@ -1203,6 +1209,8 @@ impl Handler { self.active_requests.insert(node_address, request_call); } + /// Establishes a new session with a peer, or re-establishes an existing session if a + /// new challenge was issued during an ongoing session. 
async fn new_session( &mut self, node_address: NodeAddress, From d791fe396048698ec581c9cde06f47467efcc16c Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 9 Oct 2023 07:58:54 +0900 Subject: [PATCH 35/44] Fix replay --- src/handler/active_requests.rs | 63 +++++++++++++++++-------------- src/handler/mod.rs | 68 ++++++++++++++++------------------ src/handler/tests.rs | 28 -------------- 3 files changed, 68 insertions(+), 91 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 65641452e..9c5df85b6 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -32,6 +32,42 @@ impl ActiveRequests { .insert(nonce, node_address); } + pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) { + let node_address = + if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) { + node_address + } else { + debug_unreachable!("expected to find nonce in active_requests_nonce_mapping"); + error!("expected to find nonce in active_requests_nonce_mapping"); + return; + }; + + self.active_requests_nonce_mapping.insert( + new_packet.header.message_nonce.clone(), + node_address.clone(), + ); + + match self.active_requests_mapping.entry(node_address) { + Entry::Occupied(mut requests) => { + let maybe_request_call = requests + .get_mut() + .iter_mut() + .find(|req| req.packet().message_nonce() == &old_nonce); + + if let Some(request_call) = maybe_request_call { + request_call.update_packet(new_packet); + } else { + debug_unreachable!("expected to find request call in active_requests_mapping"); + error!("expected to find request call in active_requests_mapping"); + } + } + Entry::Vacant(_) => { + debug_unreachable!("expected to find node address in active_requests_mapping"); + error!("expected to find node address in active_requests_mapping"); + } + } + } + pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec> { self.active_requests_mapping.get(node_address) } @@ -80,33 
+116,6 @@ impl ActiveRequests { Some(requests) } - /// Remove requests associated with a node, except for the request that has the given message nonce. - pub fn remove_requests_except( - &mut self, - node_address: &NodeAddress, - except: &MessageNonce, - ) -> Option> { - let request_ids = self - .active_requests_mapping - .get(node_address)? - .iter() - .filter(|req| req.packet().message_nonce() != except) - .map(|req| req.id().into()) - .collect::>(); - - let mut requests = vec![]; - for id in request_ids.iter() { - match self.remove_request(node_address, id) { - Some(request_call) => requests.push(request_call), - None => { - debug_unreachable!("expected to find request with id"); - error!("expected to find request with id"); - } - } - } - Some(requests) - } - /// Remove a single request identified by its id. pub fn remove_request( &mut self, diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 4fe1cde2a..40501c644 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -39,6 +39,7 @@ use crate::{ use delay_map::HashMapDelay; use enr::{CombinedKey, NodeId}; use futures::prelude::*; +use more_asserts::debug_unreachable; use parking_lot::RwLock; use smallvec::SmallVec; use std::{ @@ -965,44 +966,39 @@ impl Handler { node_address, message_nonce ); - let active_requests = if let Some(nonce) = message_nonce { - // Except the active request that was used to establish the new session, as it has - // already been handled and shouldn't be replayed. - self.active_requests - .remove_requests_except(node_address, &nonce) - } else { - self.active_requests.remove_requests(node_address) - } - .unwrap_or_default(); - for req in active_requests { - let (req_id, contact, body) = req.into_request_parts(); - trace!( - "Active request to be replayed. {}, {contact}, {body}", - RequestId::from(&req_id), - ); - // Remove the request from the packet filter here since the request is added in - // `self.send_request()` again. 
- self.remove_expected_response(contact.socket_addr()); - if let Err(request_error) = self.send_request::
<P>
(contact, req_id.clone(), body).await - { - warn!("Failed to send next awaiting request {request_error}"); - // Inform the service that the request failed - match req_id { - HandlerReqId::Internal(_) => { - // An internal request could not be sent. For now we do nothing about - // this. - } - HandlerReqId::External(id) => { - if let Err(e) = self - .service_send - .send(HandlerOut::RequestFailed(id, request_error)) - .await - { - warn!("Failed to inform that request failed {e}"); - } + + let packets = if let Some(session) = self.sessions.get_mut(&node_address) { + let mut packets = vec![]; + for request_call in self + .active_requests + .get(node_address) + .unwrap_or(&mut vec![]) + .iter() + .filter(|req| { + if let Some(nonce) = message_nonce.as_ref() { + req.packet().message_nonce() != nonce + } else { + true } - } + }) + { + let new_packet = session + .encrypt_message::
<P>
(self.node_id, &request_call.encode()) + .unwrap(); + + packets.push((request_call.packet().message_nonce().clone(), new_packet)); } + + packets + } else { + debug_unreachable!("Attempted to replay active requests but session doesn't exist."); + error!("Attempted to replay active requests but session doesn't exist."); + return; + }; + + for (old_nonce, new_packet) in packets { + self.active_requests.update_packet(old_nonce, new_packet.clone()); + self.send(node_address.clone(), new_packet).await; } } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 5211e3049..7c7032dcf 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -400,34 +400,6 @@ async fn test_active_requests_remove_requests() { assert!(active_requests.remove_requests(&req_3_addr).is_none()); } -#[tokio::test] -async fn test_active_requests_remove_requests_except() { - const EXPIRY: Duration = Duration::from_secs(5); - let mut active_requests = ActiveRequests::new(EXPIRY); - - let node_1 = create_node(); - let node_2 = create_node(); - let (req_1, req_1_addr) = create_req_call(&node_1); - let (req_2, req_2_addr) = create_req_call(&node_2); - let (req_3, req_3_addr) = create_req_call(&node_2); - - let req_2_nonce = req_2.packet().header.message_nonce; - let req_3_id: RequestId = req_3.id().into(); - - active_requests.insert(req_1_addr, req_1); - active_requests.insert(req_2_addr.clone(), req_2); - active_requests.insert(req_3_addr, req_3); - - let removed_requests = active_requests - .remove_requests_except(&req_2_addr, &req_2_nonce) - .unwrap(); - active_requests.check_invariant(); - - assert_eq!(1, removed_requests.len()); - let removed_request_id: RequestId = removed_requests.first().unwrap().id().into(); - assert_eq!(removed_request_id, req_3_id); -} - #[tokio::test] async fn test_active_requests_remove_request() { const EXPIRY: Duration = Duration::from_secs(5); From 13bd6fa0905d2e4daddf1c48cf7e5f925dd31fd6 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Thu, 12 Oct 2023 
08:10:03 +0900 Subject: [PATCH 36/44] Add doc comment --- src/handler/active_requests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index 9c5df85b6..c9d3051cf 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -32,6 +32,7 @@ impl ActiveRequests { .insert(nonce, node_address); } + /// Update the underlying packet for the request via message nonce. pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) { let node_address = if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) { From 5c84d226b3c8080efcb8db2ae82070e5735e5af1 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Thu, 12 Oct 2023 08:10:23 +0900 Subject: [PATCH 37/44] Add test for update_packet() --- src/handler/tests.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 7c7032dcf..56f7211e9 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -476,6 +476,35 @@ async fn test_active_requests_remove_by_nonce() { assert!(active_requests.remove_by_nonce(&random_nonce).is_none()); } +#[tokio::test] +async fn test_active_requests_update_packet() { + const EXPIRY: Duration = Duration::from_secs(5); + let mut active_requests = ActiveRequests::new(EXPIRY); + + let node_1 = create_node(); + let node_2 = create_node(); + let (req_1, req_1_addr) = create_req_call(&node_1); + let (req_2, req_2_addr) = create_req_call(&node_2); + let (req_3, req_3_addr) = create_req_call(&node_2); + + let old_nonce = *req_2.packet().message_nonce(); + active_requests.insert(req_1_addr.clone(), req_1); + active_requests.insert(req_2_addr.clone(), req_2); + active_requests.insert(req_3_addr.clone(), req_3); + active_requests.check_invariant(); + + let new_packet = Packet::new_random(&node_2.node_id()).unwrap(); + let new_nonce = new_packet.message_nonce().clone(); + 
active_requests.update_packet(old_nonce.clone(), new_packet.clone()); + active_requests.check_invariant(); + + assert_eq!(2, active_requests.get(&req_2_addr).unwrap().len()); + assert!(active_requests.remove_by_nonce(&old_nonce).is_none()); + let (addr, req) = active_requests.remove_by_nonce(&new_nonce).unwrap(); + assert_eq!(addr, req_2_addr); + assert_eq!(req.packet(), &new_packet); +} + #[tokio::test] async fn test_self_request_ipv4() { init(); From f9e75c3a28669487bbee6e874edd51359c9d1f92 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 14 Oct 2023 22:37:59 +0900 Subject: [PATCH 38/44] Add doc comment & cargo fmt --- src/handler/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 40501c644..4077816e6 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -975,6 +975,8 @@ impl Handler { .unwrap_or(&mut vec![]) .iter() .filter(|req| { + // Except the active request that was used to establish the new session, as it has + // already been handled and shouldn't be replayed. 
if let Some(nonce) = message_nonce.as_ref() { req.packet().message_nonce() != nonce } else { @@ -997,7 +999,8 @@ impl Handler { }; for (old_nonce, new_packet) in packets { - self.active_requests.update_packet(old_nonce, new_packet.clone()); + self.active_requests + .update_packet(old_nonce, new_packet.clone()); self.send(node_address.clone(), new_packet).await; } } From 76b88d99077d79eb5ba5be11ebc2cc79f0ec0d8f Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sun, 15 Oct 2023 11:21:31 +0900 Subject: [PATCH 39/44] Fix clippy errors --- src/handler/active_requests.rs | 8 +++----- src/handler/mod.rs | 8 ++++---- src/handler/request_call.rs | 5 ----- src/handler/tests.rs | 10 +++++----- src/ipmode.rs | 22 +++++++++++----------- 5 files changed, 23 insertions(+), 30 deletions(-) diff --git a/src/handler/active_requests.rs b/src/handler/active_requests.rs index c9d3051cf..8cfa2d602 100644 --- a/src/handler/active_requests.rs +++ b/src/handler/active_requests.rs @@ -26,7 +26,7 @@ impl ActiveRequests { let nonce = *request_call.packet().message_nonce(); self.active_requests_mapping .entry(node_address.clone()) - .or_insert_with(Vec::new) + .or_default() .push(request_call); self.active_requests_nonce_mapping .insert(nonce, node_address); @@ -43,10 +43,8 @@ impl ActiveRequests { return; }; - self.active_requests_nonce_mapping.insert( - new_packet.header.message_nonce.clone(), - node_address.clone(), - ); + self.active_requests_nonce_mapping + .insert(new_packet.header.message_nonce, node_address.clone()); match self.active_requests_mapping.entry(node_address) { Entry::Occupied(mut requests) => { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 4077816e6..e88185790 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -482,7 +482,7 @@ impl Handler { trace!("Request queued for node: {}", node_address); self.pending_requests .entry(node_address) - .or_insert_with(Vec::new) + .or_default() .push(PendingRequest { contact, request_id, @@ -967,12 +967,12 @@ impl 
Handler { message_nonce ); - let packets = if let Some(session) = self.sessions.get_mut(&node_address) { + let packets = if let Some(session) = self.sessions.get_mut(node_address) { let mut packets = vec![]; for request_call in self .active_requests .get(node_address) - .unwrap_or(&mut vec![]) + .unwrap_or(&vec![]) .iter() .filter(|req| { // Except the active request that was used to establish the new session, as it has @@ -988,7 +988,7 @@ impl Handler { .encrypt_message::
<P>
(self.node_id, &request_call.encode()) .unwrap(); - packets.push((request_call.packet().message_nonce().clone(), new_packet)); + packets.push((*request_call.packet().message_nonce(), new_packet)); } packets diff --git a/src/handler/request_call.rs b/src/handler/request_call.rs index e82e3ee1d..b91c7643e 100644 --- a/src/handler/request_call.rs +++ b/src/handler/request_call.rs @@ -119,9 +119,4 @@ impl RequestCall { pub fn remaining_responses_mut(&mut self) -> &mut Option { &mut self.remaining_responses } - - /// Returns the id, contact, and request body for this call. - pub fn into_request_parts(self) -> (HandlerReqId, NodeContact, RequestBody) { - (self.request_id, self.contact, self.request) - } } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 56f7211e9..46c4250f0 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -488,19 +488,19 @@ async fn test_active_requests_update_packet() { let (req_3, req_3_addr) = create_req_call(&node_2); let old_nonce = *req_2.packet().message_nonce(); - active_requests.insert(req_1_addr.clone(), req_1); + active_requests.insert(req_1_addr, req_1); active_requests.insert(req_2_addr.clone(), req_2); - active_requests.insert(req_3_addr.clone(), req_3); + active_requests.insert(req_3_addr, req_3); active_requests.check_invariant(); let new_packet = Packet::new_random(&node_2.node_id()).unwrap(); - let new_nonce = new_packet.message_nonce().clone(); - active_requests.update_packet(old_nonce.clone(), new_packet.clone()); + let new_nonce = new_packet.message_nonce(); + active_requests.update_packet(old_nonce, new_packet.clone()); active_requests.check_invariant(); assert_eq!(2, active_requests.get(&req_2_addr).unwrap().len()); assert!(active_requests.remove_by_nonce(&old_nonce).is_none()); - let (addr, req) = active_requests.remove_by_nonce(&new_nonce).unwrap(); + let (addr, req) = active_requests.remove_by_nonce(new_nonce).unwrap(); assert_eq!(addr, req_2_addr); assert_eq!(req.packet(), &new_packet); } diff --git 
a/src/ipmode.rs b/src/ipmode.rs index de66fd195..f2dbe48da 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -69,6 +69,17 @@ impl IpMode { } } +/// Copied from the standard library. See +/// The current code is behind the `ip` feature. +pub const fn to_ipv4_mapped(ip: &std::net::Ipv6Addr) -> Option { + match ip.octets() { + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { + Some(std::net::Ipv4Addr::new(a, b, c, d)) + } + _ => None, + } +} + #[cfg(test)] mod tests { use super::*; @@ -230,14 +241,3 @@ mod tests { .test(); } } - -/// Copied from the standard library. See -/// The current code is behind the `ip` feature. -pub const fn to_ipv4_mapped(ip: &std::net::Ipv6Addr) -> Option { - match ip.octets() { - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { - Some(std::net::Ipv4Addr::new(a, b, c, d)) - } - _ => None, - } -} From f2d345c5b6ab1ee74473efab4dff16431ff1abe0 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 29 Nov 2023 19:50:32 +1100 Subject: [PATCH 40/44] fmt --- src/handler/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index b4b8d7770..9d4b7d67b 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -9,9 +9,9 @@ use crate::{ }; use std::{ collections::HashSet, + convert::TryInto, net::{Ipv4Addr, Ipv6Addr}, ops::Add, - convert::TryInto, }; use crate::{ From 35cfcb3eed4f9b438827bf895ca07b83ee3620b0 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 30 Nov 2023 18:02:32 -0500 Subject: [PATCH 41/44] Fix broken clippy lint --- src/handler/tests.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 9d4b7d67b..f74546bc1 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -11,6 +11,7 @@ use std::{ collections::HashSet, convert::TryInto, net::{Ipv4Addr, Ipv6Addr}, + num::NonZeroU16, ops::Add, }; @@ -817,7 +818,7 @@ async fn test_replay_active_requests() { body: 
ResponseBody::Pong { enr_seq: 1, ip: ip.into(), - port: sender_port, + port: NonZeroU16::new(sender_port).unwrap(), }, }; receiver_send @@ -1011,7 +1012,7 @@ async fn test_send_pending_request() { body: ResponseBody::Pong { enr_seq: 1, ip: ip.into(), - port: sender_port, + port: NonZeroU16::new(sender_port).unwrap(), }, }; receiver_send From 3e325d609c80f1e66ebd8747f83ca9d37e91fde9 Mon Sep 17 00:00:00 2001 From: Diva M Date: Sun, 3 Dec 2023 12:00:56 -0500 Subject: [PATCH 42/44] smmal code cleanup --- src/service/test.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/service/test.rs b/src/service/test.rs index 5223f0d70..c85f6a97a 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -17,7 +17,7 @@ use crate::{ }; use enr::CombinedKey; use parking_lot::RwLock; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration}; use tokio::sync::{mpsc, oneshot}; /// Default UDP port number to use for tests requiring UDP exposure @@ -234,7 +234,7 @@ async fn test_handling_concurrent_responses() { let mut service = { let enr_key = keypairs.pop().unwrap(); let enr = Enr::builder() - .ip4("127.0.0.1".parse().unwrap()) + .ip4(Ipv4Addr::LOCALHOST) .udp4(10005) .build(&enr_key) .unwrap(); @@ -247,7 +247,7 @@ async fn test_handling_concurrent_responses() { }; let node_contact: NodeContact = Enr::builder() - .ip4("127.0.0.1".parse().unwrap()) + .ip4(Ipv4Addr::LOCALHOST) .udp4(10006) .build(&keypairs.remove(0)) .unwrap() @@ -286,7 +286,7 @@ async fn test_handling_concurrent_responses() { .enumerate() .map(|(i, key)| { Enr::builder() - .ip4("127.0.0.1".parse().unwrap()) + .ip4(Ipv4Addr::LOCALHOST) .udp4(10007 + i as u16) .build(key) .unwrap() From 9c9cdd4590c0dc636cf8564e88738f9e3ef548e4 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Tue, 5 Dec 2023 11:09:26 -0500 Subject: [PATCH 43/44] Replace naked unwrap with expect --- src/handler/mod.rs | 5 ++++- 1 file changed, 4 
insertions(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index e88185790..12c7e0359 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -986,7 +986,10 @@ impl Handler { { let new_packet = session .encrypt_message::
<P>
(self.node_id, &request_call.encode()) - .unwrap(); + .expect(&format!( + "Failed to encrypt message for request with id: {:?}", + request_call.id() + )); packets.push((*request_call.packet().message_nonce(), new_packet)); } From 84f303aa9b75cab73793d779bec3f3f7344d0272 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Thu, 7 Dec 2023 17:38:17 -0500 Subject: [PATCH 44/44] Log error message instead of panicking, if unable to re-encrypt active request --- src/handler/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 12c7e0359..4547078e8 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -984,14 +984,16 @@ impl Handler { } }) { - let new_packet = session - .encrypt_message::
<P>
(self.node_id, &request_call.encode()) - .expect(&format!( - "Failed to encrypt message for request with id: {:?}", + if let Ok(new_packet) = + session.encrypt_message::
<P>
(self.node_id, &request_call.encode()) + { + packets.push((*request_call.packet().message_nonce(), new_packet)); + } else { + error!( + "Failed to re-encrypt packet while replaying active request with id: {:?}", request_call.id() - )); - - packets.push((*request_call.packet().message_nonce(), new_packet)); + ); + } } packets