From 8086d57d3a98213f911a3ecdb14c0cd9c6759046 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Tue, 7 Jan 2025 23:01:49 +0800 Subject: [PATCH] WIP: rtnl decouple mark and table --- src/bgp/route.rs | 8 +++---- src/integration_tests/kernel_linux.rs | 17 +++++++------- src/kernel/linux/mod.rs | 32 +++++++++++++++++++-------- src/kernel/linux/nft.rs | 3 ++- src/kernel/mod.rs | 11 ++++----- src/kernel/rtnl.rs | 12 +++++++++- 6 files changed, 53 insertions(+), 30 deletions(-) diff --git a/src/bgp/route.rs b/src/bgp/route.rs index 2c90d6c..0467411 100644 --- a/src/bgp/route.rs +++ b/src/bgp/route.rs @@ -2,7 +2,7 @@ use super::flow::Flowspec; use super::nlri::{NextHop, Nlri, NlriContent}; use crate::kernel::{self, Kernel, KernelAdapter, KernelHandle}; use crate::net::IpPrefix; -use crate::util::{grace, MaybeRc, BOLD, FG_BLUE_BOLD, FG_GREEN_BOLD, RESET}; +use crate::util::{MaybeRc, BOLD, FG_BLUE_BOLD, FG_GREEN_BOLD, RESET}; use either::Either; use itertools::Itertools; use log::{warn, Level, LevelFilter}; @@ -52,7 +52,7 @@ impl Routes { } Entry::Occupied(mut e) => { let (id, _) = e.insert((id, MaybeRc::Rc(info.clone()))); - grace(self.kernel.remove(&id).await, "error removing old flowspec from kernel"); + self.kernel.remove(&id).await; } } } @@ -78,7 +78,7 @@ impl Routes { let mut flow = BTreeMap::new(); swap(&mut flow, &mut self.flow); for (_, (handle, _)) in flow { - grace(self.kernel.remove(&handle).await, "error removing handle from kernel"); + self.kernel.remove(&handle).await; } } @@ -86,7 +86,7 @@ impl Routes { let Some((handle, _)) = self.flow.remove(&spec) else { return; }; - grace(self.kernel.remove(&handle).await, "error removing handle from kernel"); + self.kernel.remove(&handle).await; } pub async fn process(&mut self) -> kernel::Result<()> { diff --git a/src/integration_tests/kernel_linux.rs b/src/integration_tests/kernel_linux.rs index 68605cd..9e68037 100644 --- a/src/integration_tests/kernel_linux.rs +++ b/src/integration_tests/kernel_linux.rs @@ -68,24 +68,25 @@ async fn test_redirect_to_ip() -> anyhow::Result<()> { let (conn, handle, _) = rtnetlink::new_connection()?; tokio::spawn(conn); + let table_index = 10000; let dummy_index = create_dummy_link(&handle, "10.128.128.254/24".parse()?).await?; - let (name, (_g1, bird, chans, _g2)) = - run_kernel_test(["flow4 { dst 172.20.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }"]) - .await?; + let (name, (_g1, mut bird, chans, _g2)) = run_kernel_test([ + "flow4 { dst 172.20.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }", + "flow4 { dst 172.21.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }", + ]) + .await?; print_nft_chain(&name, &name).await?; print_ip_rule().await?; - print_ip_route(10000).await?; + print_ip_route(table_index).await?; + let nft_stmts = get_nft_stmts(&name, &name).await?; let ip_rules = get_ip_rule(&handle, IpVersion::V4).await?; let ip_routes = get_ip_route(&handle, IpVersion::V4, 10000).await?; - let nft_stmts = get_nft_stmts(&name, &name).await?; close_cli(chans).await; - drop(bird); + bird.kill().await?; remove_link(&handle, dummy_index).await?; - let table_index = 10000; - assert_eq!(nft_stmts, [vec![ prefix_stmt("daddr", "172.20.0.0/16".parse()?).unwrap(), stmt::Statement::Mangle(stmt::Mangle { key: make_meta(expr::MetaKey::Mark), value: Number(table_index) }), diff --git a/src/kernel/linux/mod.rs b/src/kernel/linux/mod.rs index b8ebaee..68c2a7e 100644 --- a/src/kernel/linux/mod.rs +++ b/src/kernel/linux/mod.rs @@ -4,6 +4,7 @@ use super::rtnl::{RtNetlink, RtNetlinkArgs}; use super::{Kernel, Result}; use crate::bgp::flow::Flowspec; use crate::bgp::route::RouteInfo; +use crate::util::grace; use clap::Args; use futures::future::OptionFuture; use futures::join; @@ -33,6 +34,17 @@ impl Linux { rtnl_args: rtnl, }) } + + async fn remove_nft(&mut self, handle: &::Handle) { + let mut batch = Batch::new(); + for h in handle.iter().copied() { + batch.delete(self.nft.make_rule_handle(h)); + } + grace( + self.nft.apply_ruleset(&batch.to_nftables()).await, + "failed to remove nft rules", + ); + } } impl Kernel for Linux { @@ -92,26 +104,28 @@ impl Kernel for Linux { if let Some((next_hop, table_id)) = rt_info { let rtnl = self.rtnl.as_mut().expect("RtNetlink should be initialized"); - let real_table_id = rtnl.add(handle.clone(), spec, next_hop).await?; + let real_table_id = match rtnl.add(handle.clone(), spec, next_hop).await { + Ok(table_id) => table_id, + Err(error) => { + self.remove_nft(&handle).await; + return Err(error); + } + }; + // TODO: remove this, table_id should be mark assert_eq!(table_id, real_table_id, "table ID mismatch"); } Ok(handle) } - async fn remove(&mut self, handle: &Self::Handle) -> Result<()> { - let mut batch = Batch::new(); - for h in handle.iter().copied() { - batch.delete(self.nft.make_rule_handle(h)); - } - self.nft.apply_ruleset(&batch.to_nftables()).await?; + async fn remove(&mut self, handle: &Self::Handle) { + self.remove_nft(handle).await; if let Some(rtnl) = &mut self.rtnl { - rtnl.del(handle).await?; + let _ = rtnl.del(handle).await; if rtnl.is_empty() { self.rtnl = None; } } - Ok(()) } async fn process(&mut self) -> Result<()> { diff --git a/src/kernel/linux/nft.rs b/src/kernel/linux/nft.rs index 7cb8fff..dace435 100644 --- a/src/kernel/linux/nft.rs +++ b/src/kernel/linux/nft.rs @@ -366,7 +366,8 @@ impl TrafficFilterAction { let new = RtNetlink::new(rtnl_args.clone()).unwrap(); rtnl.get_or_insert(new) }; - let table_id = rtnl.next_table_id(); + let table_id = rtnl.next_table_id(); // TODO: bug, should not be next_table_id and should be next_mark (?) + // TODO: mark could be random, create first and then pass to rtnl let result = smallvec_inline![stmt::Statement::Mangle(stmt::Mangle { key: make_meta(expr::MetaKey::Mark), value: NUM(table_id), diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index d515587..63f7fe9 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -32,7 +32,7 @@ pub trait Kernel: Sized { ) -> impl Future>; /// Remove a flowspec from kernel using previously returned handle. - fn remove(&mut self, handle: &Self::Handle) -> impl Future>; + fn remove(&mut self, handle: &Self::Handle) -> impl Future; /// Process notifications from kernel, timers, etc. fn process(&mut self) -> impl Future> { @@ -81,13 +81,13 @@ impl Kernel for KernelAdapter { } } - async fn remove(&mut self, handle: &Self::Handle) -> Result<()> { + async fn remove(&mut self, handle: &Self::Handle) { match (self, handle) { - (Self::Noop, KernelHandle::Noop) => Ok(()), + (Self::Noop, KernelHandle::Noop) => {} #[cfg(linux)] (Self::Linux(linux), KernelHandle::Linux(handle)) => linux.remove(handle).await, #[cfg(linux)] - _ => Err(Error::HandleMismatch), + _ => panic!("kernel handle mismatch"), } } @@ -140,9 +140,6 @@ pub enum Error { #[error("flowspec matches nothing")] MatchNothing, - - #[error("kernel handle mismatch")] - HandleMismatch, } pub type Result = std::result::Result; diff --git a/src/kernel/rtnl.rs b/src/kernel/rtnl.rs index f2eba6e..eee59a3 100644 --- a/src/kernel/rtnl.rs +++ b/src/kernel/rtnl.rs @@ -117,11 +117,15 @@ impl RtNetlink { .unwrap_or(self.args.init_table_id) } + // TODO: add reserve table id for prefix + + // TODO: make all delete action infallible (ignore errors) pub async fn del(&mut self, id: &K::Handle) -> Result<()> { let Some((prefix, _, table_id, _)) = self.routes.remove(id) else { return Ok(()); }; self.del_route(table_id, prefix).await?; + // TODO: del_rule mark xxx lookup table xxx let prefixes = self.rules.get_mut(&table_id).expect("route contains non-existent table??"); prefixes.remove(&prefix); @@ -142,6 +146,9 @@ impl RtNetlink { Ok(()) } + // TODO: no need to explicitly delete rule + // when a route is removed, its `from all fwmark xxx lookup xxx` should be + // removed as well. async fn del_rule(&self, table_id: u32) -> Result<()> { // TODO: add RuleMessageBuilder to rtnetlink crate let mut msg = RuleMessage::default(); @@ -236,7 +243,8 @@ impl RtNetlink { handle: &Handle, iter: impl Iterator)>, ) -> Result<()> { - // TODO: remove route if next hop becomes unreachable + // TODO: ~~remove route~~ replace route with "unreachable" if next hop becomes + // unreachable for (prefix, next_hop, table_id, attrs) in iter { warn!("process {prefix}"); let new_attrs = Self::get_route2(handle, *next_hop).await?; @@ -254,6 +262,8 @@ impl RtNetlink { Ok(()) } + // TODO: if network unreachable then insert unreachable route + // could be replaced with regular ones once the link is ready in `process_*` async fn get_route(&self, ip: IpAddr) -> Result> { Self::get_route2(&self.handle, ip).await }