Skip to content

Commit

Permalink
Correct sending async future (#2)
Browse files Browse the repository at this point in the history
* Temporary fix for sending udp packets

* Mutable self in service poll

* Doc and code improvements

* Version bump

* Update enr version

* Update enr version
  • Loading branch information
AgeManning authored Apr 30, 2020
1 parent ef04fda commit dafd11b
Show file tree
Hide file tree
Showing 20 changed files with 326 additions and 269 deletions.
15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
[package]
name = "discv5"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
version = "0.1.0-alpha"
version = "0.1.0-alpha.1"
description = "Implementation of the p2p discv5 discovery protocol"
authors = ["Age Manning <Age@AgeManning.com>"]
license = "MIT"
repository = "https://github.com/sigp/discv5"
readme = "./README.md"
keywords = ["peer-to-peer", "libp2p", "networking", "discovery", "discv5"]
categories = ["network-programming", "asynchronous"]
exclude = [
".gitignore",
".github/*"
]

[dependencies]
enr = { version = "0.1.0-alpha.5", features = ["libsecp256k1", "ed25519"] }
tokio = { version = "0.2.19", features = ["time", "stream"] }
enr = { version = "0.1.0-alpha.6", features = ["libsecp256k1", "ed25519"] }
tokio = { version = "0.2.20", features = ["net", "stream", "time"] }
zeroize = { version = "1.1.0", features = ["zeroize_derive"] }
libsecp256k1 = "0.3.5"
futures = "0.3.4"
Expand All @@ -27,14 +31,13 @@ fnv = "1.0.6"
arrayvec = "0.5.1"
digest = "0.8.1"
rand = "0.7.3"
async-std = "1.5.0"
net2 = "0.2.33"
smallvec = "1.4.0"

[dev-dependencies]
quickcheck = "0.9.2"
env_logger = "0.7.1"
simple_logger = "1.6.0"
tokio = { version = "0.2.19", features = ["time", "rt-threaded", "macros"] }
tokio = { version = "0.2.20", features = ["time", "rt-threaded", "macros"] }
rand_xorshift = "0.2.0"
rand_core = "0.5.1"
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ async fn main() {
let target_random_node_id = enr::NodeId::random();
discv5.find_node(target_random_node_id);

// poll the stream for the next FindNoeResult event
loop {
match discv5.next().await {
Some(Discv5Event::FindNodeResult { closer_peers, .. }) => {
println!("Query completed. Found {} peers", closer_peers.len());
break;
}
_ => {} // handle other discv5 events
}
}

// poll the stream for the next FindNoeResult event
while let Some(event) = discv5.next().await {
match event {
Discv5Event::FindNodeResult { closer_peers, .. } => {
println!("Query completed. Found {} peers", closer_peers.len());
break;
}
_ => {} // handle other discv5 events
}
}
}
```

Expand All @@ -106,7 +107,7 @@ This protocol is split into three main sections/layers:
undergoes a handshake, which results in a [`Session`]. [`Session`]'s are established when
needed and get dropped after a timeout. This section manages the creation and maintenance of
sessions between nodes. It is realised by the [`SessionService`] struct.
* Behaviour - This section contains the protocol-level logic. In particular it manages the
* Application - This section contains the protocol-level logic. In particular it manages the
routing table of known ENR's, topic registration/advertisement and performs various queries
such as peer discovery. This section is realised by the [`Discv5`] struct.

Expand Down
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl Default for Discv5Config {
}
}

#[derive(Debug)]
pub struct Discv5ConfigBuilder {
config: Discv5Config,
}
Expand Down Expand Up @@ -144,7 +145,7 @@ impl Discv5ConfigBuilder {
}

impl std::fmt::Debug for Discv5Config {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut builder = f.debug_struct("Discv5Config");
let _ = builder.field("request_timeout", &self.request_timeout);
let _ = builder.field("query_timeout", &self.query_timeout);
Expand Down
7 changes: 3 additions & 4 deletions src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use crate::query_pool::{
FindNodeQueryConfig, PredicateQueryConfig, QueryId, QueryPool, QueryPoolState, ReturnPeer,
};
use crate::rpc;
use crate::service::MAX_PACKET_SIZE;
use crate::session_service::{SessionEvent, SessionService};
use crate::transport::MAX_PACKET_SIZE;
use crate::Discv5Config;
use async_std::prelude::*;
use enr::{CombinedKey, Enr as RawEnr, EnrError, EnrKey, NodeId};
use fnv::FnvHashMap;
use futures::prelude::*;
Expand Down Expand Up @@ -517,7 +516,7 @@ impl Discv5 {
rpc_id: u64,
distance: u64,
) {
let nodes: Vec<EntryRefView<NodeId, Enr>> = self
let nodes: Vec<EntryRefView<'_, NodeId, Enr>> = self
.kbuckets
.nodes_by_distance(distance)
.into_iter()
Expand Down Expand Up @@ -937,7 +936,7 @@ impl Discv5 {
impl Stream for Discv5 {
type Item = Discv5Event;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// Process events from the session service
while let Poll::Ready(Some(event)) = self.service.poll_next_unpin(cx) {
Expand Down
2 changes: 1 addition & 1 deletion src/discv5/query_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum QueryType {

impl QueryInfo {
/// Builds an RPC Request
pub fn into_rpc_request(
pub(crate) fn into_rpc_request(
self,
return_peer: &ReturnPeer<NodeId>,
) -> Result<Request, &'static str> {
Expand Down
24 changes: 14 additions & 10 deletions src/discv5/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async fn test_discovery_star_topology() {
nodes.first_mut().unwrap().find_node(target_random_node_id);
nodes.push(bootstrap_node);

let main = |cx: &mut Context| loop {
let main = |cx: &mut Context| {
for node in nodes.iter_mut() {
loop {
match node.poll_next_unpin(cx) {
Expand All @@ -221,6 +221,7 @@ async fn test_discovery_star_topology() {
}
}
}
Poll::Pending
};
future::poll_fn(main).await
}
Expand Down Expand Up @@ -259,7 +260,7 @@ async fn test_findnode_query() {
.take(total_nodes - 1)
.collect();

let main = |cx: &mut Context| loop {
let main = |cx: &mut Context| {
for node in nodes.iter_mut() {
loop {
match node.poll_next_unpin(cx) {
Expand All @@ -283,11 +284,12 @@ async fn test_findnode_query() {
}
}
}
Poll::Pending
};

let future = future::poll_fn(main);

if let Err(_) = timeout(Duration::from_millis(100), future).await {
if let Err(_) = timeout(Duration::from_millis(800), future).await {
panic!("Future timed out");
}
}
Expand Down Expand Up @@ -341,8 +343,8 @@ async fn test_updating_connection_on_ping() {
}

// The kbuckets table can have maximum 10 nodes in the same /24 subnet across all buckets
#[test]
fn test_table_limits() {
#[tokio::test]
async fn test_table_limits() {
// this seed generates 12 node id's that are distributed accross buckets such that no more than
// 2 exist in a single bucket.
let mut keypairs = generate_deterministic_keypair(12, 9487);
Expand Down Expand Up @@ -381,8 +383,8 @@ fn test_table_limits() {
}

// Each bucket can have maximum 2 nodes in the same /24 subnet
#[test]
fn test_bucket_limits() {
#[tokio::test]
async fn test_bucket_limits() {
let enr_key = CombinedKey::generate_secp256k1();
let ip: IpAddr = "127.0.0.1".parse().unwrap();
let enr = EnrBuilder::new("v4")
Expand Down Expand Up @@ -503,7 +505,7 @@ async fn test_predicate_search() {
.find_enr_predicate(target_random_node_id, predicate, total_nodes);
nodes.push(bootstrap_node);

let main = |cx: &mut Context| loop {
let main = |cx: &mut Context| {
for node in nodes.iter_mut() {
loop {
match node.poll_next_unpin(cx) {
Expand All @@ -513,19 +515,21 @@ async fn test_predicate_search() {
closer_peers.len(),
total_nodes,
);
println!("Nodes expected to pass predicate search {}", num_nodes);
assert!(closer_peers.len() == num_nodes);
return Poll::Ready(());
}
Poll::Ready(_) => {}
_ => break,
Poll::Pending => break,
}
}
}
Poll::Pending
};

let future = future::poll_fn(main);

if let Err(_) = timeout(Duration::from_millis(100), future).await {
if let Err(_) = timeout(Duration::from_millis(500), future).await {
panic!("Future timed out");
}
}
4 changes: 2 additions & 2 deletions src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
}

/// Returns an iterator over all the entries in the routing table.
pub fn iter(&mut self) -> impl Iterator<Item = EntryRefView<TNodeId, TVal>> {
pub fn iter(&mut self) -> impl Iterator<Item = EntryRefView<'_, TNodeId, TVal>> {
let applied_pending = &mut self.applied_pending;
self.buckets.iter_mut().flat_map(move |table| {
if let Some(applied) = table.apply_pending() {
Expand All @@ -197,7 +197,7 @@ where
/// Returns an iterator over all the entries in the routing table.
/// Does not add pending node to kbucket to get an iterator which
/// takes a reference instead of a mutable reference.
pub fn iter_ref(&self) -> impl Iterator<Item = EntryRefView<TNodeId, TVal>> {
pub fn iter_ref(&self) -> impl Iterator<Item = EntryRefView<'_, TNodeId, TVal>> {
self.buckets.iter().flat_map(move |table| {
table.iter().map(move |(n, status)| EntryRefView {
node: NodeRefView {
Expand Down
42 changes: 24 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![warn(rust_2018_idioms)]
#![deny(intra_doc_link_resolution_failure)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(clippy::needless_doctest_main)]
//! An implementation of [Discovery V5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md).
//!
//! # Overview
//!
//! Discovery v5 is a protocol designed for encrypted peer discovery and topic advertisement. Each peer/node
//! on the network is identified via it's ['ENR'] ([Ethereum Name
//! on the network is identified via it's ENR ([Ethereum Name
//! Record](https://eips.ethereum.org/EIPS/eip-778)), which is essentially a signed key-value store
//! containing the node's public key and optionally IP address and port.
//!
Expand All @@ -16,28 +19,31 @@
//!
//! This protocol is split into three main sections/layers:
//!
//! * Transport - The transport for this protocol is currently fixed to UDP and is realised by the
//! [`Discv5Service`] struct. It encodes/decodes [`Packet`]'s to and from the specified UDP
//! * Transport - The transport for this protocol is currently fixed to UDP and is realised by a
//! [`Transport`]. It encodes/decodes [Packet]'s to and from the specified UDP
//! socket.
//! * Session - The protocol's communication is encrypted with `AES_GCM`. All node communication
//! undergoes a handshake, which results in a [`Session`]. [`Session`]'s are established when
//! needed and get dropped after a timeout. This section manages the creation and maintenance of
//! sessions between nodes. It is realised by the [`SessionService`] struct.
//! * Behaviour - This section contains the protocol-level logic. In particular it manages the
//! * Application - This section contains the protocol-level logic. In particular it manages the
//! routing table of known ENR's, topic registration/advertisement and performs various queries
//! such as peer discovery. This section is realised by the [`Discv5`] struct.
//!
//! *Note* - Currently only `secp256k1` keys are supported.
//!
//! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli)
//!
//!
//! # Usage
//!
//! The [`Discv5`] service implements [`Stream`] which emits [`Discv5Event`] events. Running a
//! The [`Discv5`] service implements `Stream` which emits [`Discv5Event`] events. Running a
//! discv5 service is as simple as initialising a [`Discv5`] struct and driving the stream.
//!
//! A simple example of creating this service is as follows:
//! The service can be configured via [`Discv5Config`] which can be created using the
//! [`Discv5ConfigBuilder`].
//!
//! A simple example of creating this service is as follows:
//!
//! ```rust
//! use enr::{Enr,EnrBuilder, CombinedKey};
Expand Down Expand Up @@ -78,9 +84,9 @@
//! discv5.find_node(target_random_node_id);
//!
//! // poll the stream for the next FindNoeResult event
//! loop {
//! match discv5.next().await {
//! Some(Discv5Event::FindNodeResult { closer_peers, .. }) => {
//! while let Some(event) = discv5.next().await {
//! match event {
//! Discv5Event::FindNodeResult { closer_peers, .. } => {
//! println!("Query completed. Found {} peers", closer_peers.len());
//! break;
//! }
Expand All @@ -92,14 +98,14 @@
//!
//! To see a usage in a runtime environment, see the `find_nodes` example in `/examples`.
//!
//! [`Enr`]: enr::Enr
//! [`Discv5`]: crate::Discv5
//! [`Discv5Event`]: crate::Discv5Event.enum
//! [`Discv5Service`]: crate::service::Discv5Service
//! [`Packet`]: crate::service::Packet
//! [`SessionService`]: crate::session_service::SessionService
//! [`Session`]: crate::session::Session
//! [`Stream`]: future::stream::Stream
//! [`Discv5`]: struct.Discv5.html
//! [`Discv5Event`]: enum.Discv5Event.html
//! [`Discv5Config`]: config/struct.Discv5Config.html
//! [`Discv5ConfigBuilder`]: config/struct.Discv5ConfigBuilder.html
//! [`Transport`]: transport/struct.Transport.html
//! [Packet]: packet/enum.Packet.html
//! [`SessionService`]: session_service/struct.SessionService.html
//! [`Session`]: session/struct.Session.html
mod config;
mod discv5;
Expand All @@ -108,9 +114,9 @@ mod kbucket;
mod packet;
mod query_pool;
mod rpc;
mod service;
mod session;
mod session_service;
mod transport;

pub use crate::discv5::{Discv5, Discv5Event};
pub use config::{Discv5Config, Discv5ConfigBuilder};
Expand Down
4 changes: 2 additions & 2 deletions src/packet/auth_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Encodable for AuthResponse {
}

impl Decodable for AuthResponse {
fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
fn decode(rlp: &Rlp<'_>) -> Result<Self, DecoderError> {
if !rlp.is_list() {
return Err(DecoderError::RlpExpectedToBeList);
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Encodable for AuthHeader {
}

impl Decodable for AuthHeader {
fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
fn decode(rlp: &Rlp<'_>) -> Result<Self, DecoderError> {
match rlp.item_count() {
Ok(size) => {
if size != 5 {
Expand Down
4 changes: 3 additions & 1 deletion src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
//!
//! The [discv5 wire specification](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md) provides further information on UDP message packets as implemented in this module.
//!
//! The `Packet` struct defines all raw UDP message variants and implements the encoding/decoding
//! A [`Packet`] defines all raw UDP message variants and implements the encoding/decoding
//! logic.
//!
//! Note, that all message encryption/decryption is handled outside of this module.
//!
//! [`Packet`]: enum.Packet.html
mod auth_header;

Expand Down
Loading

0 comments on commit dafd11b

Please sign in to comment.