Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent dual-stack spamming #275

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,7 @@ where
// Adjust `first_connected_pos` accordingly.
match old_status.state {
ConnectionState::Connected => {
if self.first_connected_pos.map_or(false, |p| p == pos.0)
&& pos.0 == self.nodes.len()
{
if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
// It was the last connected node.
self.first_connected_pos = None
}
Expand Down
13 changes: 8 additions & 5 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,7 @@ impl Service {
_ => connection_direction,
};

debug!(node = %node_id, %direction, "Session established with Node");
debug!(node = %node_id, %direction, %socket, "Session established with Node");
self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
}

Expand Down Expand Up @@ -1616,13 +1616,16 @@ impl Service {
let Some(ip_votes) = self.ip_votes.as_mut() else {
return false;
};
match (ip_votes.majority(), is_ipv6) {
// Here we check the number of non-expired votes, rather than the majority. As if the
// local router is not SNAT'd we can have many votes but none for the same port and we
// therefore do excessive pinging.
match (ip_votes.less_than_minimum(), is_ipv6) {
// We don't have enough ipv4 votes, but this is an IPv4-only node.
((None, Some(_)), false) |
((false, true), false) |
// We don't have enough ipv6 votes, but this is an IPv6 node.
((Some(_), None), true) |
((true, false), true) |
// We don't have enough ipv6 or ipv4 nodes, ping this peer.
((None, None), _,) => true,
((false, false), _,) => true,
// We have enough votes do nothing.
((_, _), _,) => false,
}
Expand Down
65 changes: 47 additions & 18 deletions src/service/ip_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use std::{

/// A collection of IP:Ports for our node reported from external peers.
pub(crate) struct IpVote {
/// The current collection of IP:Port votes.
votes: HashMap<NodeId, (SocketAddr, Instant)>,
/// The current collection of IP:Port votes for ipv4.
ipv4_votes: HashMap<NodeId, (SocketAddrV4, Instant)>,
/// The current collection of IP:Port votes for ipv6.
ipv6_votes: HashMap<NodeId, (SocketAddrV6, Instant)>,
/// The minimum number of votes required before an IP/PORT is accepted.
minimum_threshold: usize,
/// The time votes remain valid.
Expand All @@ -23,34 +25,61 @@ impl IpVote {
panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT");
}
IpVote {
votes: HashMap::new(),
ipv4_votes: HashMap::new(),
ipv6_votes: HashMap::new(),
minimum_threshold,
vote_duration,
}
}

pub fn insert(&mut self, key: NodeId, socket: impl Into<SocketAddr>) {
self.votes
.insert(key, (socket.into(), Instant::now() + self.vote_duration));
match socket.into() {
SocketAddr::V4(socket) => {
self.ipv4_votes
.insert(key, (socket, Instant::now() + self.vote_duration));
}
SocketAddr::V6(socket) => {
self.ipv6_votes
.insert(key, (socket, Instant::now() + self.vote_duration));
}
}
}

/// Returns true if we have more than the minimum number of non-expired votes for a given ip
/// version.
pub fn less_than_minimum(&mut self) -> (bool, bool) {
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
let instant = Instant::now();
self.ipv4_votes.retain(|_, v| v.1 > instant);
self.ipv6_votes.retain(|_, v| v.1 > instant);

(
self.ipv4_votes.len() >= self.minimum_threshold,
self.ipv6_votes.len() >= self.minimum_threshold,
)
}

/// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None.
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
pub fn majority(&mut self) -> (Option<SocketAddrV4>, Option<SocketAddrV6>) {
// remove any expired votes
let instant = Instant::now();
self.votes.retain(|_, v| v.1 > instant);

// count votes, take majority
let mut ip4_count: FnvHashMap<SocketAddrV4, usize> = FnvHashMap::default();
let mut ip6_count: FnvHashMap<SocketAddrV6, usize> = FnvHashMap::default();
for (socket, _) in self.votes.values() {
// NOTE: here we depend on addresses being already cleaned up. No mapped or compat
// addresses should be present. This is done in the codec.
match socket {
SocketAddr::V4(socket) => *ip4_count.entry(*socket).or_insert_with(|| 0) += 1,
SocketAddr::V6(socket) => *ip6_count.entry(*socket).or_insert_with(|| 0) += 1,
}
}
self.ipv4_votes.retain(|_, v| v.1 > instant);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the order import for majority calculation?
I.e why do we have a test function for test_three_way_vote_draw.
We can save 2 iterations if we combine majority calculation and stale votes removal did at
ac644c8 but test_three_way_vote_draw fails

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think your changes are better. Can't remember about that test. I think we wanted to test that if there is a draw it at least returns Some and not None. It probably should be random from the tie.

Can you make a PR to adjust the logic to what you have here.

Why don't we just mutate the hashmap in the function rather than returning a new hashmap and re-setting it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah makes sense, updated and added the function to your PR!

self.ipv6_votes.retain(|_, v| v.1 > instant);

// Count all the votes into a hashmap containing (socket, count).
let ip4_count =
self.ipv4_votes
.values()
.fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| {
*counts.entry(*socket_vote).or_default() += 1;
counts
});
let ip6_count =
self.ipv6_votes
.values()
.fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| {
*counts.entry(*socket_vote).or_default() += 1;
counts
});

// find the maximum socket addr
let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold);
Expand Down
Loading