diff --git a/Cargo.toml b/Cargo.toml index e3b73fe67..78ed95ef2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,22 @@ [package] name = "discv5" +authors = ["Age Manning "] edition = "2018" -version = "0.1.0-alpha" +version = "0.1.0-alpha.1" description = "Implementation of the p2p discv5 discovery protocol" -authors = ["Age Manning "] license = "MIT" repository = "https://github.com/sigp/discv5" readme = "./README.md" keywords = ["peer-to-peer", "libp2p", "networking", "discovery", "discv5"] categories = ["network-programming", "asynchronous"] +exclude = [ + ".gitignore", + ".github/*" +] [dependencies] -enr = { version = "0.1.0-alpha.5", features = ["libsecp256k1", "ed25519"] } -tokio = { version = "0.2.19", features = ["time", "stream"] } +enr = { version = "0.1.0-alpha.6", features = ["libsecp256k1", "ed25519"] } +tokio = { version = "0.2.20", features = ["net", "stream", "time"] } zeroize = { version = "1.1.0", features = ["zeroize_derive"] } libsecp256k1 = "0.3.5" futures = "0.3.4" @@ -27,7 +31,6 @@ fnv = "1.0.6" arrayvec = "0.5.1" digest = "0.8.1" rand = "0.7.3" -async-std = "1.5.0" net2 = "0.2.33" smallvec = "1.4.0" @@ -35,6 +38,6 @@ smallvec = "1.4.0" quickcheck = "0.9.2" env_logger = "0.7.1" simple_logger = "1.6.0" -tokio = { version = "0.2.19", features = ["time", "rt-threaded", "macros"] } +tokio = { version = "0.2.20", features = ["time", "rt-threaded", "macros"] } rand_xorshift = "0.2.0" rand_core = "0.5.1" diff --git a/README.md b/README.md index 2a0489c93..5bbf46493 100644 --- a/README.md +++ b/README.md @@ -78,16 +78,17 @@ async fn main() { let target_random_node_id = enr::NodeId::random(); discv5.find_node(target_random_node_id); - // poll the stream for the next FindNoeResult event - loop { - match discv5.next().await { - Some(Discv5Event::FindNodeResult { closer_peers, .. }) => { - println!("Query completed. Found {} peers", closer_peers.len()); - break; - } - _ => {} // handle other discv5 events - } - } + + // poll the stream for the next FindNoeResult event + while let Some(event) = discv5.next().await { + match event { + Discv5Event::FindNodeResult { closer_peers, .. } => { + println!("Query completed. Found {} peers", closer_peers.len()); + break; + } + _ => {} // handle other discv5 events + } + } } ``` @@ -106,7 +107,7 @@ This protocol is split into three main sections/layers: undergoes a handshake, which results in a [`Session`]. [`Session`]'s are established when needed and get dropped after a timeout. This section manages the creation and maintenance of sessions between nodes. It is realised by the [`SessionService`] struct. - * Behaviour - This section contains the protocol-level logic. In particular it manages the + * Application - This section contains the protocol-level logic. In particular it manages the routing table of known ENR's, topic registration/advertisement and performs various queries such as peer discovery. This section is realised by the [`Discv5`] struct. diff --git a/src/config.rs b/src/config.rs index b657d0d51..786bef747 100644 --- a/src/config.rs +++ b/src/config.rs @@ -62,6 +62,7 @@ impl Default for Discv5Config { } } +#[derive(Debug)] pub struct Discv5ConfigBuilder { config: Discv5Config, } @@ -144,7 +145,7 @@ impl Discv5ConfigBuilder { } impl std::fmt::Debug for Discv5Config { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("Discv5Config"); let _ = builder.field("request_timeout", &self.request_timeout); let _ = builder.field("query_timeout", &self.query_timeout); diff --git a/src/discv5.rs b/src/discv5.rs index aeeded3f3..af88f7f3f 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -21,10 +21,9 @@ use crate::query_pool::{ FindNodeQueryConfig, PredicateQueryConfig, QueryId, QueryPool, QueryPoolState, ReturnPeer, }; use crate::rpc; -use crate::service::MAX_PACKET_SIZE; use crate::session_service::{SessionEvent, SessionService}; +use crate::transport::MAX_PACKET_SIZE; use crate::Discv5Config; -use async_std::prelude::*; use enr::{CombinedKey, Enr as RawEnr, EnrError, EnrKey, NodeId}; use fnv::FnvHashMap; use futures::prelude::*; @@ -517,7 +516,7 @@ impl Discv5 { rpc_id: u64, distance: u64, ) { - let nodes: Vec> = self + let nodes: Vec> = self .kbuckets .nodes_by_distance(distance) .into_iter() @@ -937,7 +936,7 @@ impl Discv5 { impl Stream for Discv5 { type Item = Discv5Event; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // Process events from the session service while let Poll::Ready(Some(event)) = self.service.poll_next_unpin(cx) { diff --git a/src/discv5/query_info.rs b/src/discv5/query_info.rs index 9f7b73eb6..9179fe011 100644 --- a/src/discv5/query_info.rs +++ b/src/discv5/query_info.rs @@ -29,7 +29,7 @@ pub enum QueryType { impl QueryInfo { /// Builds an RPC Request - pub fn into_rpc_request( + pub(crate) fn into_rpc_request( self, return_peer: &ReturnPeer, ) -> Result { diff --git a/src/discv5/test.rs b/src/discv5/test.rs index b03068692..97dcea398 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -203,7 +203,7 @@ async fn test_discovery_star_topology() { nodes.first_mut().unwrap().find_node(target_random_node_id); nodes.push(bootstrap_node); - let main = |cx: &mut Context| loop { + let main = |cx: &mut Context| { for node in nodes.iter_mut() { loop { match node.poll_next_unpin(cx) { @@ -221,6 +221,7 @@ async fn test_discovery_star_topology() { } } } + Poll::Pending }; future::poll_fn(main).await } @@ -259,7 +260,7 @@ async fn test_findnode_query() { .take(total_nodes - 1) .collect(); - let main = |cx: &mut Context| loop { + let main = |cx: &mut Context| { for node in nodes.iter_mut() { loop { match node.poll_next_unpin(cx) { @@ -283,11 +284,12 @@ async fn test_findnode_query() { } } } + Poll::Pending }; let future = future::poll_fn(main); - if let Err(_) = timeout(Duration::from_millis(100), future).await { + if let Err(_) = timeout(Duration::from_millis(800), future).await { panic!("Future timed out"); } } @@ -341,8 +343,8 @@ async fn test_updating_connection_on_ping() { } // The kbuckets table can have maximum 10 nodes in the same /24 subnet across all buckets -#[test] -fn test_table_limits() { +#[tokio::test] +async fn test_table_limits() { // this seed generates 12 node id's that are distributed accross buckets such that no more than // 2 exist in a single bucket. let mut keypairs = generate_deterministic_keypair(12, 9487); @@ -381,8 +383,8 @@ fn test_table_limits() { } // Each bucket can have maximum 2 nodes in the same /24 subnet -#[test] -fn test_bucket_limits() { +#[tokio::test] +async fn test_bucket_limits() { let enr_key = CombinedKey::generate_secp256k1(); let ip: IpAddr = "127.0.0.1".parse().unwrap(); let enr = EnrBuilder::new("v4") @@ -503,7 +505,7 @@ async fn test_predicate_search() { .find_enr_predicate(target_random_node_id, predicate, total_nodes); nodes.push(bootstrap_node); - let main = |cx: &mut Context| loop { + let main = |cx: &mut Context| { for node in nodes.iter_mut() { loop { match node.poll_next_unpin(cx) { @@ -513,19 +515,21 @@ async fn test_predicate_search() { closer_peers.len(), total_nodes, ); + println!("Nodes expected to pass predicate search {}", num_nodes); assert!(closer_peers.len() == num_nodes); return Poll::Ready(()); } Poll::Ready(_) => {} - _ => break, + Poll::Pending => break, } } } + Poll::Pending }; let future = future::poll_fn(main); - if let Err(_) = timeout(Duration::from_millis(100), future).await { + if let Err(_) = timeout(Duration::from_millis(500), future).await { panic!("Future timed out"); } } diff --git a/src/kbucket.rs b/src/kbucket.rs index 5a30bf05e..2ca5853dd 100644 --- a/src/kbucket.rs +++ b/src/kbucket.rs @@ -178,7 +178,7 @@ where } /// Returns an iterator over all the entries in the routing table. - pub fn iter(&mut self) -> impl Iterator> { + pub fn iter(&mut self) -> impl Iterator> { let applied_pending = &mut self.applied_pending; self.buckets.iter_mut().flat_map(move |table| { if let Some(applied) = table.apply_pending() { @@ -197,7 +197,7 @@ where /// Returns an iterator over all the entries in the routing table. /// Does not add pending node to kbucket to get an iterator which /// takes a reference instead of a mutable reference. - pub fn iter_ref(&self) -> impl Iterator> { + pub fn iter_ref(&self) -> impl Iterator> { self.buckets.iter().flat_map(move |table| { table.iter().map(move |(n, status)| EntryRefView { node: NodeRefView { diff --git a/src/lib.rs b/src/lib.rs index 00d98b3bd..1da1512b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,13 @@ +#![warn(rust_2018_idioms)] +#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, feature(doc_cfg))] #![allow(clippy::needless_doctest_main)] //! An implementation of [Discovery V5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md). //! //! # Overview //! //! Discovery v5 is a protocol designed for encrypted peer discovery and topic advertisement. Each peer/node -//! on the network is identified via it's ['ENR'] ([Ethereum Name +//! on the network is identified via it's ENR ([Ethereum Name //! Record](https://eips.ethereum.org/EIPS/eip-778)), which is essentially a signed key-value store //! containing the node's public key and optionally IP address and port. //! @@ -16,14 +19,14 @@ //! //! This protocol is split into three main sections/layers: //! -//! * Transport - The transport for this protocol is currently fixed to UDP and is realised by the -//! [`Discv5Service`] struct. It encodes/decodes [`Packet`]'s to and from the specified UDP +//! * Transport - The transport for this protocol is currently fixed to UDP and is realised by a +//! [`Transport`]. It encodes/decodes [Packet]'s to and from the specified UDP //! socket. //! * Session - The protocol's communication is encrypted with `AES_GCM`. All node communication //! undergoes a handshake, which results in a [`Session`]. [`Session`]'s are established when //! needed and get dropped after a timeout. This section manages the creation and maintenance of //! sessions between nodes. It is realised by the [`SessionService`] struct. -//! * Behaviour - This section contains the protocol-level logic. In particular it manages the +//! * Application - This section contains the protocol-level logic. In particular it manages the //! routing table of known ENR's, topic registration/advertisement and performs various queries //! such as peer discovery. This section is realised by the [`Discv5`] struct. //! @@ -31,13 +34,16 @@ //! //! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli) //! +//! //! # Usage //! -//! The [`Discv5`] service implements [`Stream`] which emits [`Discv5Event`] events. Running a +//! The [`Discv5`] service implements `Stream` which emits [`Discv5Event`] events. Running a //! discv5 service is as simple as initialising a [`Discv5`] struct and driving the stream. //! -//! A simple example of creating this service is as follows: +//! The service can be configured via [`Discv5Config`] which can be created using the +//! [`Discv5ConfigBuilder`]. //! +//! A simple example of creating this service is as follows: //! //! ```rust //! use enr::{Enr,EnrBuilder, CombinedKey}; @@ -78,9 +84,9 @@ //! discv5.find_node(target_random_node_id); //! //! // poll the stream for the next FindNoeResult event -//! loop { -//! match discv5.next().await { -//! Some(Discv5Event::FindNodeResult { closer_peers, .. }) => { +//! while let Some(event) = discv5.next().await { +//! match event { +//! Discv5Event::FindNodeResult { closer_peers, .. } => { //! println!("Query completed. Found {} peers", closer_peers.len()); //! break; //! } @@ -92,14 +98,14 @@ //! //! To see a usage in a runtime environment, see the `find_nodes` example in `/examples`. //! -//! [`Enr`]: enr::Enr -//! [`Discv5`]: crate::Discv5 -//! [`Discv5Event`]: crate::Discv5Event.enum -//! [`Discv5Service`]: crate::service::Discv5Service -//! [`Packet`]: crate::service::Packet -//! [`SessionService`]: crate::session_service::SessionService -//! [`Session`]: crate::session::Session -//! [`Stream`]: future::stream::Stream +//! [`Discv5`]: struct.Discv5.html +//! [`Discv5Event`]: enum.Discv5Event.html +//! [`Discv5Config`]: config/struct.Discv5Config.html +//! [`Discv5ConfigBuilder`]: config/struct.Discv5ConfigBuilder.html +//! [`Transport`]: transport/struct.Transport.html +//! [Packet]: packet/enum.Packet.html +//! [`SessionService`]: session_service/struct.SessionService.html +//! [`Session`]: session/struct.Session.html mod config; mod discv5; @@ -108,9 +114,9 @@ mod kbucket; mod packet; mod query_pool; mod rpc; -mod service; mod session; mod session_service; +mod transport; pub use crate::discv5::{Discv5, Discv5Event}; pub use config::{Discv5Config, Discv5ConfigBuilder}; diff --git a/src/packet/auth_header.rs b/src/packet/auth_header.rs index 9db288560..8cb3b2300 100644 --- a/src/packet/auth_header.rs +++ b/src/packet/auth_header.rs @@ -88,7 +88,7 @@ impl Encodable for AuthResponse { } impl Decodable for AuthResponse { - fn decode(rlp: &Rlp) -> Result { + fn decode(rlp: &Rlp<'_>) -> Result { if !rlp.is_list() { return Err(DecoderError::RlpExpectedToBeList); } @@ -124,7 +124,7 @@ impl Encodable for AuthHeader { } impl Decodable for AuthHeader { - fn decode(rlp: &Rlp) -> Result { + fn decode(rlp: &Rlp<'_>) -> Result { match rlp.item_count() { Ok(size) => { if size != 5 { diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 8363e3550..c76c52333 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -2,10 +2,12 @@ //! //! The [discv5 wire specification](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md) provides further information on UDP message packets as implemented in this module. //! -//! The `Packet` struct defines all raw UDP message variants and implements the encoding/decoding +//! A [`Packet`] defines all raw UDP message variants and implements the encoding/decoding //! logic. //! //! Note, that all message encryption/decryption is handled outside of this module. +//! +//! [`Packet`]: enum.Packet.html mod auth_header; diff --git a/src/query_pool.rs b/src/query_pool.rs index 6a24795cd..aaf042f16 100644 --- a/src/query_pool.rs +++ b/src/query_pool.rs @@ -23,8 +23,8 @@ mod peers; -pub use peers::closest::{FindNodeQuery, FindNodeQueryConfig}; -pub use peers::predicate::{PredicateQuery, PredicateQueryConfig}; +pub(crate) use peers::closest::{FindNodeQuery, FindNodeQueryConfig}; +pub(crate) use peers::predicate::{PredicateQuery, PredicateQueryConfig}; pub use peers::{QueryState, ReturnPeer}; use crate::kbucket::{Key, PredicateKey}; @@ -98,7 +98,7 @@ where } /// Adds a query to the pool that returns peers that satisfy a predicate. - pub fn add_predicate_query( + pub(crate) fn add_predicate_query( &mut self, config: PredicateQueryConfig, target: TTarget, @@ -133,7 +133,7 @@ where } /// Polls the pool to advance the queries. - pub fn poll(&mut self) -> QueryPoolState { + pub fn poll(&mut self) -> QueryPoolState<'_, TTarget, TNodeId, TResult> { let now = Instant::now(); let mut finished = None; let mut waiting = None; diff --git a/src/query_pool/peers/predicate.rs b/src/query_pool/peers/predicate.rs index ddce271b6..73f55d7d1 100644 --- a/src/query_pool/peers/predicate.rs +++ b/src/query_pool/peers/predicate.rs @@ -5,7 +5,7 @@ use std::collections::btree_map::{BTreeMap, Entry}; use std::iter::FromIterator; use std::time::{Duration, Instant}; -pub struct PredicateQuery { +pub(crate) struct PredicateQuery { /// The target key we are looking for target_key: Key, @@ -30,20 +30,20 @@ pub struct PredicateQuery { /// Configuration for a `Query`. #[derive(Debug, Clone)] -pub struct PredicateQueryConfig { +pub(crate) struct PredicateQueryConfig { /// Allowed level of parallelism. /// /// The `α` parameter in the Kademlia paper. The maximum number of peers that a query /// is allowed to wait for in parallel while iterating towards the closest /// nodes to a target. Defaults to `3`. - pub parallelism: usize, + pub(crate) parallelism: usize, /// Number of results to produce. /// /// The number of closest peers that a query must obtain successful results /// for before it terminates. Defaults to the maximum number of entries in a /// single k-bucket, i.e. the `k` parameter in the Kademlia paper. - pub num_results: usize, + pub(crate) num_results: usize, /// The timeout for a single peer. /// @@ -51,11 +51,11 @@ pub struct PredicateQueryConfig { /// window, the iterator considers the peer unresponsive and will not wait for /// the peer when evaluating the termination conditions, until and unless a /// result is delivered. Defaults to `10` seconds. - pub peer_timeout: Duration, + pub(crate) peer_timeout: Duration, } impl PredicateQueryConfig { - pub fn new_from_config(config: &Discv5Config) -> Self { + pub(crate) fn new_from_config(config: &Discv5Config) -> Self { Self { parallelism: config.query_parallelism, num_results: MAX_NODES_PER_BUCKET, diff --git a/src/rpc.rs b/src/rpc.rs index 139f88925..a8ed2293f 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,19 +6,19 @@ use std::net::IpAddr; type TopicHash = [u8; 32]; #[derive(Debug, Clone, PartialEq)] -pub struct ProtocolMessage { - pub id: u64, - pub body: RpcType, +pub(crate) struct ProtocolMessage { + pub(crate) id: u64, + pub(crate) body: RpcType, } #[derive(Debug, Clone, PartialEq)] -pub enum RpcType { +pub(crate) enum RpcType { Request(Request), Response(Response), } #[derive(Debug, Clone, PartialEq)] -pub enum Request { +pub(crate) enum Request { Ping { enr_seq: u64 }, FindNode { distance: u64 }, Ticket { topic: TopicHash }, @@ -27,7 +27,7 @@ pub enum Request { } #[derive(Debug, Clone, PartialEq)] -pub enum Response { +pub(crate) enum Response { Ping { enr_seq: u64, ip: IpAddr, @@ -48,7 +48,7 @@ pub enum Response { impl Response { /// Determines if the response is a valid response to the given request. - pub fn match_request(&self, req: &Request) -> bool { + pub(crate) fn match_request(&self, req: &Request) -> bool { match self { Response::Ping { .. } => { if let Request::Ping { .. } = req { @@ -81,7 +81,7 @@ impl Response { } impl std::fmt::Display for RpcType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RpcType::Request(request) => write!(f, "{:?}", request), RpcType::Response(response) => write!(f, "{}", response), @@ -90,7 +90,7 @@ impl std::fmt::Display for RpcType { } impl std::fmt::Display for Response { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Response::Ping { enr_seq, ip, port } => write!( f, @@ -124,13 +124,13 @@ impl std::fmt::Display for Response { } impl std::fmt::Display for ProtocolMessage { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Message: Id: {}, Body: {}", self.id, self.body) } } impl ProtocolMessage { - pub fn msg_type(&self) -> u8 { + pub(crate) fn msg_type(&self) -> u8 { match &self.body { RpcType::Request(request) => match request { Request::Ping { .. } => 1, @@ -149,7 +149,7 @@ impl ProtocolMessage { } /// Encodes a ProtocolMessage to RLP-encoded bytes. - pub fn encode(self) -> Vec { + pub(crate) fn encode(self) -> Vec { let mut buf = Vec::with_capacity(10); let msg_type = self.msg_type(); buf.push(msg_type); @@ -250,7 +250,7 @@ impl ProtocolMessage { } } - pub fn decode(data: Vec) -> Result { + pub(crate) fn decode(data: Vec) -> Result { if data.len() < 3 { return Err(DecoderError::RlpIsTooShort); } diff --git a/src/service.rs b/src/service.rs deleted file mode 100644 index cc04f4412..000000000 --- a/src/service.rs +++ /dev/null @@ -1,104 +0,0 @@ -//! The base UDP layer of the discv5 service. -//! -//! The `Discv5Service` opens a UDP socket and handles the encoding/decoding of raw Discv5 -//! messages. These messages are defined in the `Packet` module. - -use super::packet::{Packet, MAGIC_LENGTH}; -use async_std::net::UdpSocket; -use futures::prelude::*; -use log::debug; -use std::{ - io, - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, -}; - -pub(crate) const MAX_PACKET_SIZE: usize = 1280; - -/// The main service that handles the transport. Specifically the UDP sockets and packet -/// encoding/decoding. -pub struct Discv5Service { - /// The UDP socket for interacting over UDP. - socket: UdpSocket, - /// The buffer to accept inbound datagrams. - recv_buffer: [u8; MAX_PACKET_SIZE], - /// List of discv5 packets to send. - send_queue: Vec<(SocketAddr, Packet)>, - /// WhoAreYou Magic Value. Used to decode raw WHOAREYOU packets. - whoareyou_magic: [u8; MAGIC_LENGTH], -} - -impl Discv5Service { - /// Initializes the UDP socket, can fail when binding the socket. - pub fn new(socket_addr: SocketAddr, whoareyou_magic: [u8; MAGIC_LENGTH]) -> io::Result { - // set up the UDP socket - let socket = { - #[cfg(unix)] - fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { - net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; - Ok(()) - } - #[cfg(not(unix))] - fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { - Ok(()) - } - let builder = net2::UdpBuilder::new_v4()?; - builder.reuse_address(true)?; - platform_specific(&builder)?; - builder.bind(socket_addr)? - }; - let socket = UdpSocket::from(socket); - - Ok(Discv5Service { - socket, - recv_buffer: [0; MAX_PACKET_SIZE], - send_queue: Vec::new(), - whoareyou_magic, - }) - } - - /// Add packets to the send queue. - pub fn send(&mut self, to: SocketAddr, packet: Packet) { - self.send_queue.push((to, packet)); - } - - /// Drive reading/writing to the UDP socket. - pub async fn poll(&mut self) -> (SocketAddr, Packet) { - loop { - // send messages - while !self.send_queue.is_empty() { - let (dst, packet) = self.send_queue.remove(0); - - match self.socket.send_to(&packet.encode(), &dst).await { - Ok(bytes_written) => { - debug_assert_eq!(bytes_written, packet.encode().len()); - } - Err(_) => { - self.send_queue.clear(); - break; - } - } - } - - // handle incoming messages - if let Ok((length, src)) = self.socket.recv_from(&mut self.recv_buffer).await { - match Packet::decode(&self.recv_buffer[..length], &self.whoareyou_magic) { - Ok(p) => return (src, p), - Err(e) => debug!("Could not decode packet: {:?}", e), // could not decode the packet, drop it - } - } - } - } -} - -impl Stream for Discv5Service { - type Item = (SocketAddr, Packet); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match Box::pin(self.poll()).as_mut().poll(cx) { - Poll::Ready(v) => Poll::Ready(Some(v)), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/src/session/crypto.rs b/src/session/crypto.rs index 0f4c27e57..5fc472e40 100644 --- a/src/session/crypto.rs +++ b/src/session/crypto.rs @@ -28,7 +28,7 @@ type Key = [u8; KEY_LENGTH]; /// Generates session and auth-response keys for a nonce and remote ENR. This currently only /// supports Secp256k1 signed ENR's. This returns four keys; initiator key, responder key, auth /// response key and the ephemeral public key. -pub fn generate_session_keys( +pub(crate) fn generate_session_keys( local_id: &NodeId, remote_enr: &Enr, id_nonce: &Nonce, @@ -85,7 +85,7 @@ fn derive_key( } /// Derives the session keys for a public key type that matches the local keypair. -pub fn derive_keys_from_pubkey( +pub(crate) fn derive_keys_from_pubkey( local_key: &CombinedKey, local_id: &NodeId, remote_id: &NodeId, @@ -115,7 +115,7 @@ pub fn derive_keys_from_pubkey( /// Generates a signature of a nonce given a keypair. This prefixes the `NONCE_PREFIX` to the /// signature. -pub fn sign_nonce( +pub(crate) fn sign_nonce( signing_key: &CombinedKey, nonce: &Nonce, ephem_pubkey: &[u8], @@ -134,7 +134,7 @@ pub fn sign_nonce( } /// Verifies the authentication header nonce. -pub fn verify_authentication_nonce( +pub(crate) fn verify_authentication_nonce( remote_pubkey: &CombinedPublicKey, remote_ephem_pubkey: &[u8], nonce: &Nonce, @@ -170,7 +170,7 @@ fn generate_signing_nonce(id_nonce: &Nonce, ephem_pubkey: &[u8]) -> Vec { /// Verifies the encoding and nonce signature given in the authentication header. If /// the header contains an updated ENR, it is returned. -pub fn decrypt_authentication_header( +pub(crate) fn decrypt_authentication_header( auth_resp_key: &Key, header: &AuthHeader, ) -> Result { @@ -186,7 +186,7 @@ pub fn decrypt_authentication_header( } /// Decrypt messages that are post-fixed with an authenticated MAC. -pub fn decrypt_message( +pub(crate) fn decrypt_message( key: &Key, nonce: AuthTag, message: &[u8], @@ -216,7 +216,7 @@ pub fn decrypt_message( /// A wrapper around the underlying default AES_GCM implementation. This may be abstracted in the /// future. -pub fn encrypt_message( +pub(crate) fn encrypt_message( key: &Key, nonce: AuthTag, message: &[u8], diff --git a/src/session/mod.rs b/src/session/mod.rs index 363f10630..9792b0599 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1,4 +1,4 @@ -//! The `Session` struct handles the stages of creating and establishing a handshake with a +//! A [`Session`] handles the stages of creating and establishing a handshake with a //! peer. //! //! There are two ways a Session can get initialised. @@ -7,7 +7,9 @@ //! - A message was received from an unknown peer and we start the `Session` by sending a //! WHOAREYOU message. //! -//! This `Session` module is responsible for generating, deriving and holding keys for sessions for known peers. +//! This module is responsible for generating, deriving and holding keys for sessions for known peers. +//! +//! [`Session`]: struct.Session.html use super::packet::{AuthHeader, AuthResponse, AuthTag, Nonce, Packet, Tag, MAGIC_LENGTH}; use crate::Discv5Error; @@ -26,7 +28,7 @@ const WHOAREYOU_STRING: &str = "WHOAREYOU"; /// a session can be in, initializing (`WhoAreYouSent` or `RandomSent`), `Untrusted` (when the /// socket address of the ENR doesn't match the `last_seen_socket`) and `Established` (the session /// has been successfully established). -pub struct Session { +pub(crate) struct Session { /// The current state of the Session state: SessionState, @@ -42,19 +44,19 @@ pub struct Session { } #[derive(Zeroize, PartialEq)] -pub struct Keys { +pub(crate) struct Keys { /// The Authentication response key. - pub auth_resp_key: [u8; 16], + auth_resp_key: [u8; 16], /// The encryption key. - pub encryption_key: [u8; 16], + encryption_key: [u8; 16], /// The decryption key. - pub decryption_key: [u8; 16], + decryption_key: [u8; 16], } /// A State -pub enum TrustedState { +enum TrustedState { /// The ENR socket address matches what is observed Trusted, /// The source socket address of the last message doesn't match the known ENR. In this state, the service will respond to requests, but does not treat the node as @@ -64,7 +66,7 @@ pub enum TrustedState { #[derive(PartialEq)] /// The current state of the session. This enum holds the encryption keys for various states. -pub enum SessionState { +pub(crate) enum SessionState { /// A WHOAREYOU packet has been sent, and the Session is awaiting an Authentication response. WhoAreYouSent, @@ -100,7 +102,7 @@ impl Session { /// Creates a new `Session` instance and generates a RANDOM packet to be sent along with this /// session being established. This session is set to `RandomSent` state. - pub fn new_random(tag: Tag, remote_enr: Enr) -> (Self, Packet) { + pub(crate) fn new_random(tag: Tag, remote_enr: Enr) -> (Self, Packet) { let random_packet = Packet::random(tag); let session = Session { @@ -115,7 +117,7 @@ impl Session { /// Creates a new `Session` and generates an associated WHOAREYOU packet. The returned session is in the /// `WhoAreYouSent` state. - pub fn new_whoareyou( + pub(crate) fn new_whoareyou( node_id: &NodeId, enr_seq: u64, remote_enr: Option>, @@ -157,7 +159,7 @@ impl Session { /// Generates session keys from an authentication header. If the IP of the ENR does not match the /// source IP address, we consider this session untrusted. The output returns a boolean which /// specifies if the Session is trusted or not. - pub fn establish_from_header( + pub(crate) fn establish_from_header( &mut self, local_key: &CombinedKey, local_id: &NodeId, @@ -225,7 +227,7 @@ impl Session { /* Encryption Related Functions */ /// Encrypts a message and produces an AuthMessage. - pub fn encrypt_with_header( + pub(crate) fn encrypt_with_header( &mut self, tag: Tag, local_key: &CombinedKey, @@ -295,7 +297,7 @@ impl Session { /// Uses the current `Session` to encrypt a message. Encrypt packets with the current session /// key if we are awaiting a response from AuthMessage. - pub fn encrypt_message(&self, tag: Tag, message: &[u8]) -> Result { + pub(crate) fn encrypt_message(&self, tag: Tag, message: &[u8]) -> Result { //TODO: Establish a counter to prevent repeats of nonce let auth_tag: AuthTag = rand::random(); @@ -321,7 +323,7 @@ impl Session { /// Decrypts an encrypted message. If a Session is already established, the original decryption /// keys are tried first, upon failure, the new keys are attempted. If the new keys succeed, /// the session keys are updated along with the Session state. - pub fn decrypt_message( + pub(crate) fn decrypt_message( &mut self, nonce: AuthTag, message: &[u8], @@ -391,7 +393,7 @@ impl Session { /* Session Helper Functions */ - pub fn update_enr(&mut self, enr: Enr) -> bool { + pub(crate) fn update_enr(&mut self, enr: Enr) -> bool { if let Some(remote_enr) = &self.remote_enr { if remote_enr.seq() < enr.seq() { self.remote_enr = Some(enr); @@ -405,7 +407,7 @@ impl Session { /// Updates the trusted status of a Session. It can be promoted to an `established` state, or /// demoted to an `untrusted` state. This value returns true if the Session has been /// promoted. - pub fn update_trusted(&mut self) -> bool { + pub(crate) fn update_trusted(&mut self) -> bool { if let TrustedState::Untrusted = self.trusted { if let Some(remote_enr) = &self.remote_enr { if Some(self.last_seen_socket) == remote_enr.udp_socket() { @@ -424,19 +426,19 @@ impl Session { } /// The socket address of the last packer received from this node. - pub fn set_last_seen_socket(&mut self, socket: SocketAddr) { + pub(crate) fn set_last_seen_socket(&mut self, socket: SocketAddr) { self.last_seen_socket = socket; } - pub fn is_whoareyou_sent(&self) -> bool { + pub(crate) fn is_whoareyou_sent(&self) -> bool { SessionState::WhoAreYouSent == self.state } - pub fn is_random_sent(&self) -> bool { + pub(crate) fn is_random_sent(&self) -> bool { SessionState::RandomSent == self.state } - pub fn is_awaiting_response(&self) -> bool { + pub(crate) fn is_awaiting_response(&self) -> bool { if let SessionState::AwaitingResponse(_) = self.state { true } else { @@ -444,11 +446,11 @@ impl Session { } } - pub fn remote_enr(&self) -> &Option> { + pub(crate) fn remote_enr(&self) -> &Option> { &self.remote_enr } - pub fn is_trusted(&self) -> bool { + pub(crate) fn is_trusted(&self) -> bool { if let TrustedState::Trusted = self.trusted { true } else { @@ -458,7 +460,7 @@ impl Session { /// Returns true if the Session is trusted and has established session keys. This state means /// the session is capable of sending requests. - pub fn trusted_established(&self) -> bool { + pub(crate) fn trusted_established(&self) -> bool { let established = match &self.state { SessionState::WhoAreYouSent => false, SessionState::RandomSent => false, diff --git a/src/session_service.rs b/src/session_service.rs index b349a495f..648785734 100644 --- a/src/session_service.rs +++ b/src/session_service.rs @@ -1,6 +1,6 @@ //! Session management for the Discv5 Discovery service. //! -//! The `SessionService` is responsible for establishing and maintaining sessions with +//! The [`SessionService`] is responsible for establishing and maintaining sessions with //! connected/discovered nodes. Each node, identified by it's [`NodeId`] is associated with a //! [`Session`]. This service drives the handshakes for establishing the sessions and associated //! logic for sending/requesting initial connections/ENR's from unknown peers. @@ -9,14 +9,14 @@ //! session timeouts and received messages. Messages are encrypted and decrypted using the //! associated `Session` for each node. //! -//! An ongoing connection is managed by the `Session` struct. A node that provides and ENR with an +//! An ongoing connection is managed by [`Session`]. A node that provides and ENR with an //! IP address/port that doesn't match the source, is considered untrusted. Once the IP is updated //! to match the source, the `Session` is promoted to an established state. RPC requests are not sent //! to untrusted Sessions, only responses. //TODO: Document the event structure and WHOAREYOU requests to the protocol layer. //TODO: Limit packets per node to avoid DOS/Spam. -use super::service::Discv5Service; +use super::transport::Transport; use crate::config::Discv5Config; use crate::error::Discv5Error; use crate::packet::{AuthHeader, AuthTag, Magic, Nonce, Packet, Tag, TAG_LENGTH}; @@ -41,7 +41,7 @@ mod timed_sessions; use timed_requests::TimedRequests; use timed_sessions::TimedSessions; -pub struct SessionService { +pub(crate) struct SessionService { /// Queue of events produced by the session service. events: VecDeque, @@ -68,14 +68,14 @@ pub struct SessionService { sessions: TimedSessions, /// The discovery v5 UDP service. - service: Discv5Service, + transport: Transport, } impl SessionService { /* Public Functions */ /// A new Session service which instantiates the UDP socket. - pub fn new( + pub(crate) fn new( enr: Enr, key: enr::CombinedKey, listen_socket: SocketAddr, @@ -98,24 +98,28 @@ impl SessionService { pending_requests: TimedRequests::new(config.request_timeout), pending_messages: HashMap::default(), sessions: TimedSessions::new(config.session_establish_timeout), - service: Discv5Service::new(listen_socket, magic) + transport: Transport::new(listen_socket, magic) .map_err(|e| Discv5Error::Error(format!("{:?}", e)))?, config, }) } /// The local ENR of the service. - pub fn enr(&self) -> &Enr { + pub(crate) fn enr(&self) -> &Enr { &self.enr } /// Generic function to modify a field in the local ENR. - pub fn enr_insert(&mut self, key: &str, value: Vec) -> Result>, EnrError> { + pub(crate) fn enr_insert( + &mut self, + key: &str, + value: Vec, + ) -> Result>, EnrError> { self.enr.insert(key, value, &self.key) } /// Updates the local ENR `SocketAddr` for either the TCP or UDP port. - pub fn update_local_enr_socket( + pub(crate) fn update_local_enr_socket( &mut self, socket: SocketAddr, is_tcp: bool, @@ -136,7 +140,7 @@ impl SessionService { } /// Updates a session if a new ENR or an updated ENR is discovered. - pub fn update_enr(&mut self, enr: Enr) { + pub(crate) fn update_enr(&mut self, enr: Enr) { if let Some(session) = self.sessions.get_mut(&enr.node_id()) { // if an ENR is updated to an address that was not the last seen address of the // session, we demote the session to untrusted. @@ -151,7 +155,7 @@ impl SessionService { /// addresses not related to the ENR. // To update an ENR for an unknown node, we request a FINDNODE with distance 0 to the IP // address that we know of. - pub fn send_request( + pub(crate) fn send_request( &mut self, dst_enr: &Enr, message: ProtocolMessage, @@ -225,7 +229,7 @@ impl SessionService { /// therefore assumed to be valid. // An example of this is requesting an ENR update from a NODE who's IP address is incorrect. // We send this request as a response to a ping. Assume a session is valid. - pub fn send_request_unknown_enr( + pub(crate) fn send_request_unknown_enr( &mut self, dst: SocketAddr, dst_id: &NodeId, @@ -250,7 +254,7 @@ impl SessionService { /// Sends an RPC Response. This differs from send request as responses do not require a /// known ENR to send messages and session's should already be established. - pub fn send_response( + pub(crate) fn send_response( &mut self, dst: SocketAddr, dst_id: &NodeId, @@ -270,14 +274,13 @@ impl SessionService { })?; // send the response - // trace!("Sending Response: {:?} to {:?}", packet, dst); - self.service.send(dst, packet); + self.transport.send(dst, packet); Ok(()) } /// This is called in response to a SessionMessage::WhoAreYou event. The protocol finds the /// highest known ENR then calls this function to send a WHOAREYOU packet. - pub fn send_whoareyou( + pub(crate) fn send_whoareyou( &mut self, dst: SocketAddr, node_id: &NodeId, @@ -688,7 +691,7 @@ impl SessionService { Ok(()) } - /// Wrapper around `service.send()` that adds all sent messages to the `pending_requests`. This + /// Wrapper around `transport.send()` that adds all sent messages to the `pending_requests`. This /// builds a request adds a timeout and sends the request. #[inline] fn process_request( @@ -700,17 +703,17 @@ impl SessionService { ) { // construct the request let request = Request::new(dst_id, packet, message); - self.service.send(dst, request.packet.clone()); + self.transport.send(dst, request.packet.clone()); self.pending_requests.insert(dst.clone(), request); } /// The heartbeat which checks for timeouts and reports back failed RPC requests/sessions. - fn check_timeouts(&mut self, cx: &mut Context) { + fn check_timeouts(&mut self, cx: &mut Context<'_>) { // remove expired requests/sessions // log pending request timeouts // TODO: Split into own task, to be called only when timeouts are required let sessions_ref = &mut self.sessions; - let service_ref = &mut self.service; + let transport_ref = &mut self.transport; let pending_messages_ref = &mut self.pending_messages; let events_ref = &mut self.events; @@ -746,7 +749,7 @@ impl SessionService { "Resending message: {:?} to node: {}", request.packet, node_id ); - service_ref.send(dst.clone(), request.packet.clone()); + transport_ref.send(dst.clone(), request.packet.clone()); request.retries += 1; self.pending_requests.insert(dst, request); } @@ -779,15 +782,15 @@ impl SessionService { impl Stream for SessionService { type Item = SessionEvent; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // process any events if necessary if let Some(event) = self.events.pop_front() { return Poll::Ready(Some(event)); } - // poll the discv5 service - match self.service.poll_next_unpin(cx) { + // poll the discv5 transport + match self.transport.poll_next_unpin(cx) { Poll::Ready(Some((src, packet))) => { match packet { Packet::WhoAreYou { @@ -828,7 +831,7 @@ impl Stream for SessionService { #[derive(Debug)] /// The output from polling the `SessionSerivce`. -pub enum SessionEvent { +pub(crate) enum SessionEvent { /// A session has been established with a node. Established(Enr), @@ -854,22 +857,22 @@ pub enum SessionEvent { #[derive(Debug)] /// A request to a node that we are waiting for a response. -pub struct Request { +pub(crate) struct Request { /// The destination NodeId. - pub dst_id: NodeId, + dst_id: NodeId, /// The raw discv5 packet sent. - pub packet: Packet, + packet: Packet, /// The unencrypted message. Required if need to re-encrypt and re-send. - pub message: Option, + message: Option, /// The number of times this request has been re-sent. - pub retries: u8, + retries: u8, } impl Request { - pub fn new(dst_id: NodeId, packet: Packet, message: Option) -> Self { + fn new(dst_id: NodeId, packet: Packet, message: Option) -> Self { Request { dst_id, packet, @@ -878,7 +881,7 @@ impl Request { } } - pub fn id(&self) -> Option { + fn id(&self) -> Option { self.message.as_ref().map(|m| m.id) } } diff --git a/src/session_service/timed_requests.rs b/src/session_service/timed_requests.rs index c4f83f6d0..e6522b7d1 100644 --- a/src/session_service/timed_requests.rs +++ b/src/session_service/timed_requests.rs @@ -16,7 +16,7 @@ use std::{ use tokio::time::{delay_queue, DelayQueue}; /// A collection of requests that have an associated timeout. -pub struct TimedRequests { +pub(crate) struct TimedRequests { /// Pending raw requests with timeout keys for removing from a delay queue and to be identified during a timeout. /// These are indexed by SocketAddr as WHOAREYOU messages do not return a source node id to /// match against. @@ -37,11 +37,11 @@ pub struct TimedRequests { struct RequestKey(usize); impl RequestKey { - pub fn new() -> Self { + pub(crate) fn new() -> Self { RequestKey(0) } - pub fn next(self) -> RequestKey { + pub(crate) fn next(self) -> RequestKey { RequestKey(self.0.saturating_add(1)) } } @@ -56,7 +56,7 @@ struct TimeoutIndex { } impl TimeoutIndex { - pub fn new(dst: SocketAddr, request_key: RequestKey) -> Self { + fn new(dst: SocketAddr, request_key: RequestKey) -> Self { TimeoutIndex { dst, request_key } } } @@ -64,13 +64,17 @@ impl TimeoutIndex { /// A request with an attached delay queue key and request key. Allows for removing the delay /// timeout when being removed and for being removed from the collection being timed out. struct RequestTimeout { - pub request: Request, - pub delay_key: delay_queue::Key, - pub request_key: RequestKey, + request: Request, + delay_key: delay_queue::Key, + request_key: RequestKey, } impl RequestTimeout { - pub fn new(request: Request, delay_key: delay_queue::Key, request_key: RequestKey) -> Self { + pub(crate) fn new( + request: Request, + delay_key: delay_queue::Key, + request_key: RequestKey, + ) -> Self { RequestTimeout { request, delay_key, @@ -86,7 +90,7 @@ impl Default for TimedRequests { } impl TimedRequests { - pub fn new(request_timeout: Duration) -> Self { + pub(crate) fn new(request_timeout: Duration) -> Self { TimedRequests { requests: HashMap::new(), timeouts: DelayQueue::new(), @@ -97,7 +101,7 @@ impl TimedRequests { /// Removes a request based on the given filter. Returns `Some(Request)` if the request exists, /// otherwise returns None. - pub fn remove bool>( + pub(crate) fn remove bool>( &mut self, src: &SocketAddr, mut filter: F, @@ -117,7 +121,7 @@ impl TimedRequests { } } - pub fn insert(&mut self, dst: SocketAddr, request: Request) { + pub(crate) fn insert(&mut self, dst: SocketAddr, request: Request) { // create a timeout for the request let timeout_index = TimeoutIndex::new(dst, self.current_key); let delay_key = self.timeouts.insert(timeout_index, self.request_timeout); @@ -130,7 +134,7 @@ impl TimedRequests { self.current_key = self.current_key.next(); } - pub fn exists bool>(&self, mut filter: F) -> bool { + pub(crate) fn exists bool>(&self, mut filter: F) -> bool { self.requests .iter() .any(|(_dst, v)| v.iter().any(|req| filter(&req.request))) @@ -140,7 +144,7 @@ impl TimedRequests { impl Stream for TimedRequests { type Item = (SocketAddr, Request); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.timeouts.poll_expired(cx) { Poll::Ready(Some(Ok(timeout_index))) => { let timeout_index = timeout_index.get_ref(); diff --git a/src/session_service/timed_sessions.rs b/src/session_service/timed_sessions.rs index d5f2b880d..5f1907f3b 100644 --- a/src/session_service/timed_sessions.rs +++ b/src/session_service/timed_sessions.rs @@ -19,7 +19,7 @@ use tokio::time::{delay_queue, DelayQueue}; /// /// Sessions have an establishment timeout as /// well as lifetime. -pub struct TimedSessions { +pub(crate) struct TimedSessions { /// The sessions being established. sessions: HashMap, /// A queue indicating when a session has timed out. @@ -29,7 +29,7 @@ pub struct TimedSessions { } impl TimedSessions { - pub fn new(session_establish_timeout: Duration) -> Self { + pub(crate) fn new(session_establish_timeout: Duration) -> Self { TimedSessions { sessions: HashMap::new(), timeouts: DelayQueue::new(), @@ -37,31 +37,31 @@ impl TimedSessions { } } - pub fn insert(&mut self, node_id: NodeId, session: Session) { + pub(crate) fn insert(&mut self, node_id: NodeId, session: Session) { self.insert_at(node_id, session, self.session_establish_timeout); } - pub fn insert_at(&mut self, node_id: NodeId, session: Session, duration: Duration) { + pub(crate) fn insert_at(&mut self, node_id: NodeId, session: Session, duration: Duration) { let delay = self.timeouts.insert(node_id.clone(), duration); self.sessions.insert(node_id, (session, delay)); } - pub fn get(&self, node_id: &NodeId) -> Option<&Session> { + pub(crate) fn get(&self, node_id: &NodeId) -> Option<&Session> { self.sessions.get(node_id).map(|&(ref v, _)| v) } - pub fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut Session> { + pub(crate) fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut Session> { self.sessions.get_mut(node_id).map(|(v, _)| v) } - pub fn update_timeout(&mut self, node_id: &NodeId, timeout: Duration) { + pub(crate) fn update_timeout(&mut self, node_id: &NodeId, timeout: Duration) { if let Some((_, key)) = self.sessions.get(node_id) { self.timeouts.reset(key, timeout); } } - pub fn remove(&mut self, node_id: &NodeId) { + pub(crate) fn remove(&mut self, node_id: &NodeId) { if let Some((_, delay_key)) = self.sessions.remove(node_id) { self.timeouts.remove(&delay_key); } @@ -71,7 +71,7 @@ impl TimedSessions { impl Stream for TimedSessions { type Item = (NodeId, Session); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.timeouts.poll_expired(cx) { Poll::Ready(Some(Ok(node_id))) => { let node_id = node_id.into_inner(); diff --git a/src/transport.rs b/src/transport.rs new file mode 100644 index 000000000..dacf75de1 --- /dev/null +++ b/src/transport.rs @@ -0,0 +1,136 @@ +//! The base UDP layer of the Discv5 service. +//! +//! The [`Transport`] opens a UDP socket and handles the encoding/decoding of raw Discv5 +//! messages. These messages are defined in the [`Packet`] module. +//! +//! [`Transport`]: transport/struct.Transport.html +//! [`Packet`]: ../packet/index.html + +use super::packet::{Packet, MAGIC_LENGTH}; +use futures::prelude::*; +use log::debug; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll, Waker}, +}; +use tokio::net::UdpSocket; + +pub(crate) const MAX_PACKET_SIZE: usize = 1280; + +/// The main service that handles the transport. Specifically the UDP sockets and packet +/// encoding/decoding. +pub(crate) struct Transport { + /// The UDP socket for interacting over UDP. + socket: UdpSocket, + /// List of discv5 packets to send. + send_queue: Vec<(SocketAddr, Packet)>, + /// The buffer to accept inbound datagrams. + recv_buffer: [u8; MAX_PACKET_SIZE], + /// WhoAreYou Magic Value. Used to decode raw WHOAREYOU packets. + whoareyou_magic: [u8; MAGIC_LENGTH], + /// Waker to awake the thread on new messages. + waker: Option, +} + +impl Transport { + /// Initializes the UDP socket, can fail when binding the socket. + pub(crate) fn new( + socket_addr: SocketAddr, + whoareyou_magic: [u8; MAGIC_LENGTH], + ) -> io::Result { + // set up the UDP socket + let socket = { + #[cfg(unix)] + fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { + net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; + Ok(()) + } + #[cfg(not(unix))] + fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { + Ok(()) + } + let builder = net2::UdpBuilder::new_v4()?; + builder.reuse_address(true)?; + platform_specific(&builder)?; + builder.bind(socket_addr)? + }; + let socket = UdpSocket::from_std(socket)?; + + Ok(Transport { + socket, + send_queue: Vec::new(), + recv_buffer: [0; MAX_PACKET_SIZE], + whoareyou_magic, + waker: None, + }) + } + + /// Add packets to the send queue. + pub(crate) fn send(&mut self, to: SocketAddr, packet: Packet) { + self.send_queue.push((to, packet)); + if let Some(waker) = &self.waker { + waker.wake_by_ref() + } + } +} + +impl Stream for Transport { + type Item = (SocketAddr, Packet); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + if let Some(waker) = &s.waker { + if waker.will_wake(cx.waker()) { + s.waker = Some(cx.waker().clone()); + } + } else { + s.waker = Some(cx.waker().clone()); + } + + // send messages + while !s.send_queue.is_empty() { + let (dst, packet) = s.send_queue.remove(0); + + match s.socket.poll_send_to(cx, &packet.encode(), &dst) { + Poll::Ready(Ok(bytes_written)) => { + debug_assert_eq!(bytes_written, packet.encode().len()); + } + Poll::Pending => { + // didn't write add back and break + s.send_queue.insert(0, (dst, packet)); + // notify to try again + cx.waker().wake_by_ref(); + break; + } + Poll::Ready(Err(_)) => { + s.send_queue.clear(); + break; + } + } + } + + // handle incoming messages + loop { + match s.socket.poll_recv_from(cx, &mut s.recv_buffer) { + Poll::Ready(Ok((length, src))) => { + let whoareyou_magic = s.whoareyou_magic; + match Packet::decode(&s.recv_buffer[..length], &whoareyou_magic) { + Ok(p) => { + return Poll::Ready(Some((src, p))); + } + Err(e) => debug!("Could not decode packet: {:?}", e), // could not decode the packet, drop it + } + } + Poll::Pending => { + break; + } + Poll::Ready(Err(_)) => { + break; + } // wait for reconnection to poll again. + } + } + Poll::Pending + } +}