Skip to content

Commit

Permalink
use atmoic compare_exchange_weak instead of store when marking peer a…
Browse files Browse the repository at this point in the history
…s used
  • Loading branch information
Arian8j2 committed Oct 8, 2024
1 parent 79c052d commit 7442339
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
12 changes: 4 additions & 8 deletions forwarder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ use anyhow::Context;
use log::info;
use mio::{Events, Poll, Registry};
use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use {
peer::{Peer, PeerManager},
socket::{Socket, SocketTrait, SocketUri},
Expand Down Expand Up @@ -66,7 +62,7 @@ pub fn run_server(listen_uri: SocketUri, remote_uri: SocketUri, passphrase: Opti
let peers = peer_manager.upgradable_read();
match peers.find_peer_with_client_addr(&from_addr) {
Some(peer) => {
peer.used.store(true, Ordering::Relaxed);
peer.set_used();
// client ---> server socket ---peer socket----> remote
peer.socket.send(&buffer[..size]).ok();
}
Expand Down Expand Up @@ -134,7 +130,7 @@ fn peers_thread(
for event in &events {
let token = event.token();
let peer = peers.find_peer_with_token(&token).unwrap();
peer.used.store(true, Ordering::Relaxed);
peer.set_used();
// each epoll event may result in multiple readiness events
while let Ok(size) = peer.socket.recv(&mut buffer) {
if let Some(ref passphrase) = passphrase {
Expand All @@ -160,7 +156,7 @@ fn try_cleanup(peer_manager: &RwLock<PeerManager>) {
let mut peers = peer_manager.write();
let mut used_client_count = 0;
for peer in peers.get_all() {
let used = peer.used.swap(false, Ordering::Relaxed);
let used = peer.reset_used();
if !used {
let client_addr = peer.get_client_addr();
log::info!("cleaning peer that handled '{client_addr}'");
Expand Down
16 changes: 15 additions & 1 deletion forwarder/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
borrow::Borrow,
collections::BTreeMap,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
sync::atomic::Ordering,
sync::{atomic::AtomicBool, Arc},
};

Expand All @@ -12,7 +13,7 @@ pub struct Peer {
pub socket: Socket,
client_addr: SocketAddr,
token: Token,
pub used: AtomicBool,
used: AtomicBool,
}

impl Peer {
Expand All @@ -39,6 +40,19 @@ impl Peer {
Ok((peer, token))
}

/// mark `Peer` as being used to prevent cleanup thread from cleaning it
pub fn set_used(&self) {
self.used
.compare_exchange_weak(false, true, Ordering::Relaxed, Ordering::Relaxed)
.ok();
}

/// mark `Peer` as not being in use and returns `true` if it was used
/// before reseting otherwise returns `false`
pub fn reset_used(&self) -> bool {
self.used.swap(false, Ordering::Relaxed)
}

pub fn get_client_addr(&self) -> &SocketAddr {
&self.client_addr
}
Expand Down

0 comments on commit 7442339

Please sign in to comment.