Skip to content

Commit

Permalink
Merge branch 'master' into structured-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jmezo committed Jul 8, 2024
2 parents 1f86f54 + 7e63f13 commit d085642
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 35 deletions.
43 changes: 16 additions & 27 deletions src/socket/filter/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@ use std::{
/// cache's `time_window`.
pub const ENFORCED_SIZE_TIME: u64 = 1;

pub struct ReceivedPacket<T> {
/// The source that sent us the packet.
pub content: T,
/// The time the packet was received.
pub received: Instant,
}

pub struct ReceivedPacketCache<T> {
pub struct ReceivedPacketCache {
/// The target number of entries per ENFORCED_SIZE_TIME before inserting new elements reports
/// failure. The maximum size of the cache is target*time_window
target: usize,
Expand All @@ -39,11 +32,11 @@ pub struct ReceivedPacketCache<T> {
time_window: u64,
/// This stores the current number of messages that are within the `ENFORCED_SIZE_TIME`.
within_enforced_time: usize,
/// The underlying data structure.
inner: VecDeque<ReceivedPacket<T>>,
/// The underlying data structure. It stores the time when a packet was received.
inner: VecDeque<Instant>,
}

impl<T> ReceivedPacketCache<T> {
impl ReceivedPacketCache {
/// Creates a new `ReceivedPacketCache` with a specified size from which no more can enter.
pub fn new(target: usize, time_window: u64) -> Self {
Self {
Expand All @@ -56,21 +49,21 @@ impl<T> ReceivedPacketCache<T> {

/// Remove expired packets. We only keep, `CACHE_TIME` of data in the cache.
pub fn reset(&mut self) {
while let Some(packet) = self.inner.pop_front() {
if packet.received
while let Some(received_at) = self.inner.pop_front() {
if received_at
> Instant::now()
.checked_sub(Duration::from_secs(self.time_window))
.unwrap()
{
// add the packet back and end
self.inner.push_front(packet);
self.inner.push_front(received_at);
break;
}
}
// update the within_enforced_time
let mut count = 0;
for packet in self.inner.iter().rev() {
if packet.received
for received_at in self.inner.iter().rev() {
if *received_at
> Instant::now()
.checked_sub(Duration::from_secs(ENFORCED_SIZE_TIME))
.unwrap()
Expand All @@ -84,37 +77,33 @@ impl<T> ReceivedPacketCache<T> {
}

/// Inserts an element into the cache, removing any expired elements.
pub fn cache_insert(&mut self, content: T) -> bool {
pub fn cache_insert(&mut self) -> bool {
self.reset();
self.internal_insert(content)
self.internal_insert()
}

/// Inserts an element into the cache without removing expired elements.
fn internal_insert(&mut self, content: T) -> bool {
fn internal_insert(&mut self) -> bool {
if self.within_enforced_time >= self.target {
// Reached the target
false
} else {
let received_packet = ReceivedPacket {
content,
received: Instant::now(),
};
self.inner.push_back(received_packet);
self.inner.push_back(Instant::now());
self.within_enforced_time += 1;
true
}
}
}

impl<T> std::ops::Deref for ReceivedPacketCache<T> {
type Target = VecDeque<ReceivedPacket<T>>;
impl std::ops::Deref for ReceivedPacketCache {
type Target = VecDeque<Instant>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> std::ops::DerefMut for ReceivedPacketCache<T> {
impl std::ops::DerefMut for ReceivedPacketCache {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
14 changes: 6 additions & 8 deletions src/socket/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) struct Filter {
/// An ordered (by time) collection of recently seen packets by SocketAddr. The packet data is not
/// stored here. This stores 5 seconds of history to calculate a 5 second moving average for
/// the metrics.
raw_packets_received: ReceivedPacketCache<SocketAddr>,
raw_packets_received: ReceivedPacketCache,
/// The duration that bans by this filter last.
ban_duration: Option<Duration>,
/// Keep track of node ids per socket. If someone is using too many node-ids per IP, they can
Expand Down Expand Up @@ -86,19 +86,19 @@ impl Filter {
/// The first check. This determines if a new UDP packet should be decoded or dropped.
/// Only unsolicited packets arrive here.
pub fn initial_pass(&mut self, src: &SocketAddr) -> bool {
if PERMIT_BAN_LIST.read().permit_ips.get(&src.ip()).is_some() {
if PERMIT_BAN_LIST.read().permit_ips.contains(&src.ip()) {
return true;
}

if PERMIT_BAN_LIST.read().ban_ips.get(&src.ip()).is_some() {
if PERMIT_BAN_LIST.read().ban_ips.contains_key(&src.ip()) {
debug!(?src, "Dropped unsolicited packet from banned src");
return false;
}

// Add the un-solicited request to the cache
// If this is over the maximum requests per ENFORCED_SIZE_TIME, it will not be added, we
// leave the rate limiter to enforce the rate limits..
self.raw_packets_received.cache_insert(*src);
self.raw_packets_received.cache_insert();

// build the metrics
METRICS
Expand Down Expand Up @@ -135,17 +135,15 @@ impl Filter {
if PERMIT_BAN_LIST
.read()
.permit_nodes
.get(&node_address.node_id)
.is_some()
.contains(&node_address.node_id)
{
return true;
}

if PERMIT_BAN_LIST
.read()
.ban_nodes
.get(&node_address.node_id)
.is_some()
.contains_key(&node_address.node_id)
{
debug!(
node = %node_address,
Expand Down

0 comments on commit d085642

Please sign in to comment.