Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent dual-stack spamming #275

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ where
// Adjust `first_connected_pos` accordingly.
match old_status.state {
ConnectionState::Connected => {
if (self.first_connected_pos == Some(pos.0)) && pos.0 == self.nodes.len() {
if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
// It was the last connected node.
self.first_connected_pos = None
}
Expand Down
13 changes: 8 additions & 5 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,7 @@ impl Service {
_ => connection_direction,
};

debug!(node = %node_id, %direction, "Session established with Node");
debug!(node = %node_id, %direction, %socket, "Session established with Node");
self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
}

Expand Down Expand Up @@ -1616,13 +1616,16 @@ impl Service {
let Some(ip_votes) = self.ip_votes.as_mut() else {
return false;
};
match (ip_votes.majority(), is_ipv6) {
// Here we check the number of non-expired votes, rather than the majority. As if the
// local router is not SNAT'd we can have many votes but none for the same port and we
// therefore do excessive pinging.
match (ip_votes.has_minimum_threshold(), is_ipv6) {
// We don't have enough ipv4 votes, but this is an IPv4-only node.
((None, Some(_)), false) |
((false, true), false) |
// We don't have enough ipv6 votes, but this is an IPv6 node.
((Some(_), None), true) |
((true, false), true) |
// We don't have enough ipv6 or ipv4 nodes, ping this peer.
((None, None), _,) => true,
((false, false), _,) => true,
// We have enough votes do nothing.
((_, _), _,) => false,
}
Expand Down
102 changes: 73 additions & 29 deletions src/service/ip_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use enr::NodeId;
use fnv::FnvHashMap;
use std::{
collections::HashMap,
hash::Hash,
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
time::{Duration, Instant},
};

/// A collection of IP:Ports for our node reported from external peers.
pub(crate) struct IpVote {
/// The current collection of IP:Port votes.
votes: HashMap<NodeId, (SocketAddr, Instant)>,
/// The current collection of IP:Port votes for ipv4.
ipv4_votes: HashMap<NodeId, (SocketAddrV4, Instant)>,
/// The current collection of IP:Port votes for ipv6.
ipv6_votes: HashMap<NodeId, (SocketAddrV6, Instant)>,
/// The minimum number of votes required before an IP/PORT is accepted.
minimum_threshold: usize,
/// The time votes remain valid.
Expand All @@ -23,46 +26,86 @@ impl IpVote {
panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT");
}
IpVote {
votes: HashMap::new(),
ipv4_votes: HashMap::new(),
ipv6_votes: HashMap::new(),
minimum_threshold,
vote_duration,
}
}

pub fn insert(&mut self, key: NodeId, socket: impl Into<SocketAddr>) {
self.votes
.insert(key, (socket.into(), Instant::now() + self.vote_duration));
match socket.into() {
SocketAddr::V4(socket) => {
self.ipv4_votes
.insert(key, (socket, Instant::now() + self.vote_duration));
}
SocketAddr::V6(socket) => {
self.ipv6_votes
.insert(key, (socket, Instant::now() + self.vote_duration));
}
}
}

/// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None.
pub fn majority(&mut self) -> (Option<SocketAddrV4>, Option<SocketAddrV6>) {
// remove any expired votes
/// Returns true if we have more than the minimum number of non-expired votes for a given ip
/// version.
pub fn has_minimum_threshold(&mut self) -> (bool, bool) {
let instant = Instant::now();
self.votes.retain(|_, v| v.1 > instant);

// count votes, take majority
let mut ip4_count: FnvHashMap<SocketAddrV4, usize> = FnvHashMap::default();
let mut ip6_count: FnvHashMap<SocketAddrV6, usize> = FnvHashMap::default();
for (socket, _) in self.votes.values() {
// NOTE: here we depend on addresses being already cleaned up. No mapped or compat
// addresses should be present. This is done in the codec.
match socket {
SocketAddr::V4(socket) => *ip4_count.entry(*socket).or_insert_with(|| 0) += 1,
SocketAddr::V6(socket) => *ip6_count.entry(*socket).or_insert_with(|| 0) += 1,
self.ipv4_votes.retain(|_, v| v.1 > instant);
self.ipv6_votes.retain(|_, v| v.1 > instant);

(
self.ipv4_votes.len() >= self.minimum_threshold,
self.ipv6_votes.len() >= self.minimum_threshold,
)
}

/// Filter the stale votes and return the majority `SocketAddr` if it exists.
/// If there are not enough votes to meet the threshold this returns None.
fn filter_stale_find_most_frequent<K: Copy + Eq + Hash>(
votes: &HashMap<NodeId, (K, Instant)>,
minimum_threshold: usize,
) -> (HashMap<NodeId, (K, Instant)>, Option<K>) {
let mut updated = HashMap::default();
let mut counter: FnvHashMap<K, usize> = FnvHashMap::default();
let mut max: Option<(K, usize)> = None;
let now = Instant::now();

for (node_id, (vote, instant)) in votes {
// Discard stale votes.
if instant <= &now {
continue;
}
updated.insert(*node_id, (*vote, *instant));

let count = counter.entry(*vote).or_default();
*count += 1;
let current_max = max.map(|(_v, m)| m).unwrap_or_default();
if *count >= current_max && *count >= minimum_threshold {
max = Some((*vote, *count));
}
}

// find the maximum socket addr
let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold);
let ip6_majority = majority(ip6_count.into_iter(), &self.minimum_threshold);
(ip4_majority, ip6_majority)
(updated, max.map(|m| m.0))
}
}

fn majority<K>(iter: impl Iterator<Item = (K, usize)>, threshold: &usize) -> Option<K> {
iter.filter(|(_k, count)| count >= threshold)
.max_by_key(|(_k, count)| *count)
.map(|(k, _count)| k)
/// Returns the majority `SocketAddr`'s of both IPv4 and IPv6 if they exist. If there are not enough votes to meet the threshold this returns None for each stack.
pub fn majority(&mut self) -> (Option<SocketAddrV4>, Option<SocketAddrV6>) {
let (updated_ipv4_votes, ipv4_majority) = Self::filter_stale_find_most_frequent::<
SocketAddrV4,
>(
&self.ipv4_votes, self.minimum_threshold
);
self.ipv4_votes = updated_ipv4_votes;

let (updated_ipv6_votes, ipv6_majority) = Self::filter_stale_find_most_frequent::<
SocketAddrV6,
>(
&self.ipv6_votes, self.minimum_threshold
);
self.ipv6_votes = updated_ipv6_votes;

(ipv4_majority, ipv6_majority)
}
}

#[cfg(test)]
Expand All @@ -88,7 +131,8 @@ mod tests {
votes.insert(NodeId::random(), socket_3);
votes.insert(NodeId::random(), socket_3);

assert_eq!(votes.majority(), (Some(socket_2), None));
// Assert that in a draw situation a majority is still chosen.
assert!(votes.majority().0.is_some());
}

#[test]
Expand Down
Loading