Skip to content

Commit

Permalink
fix not being able to run both icmpv4 and icmpv6 test at same time
Browse files Browse the repository at this point in the history
  • Loading branch information
Arian8j2 committed Sep 22, 2024
1 parent 043494c commit ab88ed7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
31 changes: 17 additions & 14 deletions forwarder/src/socket/icmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ use std::{
os::fd::AsRawFd,
};

/// tracks if the icmp receiver thread is started or not, the first index
/// is for icmpv4 and the second is for icmpv6
static IS_RECEIVER_STARTED: [Mutex<bool>; 2] = [Mutex::new(false), Mutex::new(false)];

/// icmp receiver only handles ports that are in OPEN_PORTS so each
/// `IcmpSocket` must register it's port via adding it to `OPEN_PORTS`
/// and removing it when the `IcmpSocket` is dropped, the first index
/// is for icmpv4 open ports and the second index is for icmpv6 open ports
static OPEN_PORTS: [RwLock<BTreeSet<u16>>; 2] =
[RwLock::new(BTreeSet::new()), RwLock::new(BTreeSet::new())];

/// `IcmpSocket` that is very similiar to `UdpSocket`
#[derive(Debug)]
pub struct IcmpSocket {
Expand All @@ -29,25 +40,15 @@ pub struct IcmpSocket {
connected_addr: Option<SocketAddr>,
}

static IS_RECEIVER_STARTED: Mutex<bool> = Mutex::new(false);

/// each nonblocking `IcmpSocket` does not actually listen for new packets because
/// icmp protocol is on layer 2 and doesn't have any concept of ports
/// so each packet will wake up all `IcmpSocket`s, to fix that and remove
/// overheads of parsing each packet multiple times we listen to packets
/// only on one socket on another thread and after parsing port and packet
/// we put it in the corresponding controller `packets`, each nonblocking
/// `IcmpSocket` can register it's port via adding it to `OPEN_PORTS`
static OPEN_PORTS: RwLock<BTreeSet<u16>> = RwLock::new(BTreeSet::new());

impl IcmpSocket {
pub fn bind(addr: &SocketAddr) -> io::Result<Self> {
let udp_socket = std::net::UdpSocket::bind(addr)?;
let udp_socket_addr = udp_socket.local_addr()?;
let socket = IcmpSocket::inner_bind(*addr)?;

// run the icmp receiver if it isn't running
let mut is_receiver_alive = IS_RECEIVER_STARTED.lock();
let receiver_index = addr.is_ipv6() as usize;
let mut is_receiver_alive = IS_RECEIVER_STARTED[receiver_index].lock();
if !*is_receiver_alive {
let addr_clone = addr.to_owned();
std::thread::spawn(move || {
Expand Down Expand Up @@ -83,7 +84,8 @@ impl IcmpSocket {
impl Drop for IcmpSocket {
fn drop(&mut self) {
// clear port
let mut open_ports = OPEN_PORTS.write();
let open_ports_index = self.udp_socket_addr.is_ipv6() as usize;
let mut open_ports = OPEN_PORTS[open_ports_index].write();
open_ports.remove(&self.udp_socket_addr.port());
}
}
Expand Down Expand Up @@ -178,7 +180,8 @@ impl SocketTrait for IcmpSocket {
}

fn register(&mut self, registry: &mio::Registry, token: mio::Token) -> io::Result<()> {
let mut open_ports = OPEN_PORTS.write();
let open_ports_index = self.udp_socket_addr.is_ipv6() as usize;
let mut open_ports = OPEN_PORTS[open_ports_index].write();
open_ports.insert(self.udp_socket_addr.port());

registry.register(
Expand Down
9 changes: 8 additions & 1 deletion forwarder/src/socket/icmp/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use crate::MAX_PACKET_SIZE;
use etherparse::Ipv4HeaderSlice;
use std::{mem::MaybeUninit, net::SocketAddr};

// each nonblocking `IcmpSocket` does not actually listen for new packets because
// icmp protocol is on layer 2 and doesn't have any concept of ports
// so each packet will wake up all `IcmpSocket`s, to fix that and remove
// overheads of parsing each packet multiple times we listen to packets
// only on one socket on another thread and after parsing port and packet
// we send it back to `IcmpSocket` via udp protocol
pub fn run_icmp_receiver(addr: SocketAddr) -> anyhow::Result<()> {
let is_ipv6 = addr.is_ipv6();
let socket: socket2::Socket = IcmpSocket::inner_bind(addr)?;
Expand All @@ -11,6 +17,7 @@ pub fn run_icmp_receiver(addr: SocketAddr) -> anyhow::Result<()> {

let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut addr_buffer = addr;
let open_ports = &OPEN_PORTS[is_ipv6 as usize];

loop {
let Ok(size) =
Expand All @@ -21,7 +28,7 @@ pub fn run_icmp_receiver(addr: SocketAddr) -> anyhow::Result<()> {
let Some(icmp_packet) = parse_icmp_packet(&buffer[..size], is_ipv6) else {
continue;
};
let open_ports = OPEN_PORTS.read();
let open_ports = open_ports.read();
let port = icmp_packet.dst_port;
if open_ports.contains(&port) {
addr_buffer.set_port(port);
Expand Down
18 changes: 9 additions & 9 deletions forwarder/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn test_udp_double_forwarder_back_and_forth() {

#[test]
#[ignore = "icmp sockets requires special access, please run this test with ./test_icmp.sh"]
fn test_icmp_double_forwarder_back_and_forth() {
fn test_icmpv4_double_forwarder_back_and_forth() {
let forwarder_uri = SocketUri::from_str("127.0.0.1:38809/udp").unwrap();
let second_forwarder_uri = SocketUri::from_str("127.0.0.1:38810/icmp").unwrap();
let remote_uri = SocketUri::from_str("127.0.0.1:38811/udp").unwrap();
Expand All @@ -115,14 +115,14 @@ fn test_udp_ipv6_double_forwarder_back_and_forth() {
spawn_double_forwarder_and_test_connection(forwarder_uri, second_forwarder_uri, remote_uri);
}

// #[test]
// #[ignore = "icmp sockets requires special access, please run this test with ./test_icmp.sh"]
// fn test_icmp_ipv6_double_forwarder_back_and_forth() {
// let forwarder_uri = SocketUri::from_str("127.0.0.1:38815/udp").unwrap();
// let second_forwarder_uri = SocketUri::from_str("[::1]:38816/icmp").unwrap();
// let remote_uri = SocketUri::from_str("127.0.0.1:38817/udp").unwrap();
// spawn_double_forwarder_and_test_connection(forwarder_uri, second_forwarder_uri, remote_uri);
// }
#[test]
#[ignore = "icmp sockets requires special access, please run this test with ./test_icmp.sh"]
fn test_icmpv6_double_forwarder_back_and_forth() {
let forwarder_uri = SocketUri::from_str("127.0.0.1:38815/udp").unwrap();
let second_forwarder_uri = SocketUri::from_str("[::1]:38816/icmp").unwrap();
let remote_uri = SocketUri::from_str("127.0.0.1:38817/udp").unwrap();
spawn_double_forwarder_and_test_connection(forwarder_uri, second_forwarder_uri, remote_uri);
}

fn spawn_double_forwarder_and_test_connection(
forwarder_uri: SocketUri,
Expand Down

0 comments on commit ab88ed7

Please sign in to comment.