Skip to content

Commit

Permalink
Handful of changes from portal network fork (#70)
Browse files Browse the repository at this point in the history
* Add a TalkReqHandler trait

* Fix tests

* Add a nodes_by_distance public method

* Introduce an event for TalkReq messages.

* Create error type for explicit responses

* Relax mutable reference for table_entries_enr and table_entries_id

* Ignore vim swap files in git

Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Jason Carver <ut96caarrs@snkmail.com>
  • Loading branch information
3 people authored Jun 25, 2021
1 parent 19ede3c commit 478c860
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
# These are backup files generated by rustfmt
**/*.rs.bk
Cargo.lock

# VIM swap files
*.sw[op]
13 changes: 0 additions & 13 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ pub struct Discv5Config {
/// excluded if they do not pass this filter. The default is to accept all nodes.
pub table_filter: fn(&Enr) -> bool,

/// The callback for handling TALKREQ requests. The input to this callback is the protocol and
/// the output is the response sent back to the requester.
// This is a temporary measure and this may change in the future.
pub talkreq_callback: fn(&[u8], &[u8]) -> Vec<u8>,

/// The time between pings to ensure connectivity amongst connected nodes. Default: 300
/// seconds.
pub ping_interval: Duration,
Expand Down Expand Up @@ -100,7 +95,6 @@ impl Default for Discv5Config {
ip_limit: false,
incoming_bucket_limit: MAX_NODES_PER_BUCKET,
table_filter: |_| true,
talkreq_callback: |_, _| Vec::new(),
ping_interval: Duration::from_secs(300),
report_discovered_peers: true,
filter_config: FilterConfig::default(),
Expand Down Expand Up @@ -230,13 +224,6 @@ impl Discv5ConfigBuilder {
self
}

/// The callback function for handling TALK requests. The input is the protocol in bytes and the request data in bytes and
/// the output will be the response sent back to the requester.
pub fn talkreq_callback(&mut self, callback: fn(&[u8], &[u8]) -> Vec<u8>) -> &mut Self {
self.config.talkreq_callback = callback;
self
}

/// The time between pings to ensure connectivity amongst connected nodes.
pub fn ping_interval(&mut self, interval: Duration) -> &mut Self {
self.config.ping_interval = interval;
Expand Down
35 changes: 31 additions & 4 deletions src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
NodeStatus, UpdateResult,
},
node_info::NodeContact,
service::{QueryKind, Service, ServiceRequest},
service::{QueryKind, Service, ServiceRequest, TalkRequest},
Discv5Config, Enr,
};
use enr::{CombinedKey, EnrError, EnrKey, NodeId};
Expand Down Expand Up @@ -58,6 +58,8 @@ pub enum Discv5Event {
},
/// Our local ENR IP address has been updated.
SocketUpdated(SocketAddr),
/// A node has initiated a talk request.
TalkRequest(TalkRequest),
}

/// The main Discv5 Service struct. This provides the user-level API for performing queries and
Expand Down Expand Up @@ -212,6 +214,31 @@ impl Discv5 {
self.kbuckets.write().remove(key)
}

/// Returns a vector of closest nodes by the given distances.
pub fn nodes_by_distance(&self, mut distances: Vec<u64>) -> Vec<Enr> {
let mut nodes_to_send = Vec::new();
distances.sort_unstable();
distances.dedup();

if let Some(0) = distances.first() {
// if the distance is 0 send our local ENR
nodes_to_send.push(self.local_enr.read().clone());
distances.remove(0);
}

if !distances.is_empty() {
let mut kbuckets = self.kbuckets.write();
for node in kbuckets
.nodes_by_distances(distances, self.config.max_nodes_response)
.into_iter()
.map(|entry| entry.node.value.clone())
{
nodes_to_send.push(node);
}
}
nodes_to_send
}

/// Mark a node in the routing table as `Disconnnected`.
///
/// A `Disconnected` node will be present in the routing table and will be only
Expand Down Expand Up @@ -332,7 +359,7 @@ impl Discv5 {
}

/// Returns an iterator over all ENR node IDs of nodes currently contained in the routing table.
pub fn table_entries_id(&mut self) -> Vec<NodeId> {
pub fn table_entries_id(&self) -> Vec<NodeId> {
self.kbuckets
.write()
.iter()
Expand All @@ -341,7 +368,7 @@ impl Discv5 {
}

/// Returns an iterator over all the ENR's of nodes currently contained in the routing table.
pub fn table_entries_enr(&mut self) -> Vec<Enr> {
pub fn table_entries_enr(&self) -> Vec<Enr> {
self.kbuckets
.write()
.iter()
Expand Down Expand Up @@ -406,7 +433,7 @@ impl Discv5 {

/// Request a TALK message from a node, identified via the ENR.
pub fn talk_req(
&mut self,
&self,
enr: Enr,
protocol: Vec<u8>,
request: Vec<u8>,
Expand Down
32 changes: 26 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::handler::Challenge;
use rlp::DecoderError;
use std::fmt;

#[derive(Debug)]
/// A general error that is used throughout the Discv5 library.
Expand Down Expand Up @@ -69,6 +70,25 @@ pub enum PacketError {
InvalidEnr(DecoderError),
}

#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ResponseError {
/// The channel used to send the response has already been closed.
ChannelClosed,
}

impl fmt::Display for ResponseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ResponseError::ChannelClosed => {
write!(f, "response channel has already been closed")
}
}
}
}

impl std::error::Error for ResponseError {}

#[derive(Debug, Clone, PartialEq)]
pub enum RequestError {
/// The request timed out.
Expand Down Expand Up @@ -107,20 +127,20 @@ pub enum QueryError {
InvalidMultiaddr(String),
}

impl std::fmt::Display for Discv5Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for Discv5Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::fmt::Display for RequestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for RequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::fmt::Display for QueryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for QueryError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ pub type Enr = enr::Enr<enr::CombinedKey>;

pub use crate::discv5::{Discv5, Discv5Event};
pub use config::{Discv5Config, Discv5ConfigBuilder};
pub use error::{Discv5Error, QueryError, RequestError};
pub use error::{Discv5Error, QueryError, RequestError, ResponseError};
pub use executor::{Executor, TokioExecutor};
pub use kbucket::{ConnectionDirection, ConnectionState, Key};
pub use permit_ban::PermitBanList;
pub use service::TalkRequest;
pub use socket::{FilterConfig, FilterConfigBuilder};
// re-export the ENR crate
pub use enr;
80 changes: 70 additions & 10 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use self::{
query_info::{QueryInfo, QueryType},
};
use crate::{
error::RequestError,
error::{RequestError, ResponseError},
handler::{Handler, HandlerRequest, HandlerResponse},
kbucket::{
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
Expand All @@ -34,6 +34,7 @@ use crate::{
use enr::{CombinedKey, NodeId};
use fnv::FnvHashMap;
use futures::prelude::*;
use hashset_delay::HashSetDelay;
use parking_lot::RwLock;
use rpc::*;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, task::Poll};
Expand All @@ -45,7 +46,68 @@ mod ip_vote;
mod query_info;
mod test;

use hashset_delay::HashSetDelay;
/// Request type for Protocols using `TalkReq` message.
///
/// Automatically responds with an empty body on drop if
/// [`TalkRequest::respond`] is not called.
#[derive(Debug)]
pub struct TalkRequest {
id: RequestId,
node_address: NodeAddress,
protocol: Vec<u8>,
body: Vec<u8>,
sender: Option<mpsc::UnboundedSender<HandlerRequest>>,
}

impl Drop for TalkRequest {
fn drop(&mut self) {
let sender = match self.sender.take() {
Some(s) => s,
None => return,
};

let response = Response {
id: self.id.clone(),
body: ResponseBody::Talk { response: vec![] },
};

debug!("Sending empty TALK response to {}", self.node_address);
let _ = sender.send(HandlerRequest::Response(
self.node_address.clone(),
Box::new(response),
));
}
}

impl TalkRequest {
pub fn protocol(&self) -> &[u8] {
&self.protocol
}

pub fn body(&self) -> &[u8] {
&self.body
}

pub fn respond(mut self, response: Vec<u8>) -> Result<(), ResponseError> {
debug!("Sending TALK response to {}", self.node_address);

let response = Response {
id: self.id.clone(),
body: ResponseBody::Talk { response },
};

self.sender
.take()
.unwrap()
.send(HandlerRequest::Response(
self.node_address.clone(),
Box::new(response),
))
.map_err(|_| ResponseError::ChannelClosed)?;

Ok(())
}
}

/// The number of distances (buckets) we simultaneously request from each peer.
pub(crate) const DISTANCES_TO_REQUEST_PER_PEER: usize = 3;
Expand Down Expand Up @@ -480,17 +542,15 @@ impl Service {
.send(HandlerRequest::Response(node_address, Box::new(response)));
}
RequestBody::Talk { protocol, request } => {
// Send the callback's response to this protocol.
let response = (self.config.talkreq_callback)(&protocol, &request);
let response = Response {
let req = TalkRequest {
id,
body: ResponseBody::Talk { response },
node_address,
protocol,
body: request,
sender: Some(self.handler_send.clone()),
};

debug!("Sending TALK response to {}", node_address);
let _ = self
.handler_send
.send(HandlerRequest::Response(node_address, Box::new(response)));
self.send_event(Discv5Event::TalkRequest(req));
}
RequestBody::RegisterTopic { .. } => {
debug!("Received RegisterTopic request which is unimplemented");
Expand Down

0 comments on commit 478c860

Please sign in to comment.