diff --git a/.gitignore b/.gitignore index 3af21dd77..be506a9f6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ # These are backup files generated by rustfmt **/*.rs.bk Cargo.lock + +# VIM swap files +*.sw[op] diff --git a/src/config.rs b/src/config.rs index e248426d0..26bd32a78 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,11 +57,6 @@ pub struct Discv5Config { /// excluded if they do not pass this filter. The default is to accept all nodes. pub table_filter: fn(&Enr) -> bool, - /// The callback for handling TALKREQ requests. The input to this callback is the protocol and - /// the output is the response sent back to the requester. - // This is a temporary measure and this may change in the future. - pub talkreq_callback: fn(&[u8], &[u8]) -> Vec, - /// The time between pings to ensure connectivity amongst connected nodes. Default: 300 /// seconds. pub ping_interval: Duration, @@ -100,7 +95,6 @@ impl Default for Discv5Config { ip_limit: false, incoming_bucket_limit: MAX_NODES_PER_BUCKET, table_filter: |_| true, - talkreq_callback: |_, _| Vec::new(), ping_interval: Duration::from_secs(300), report_discovered_peers: true, filter_config: FilterConfig::default(), @@ -230,13 +224,6 @@ impl Discv5ConfigBuilder { self } - /// The callback function for handling TALK requests. The input is the protocol in bytes and the request data in bytes and - /// the output will be the response sent back to the requester. - pub fn talkreq_callback(&mut self, callback: fn(&[u8], &[u8]) -> Vec) -> &mut Self { - self.config.talkreq_callback = callback; - self - } - /// The time between pings to ensure connectivity amongst connected nodes. pub fn ping_interval(&mut self, interval: Duration) -> &mut Self { self.config.ping_interval = interval; diff --git a/src/discv5.rs b/src/discv5.rs index 0eed904c0..55bf8326d 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -19,7 +19,7 @@ use crate::{ NodeStatus, UpdateResult, }, node_info::NodeContact, - service::{QueryKind, Service, ServiceRequest}, + service::{QueryKind, Service, ServiceRequest, TalkRequest}, Discv5Config, Enr, }; use enr::{CombinedKey, EnrError, EnrKey, NodeId}; @@ -58,6 +58,8 @@ pub enum Discv5Event { }, /// Our local ENR IP address has been updated. SocketUpdated(SocketAddr), + /// A node has initiated a talk request. + TalkRequest(TalkRequest), } /// The main Discv5 Service struct. This provides the user-level API for performing queries and @@ -212,6 +214,31 @@ impl Discv5 { self.kbuckets.write().remove(key) } + /// Returns a vector of closest nodes by the given distances. + pub fn nodes_by_distance(&self, mut distances: Vec) -> Vec { + let mut nodes_to_send = Vec::new(); + distances.sort_unstable(); + distances.dedup(); + + if let Some(0) = distances.first() { + // if the distance is 0 send our local ENR + nodes_to_send.push(self.local_enr.read().clone()); + distances.remove(0); + } + + if !distances.is_empty() { + let mut kbuckets = self.kbuckets.write(); + for node in kbuckets + .nodes_by_distances(distances, self.config.max_nodes_response) + .into_iter() + .map(|entry| entry.node.value.clone()) + { + nodes_to_send.push(node); + } + } + nodes_to_send + } + /// Mark a node in the routing table as `Disconnnected`. /// /// A `Disconnected` node will be present in the routing table and will be only @@ -332,7 +359,7 @@ impl Discv5 { } /// Returns an iterator over all ENR node IDs of nodes currently contained in the routing table. - pub fn table_entries_id(&mut self) -> Vec { + pub fn table_entries_id(&self) -> Vec { self.kbuckets .write() .iter() @@ -341,7 +368,7 @@ impl Discv5 { } /// Returns an iterator over all the ENR's of nodes currently contained in the routing table. - pub fn table_entries_enr(&mut self) -> Vec { + pub fn table_entries_enr(&self) -> Vec { self.kbuckets .write() .iter() @@ -406,7 +433,7 @@ impl Discv5 { /// Request a TALK message from a node, identified via the ENR. pub fn talk_req( - &mut self, + &self, enr: Enr, protocol: Vec, request: Vec, diff --git a/src/error.rs b/src/error.rs index 70e90ddaa..26f07574b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ use crate::handler::Challenge; use rlp::DecoderError; +use std::fmt; #[derive(Debug)] /// A general error that is used throughout the Discv5 library. @@ -69,6 +70,25 @@ pub enum PacketError { InvalidEnr(DecoderError), } +#[derive(Debug, Clone, PartialEq)] +#[non_exhaustive] +pub enum ResponseError { + /// The channel used to send the response has already been closed. + ChannelClosed, +} + +impl fmt::Display for ResponseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ResponseError::ChannelClosed => { + write!(f, "response channel has already been closed") + } + } + } +} + +impl std::error::Error for ResponseError {} + #[derive(Debug, Clone, PartialEq)] pub enum RequestError { /// The request timed out. @@ -107,20 +127,20 @@ pub enum QueryError { InvalidMultiaddr(String), } -impl std::fmt::Display for Discv5Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for Discv5Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } -impl std::fmt::Display for RequestError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for RequestError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } -impl std::fmt::Display for QueryError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for QueryError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } diff --git a/src/lib.rs b/src/lib.rs index 11f7bac90..29136a594 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,10 +124,11 @@ pub type Enr = enr::Enr; pub use crate::discv5::{Discv5, Discv5Event}; pub use config::{Discv5Config, Discv5ConfigBuilder}; -pub use error::{Discv5Error, QueryError, RequestError}; +pub use error::{Discv5Error, QueryError, RequestError, ResponseError}; pub use executor::{Executor, TokioExecutor}; pub use kbucket::{ConnectionDirection, ConnectionState, Key}; pub use permit_ban::PermitBanList; +pub use service::TalkRequest; pub use socket::{FilterConfig, FilterConfigBuilder}; // re-export the ENR crate pub use enr; diff --git a/src/service.rs b/src/service.rs index c873b9237..00b90eb1f 100644 --- a/src/service.rs +++ b/src/service.rs @@ -18,7 +18,7 @@ use self::{ query_info::{QueryInfo, QueryType}, }; use crate::{ - error::RequestError, + error::{RequestError, ResponseError}, handler::{Handler, HandlerRequest, HandlerResponse}, kbucket::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, @@ -34,6 +34,7 @@ use crate::{ use enr::{CombinedKey, NodeId}; use fnv::FnvHashMap; use futures::prelude::*; +use hashset_delay::HashSetDelay; use parking_lot::RwLock; use rpc::*; use std::{collections::HashMap, net::SocketAddr, sync::Arc, task::Poll}; @@ -45,7 +46,68 @@ mod ip_vote; mod query_info; mod test; -use hashset_delay::HashSetDelay; +/// Request type for Protocols using `TalkReq` message. +/// +/// Automatically responds with an empty body on drop if +/// [`TalkRequest::respond`] is not called. +#[derive(Debug)] +pub struct TalkRequest { + id: RequestId, + node_address: NodeAddress, + protocol: Vec, + body: Vec, + sender: Option>, +} + +impl Drop for TalkRequest { + fn drop(&mut self) { + let sender = match self.sender.take() { + Some(s) => s, + None => return, + }; + + let response = Response { + id: self.id.clone(), + body: ResponseBody::Talk { response: vec![] }, + }; + + debug!("Sending empty TALK response to {}", self.node_address); + let _ = sender.send(HandlerRequest::Response( + self.node_address.clone(), + Box::new(response), + )); + } +} + +impl TalkRequest { + pub fn protocol(&self) -> &[u8] { + &self.protocol + } + + pub fn body(&self) -> &[u8] { + &self.body + } + + pub fn respond(mut self, response: Vec) -> Result<(), ResponseError> { + debug!("Sending TALK response to {}", self.node_address); + + let response = Response { + id: self.id.clone(), + body: ResponseBody::Talk { response }, + }; + + self.sender + .take() + .unwrap() + .send(HandlerRequest::Response( + self.node_address.clone(), + Box::new(response), + )) + .map_err(|_| ResponseError::ChannelClosed)?; + + Ok(()) + } +} /// The number of distances (buckets) we simultaneously request from each peer. pub(crate) const DISTANCES_TO_REQUEST_PER_PEER: usize = 3; @@ -480,17 +542,15 @@ impl Service { .send(HandlerRequest::Response(node_address, Box::new(response))); } RequestBody::Talk { protocol, request } => { - // Send the callback's response to this protocol. - let response = (self.config.talkreq_callback)(&protocol, &request); - let response = Response { + let req = TalkRequest { id, - body: ResponseBody::Talk { response }, + node_address, + protocol, + body: request, + sender: Some(self.handler_send.clone()), }; - debug!("Sending TALK response to {}", node_address); - let _ = self - .handler_send - .send(HandlerRequest::Response(node_address, Box::new(response))); + self.send_event(Discv5Event::TalkRequest(req)); } RequestBody::RegisterTopic { .. } => { debug!("Received RegisterTopic request which is unimplemented");