Skip to content

Commit

Permalink
WIP: rtnl decouple mark and table
Browse files Browse the repository at this point in the history
  • Loading branch information
hack3ric committed Jan 9, 2025
1 parent 6ef171e commit 8086d57
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 30 deletions.
8 changes: 4 additions & 4 deletions src/bgp/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::flow::Flowspec;
use super::nlri::{NextHop, Nlri, NlriContent};
use crate::kernel::{self, Kernel, KernelAdapter, KernelHandle};
use crate::net::IpPrefix;
use crate::util::{grace, MaybeRc, BOLD, FG_BLUE_BOLD, FG_GREEN_BOLD, RESET};
use crate::util::{MaybeRc, BOLD, FG_BLUE_BOLD, FG_GREEN_BOLD, RESET};
use either::Either;
use itertools::Itertools;
use log::{warn, Level, LevelFilter};
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Routes {
}
Entry::Occupied(mut e) => {
let (id, _) = e.insert((id, MaybeRc::Rc(info.clone())));
grace(self.kernel.remove(&id).await, "error removing old flowspec from kernel");
self.kernel.remove(&id).await;
}
}
}
Expand All @@ -78,15 +78,15 @@ impl Routes {
let mut flow = BTreeMap::new();
swap(&mut flow, &mut self.flow);
for (_, (handle, _)) in flow {
grace(self.kernel.remove(&handle).await, "error removing handle from kernel");
self.kernel.remove(&handle).await;
}
}

async fn withdraw_spec(&mut self, spec: Flowspec) {
let Some((handle, _)) = self.flow.remove(&spec) else {
return;
};
grace(self.kernel.remove(&handle).await, "error removing handle from kernel");
self.kernel.remove(&handle).await;
}

pub async fn process(&mut self) -> kernel::Result<()> {
Expand Down
17 changes: 9 additions & 8 deletions src/integration_tests/kernel_linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,25 @@ async fn test_redirect_to_ip() -> anyhow::Result<()> {
let (conn, handle, _) = rtnetlink::new_connection()?;
tokio::spawn(conn);

let table_index = 10000;
let dummy_index = create_dummy_link(&handle, "10.128.128.254/24".parse()?).await?;
let (name, (_g1, bird, chans, _g2)) =
run_kernel_test(["flow4 { dst 172.20.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }"])
.await?;
let (name, (_g1, mut bird, chans, _g2)) = run_kernel_test([
"flow4 { dst 172.20.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }",
"flow4 { dst 172.21.0.0/16; } { bgp_ext_community.add((unknown 0x800c, 10.128.128.1, 0)); }",
])
.await?;

print_nft_chain(&name, &name).await?;
print_ip_rule().await?;
print_ip_route(10000).await?;
print_ip_route(table_index).await?;

let nft_stmts = get_nft_stmts(&name, &name).await?;
let ip_rules = get_ip_rule(&handle, IpVersion::V4).await?;
let ip_routes = get_ip_route(&handle, IpVersion::V4, 10000).await?;
let nft_stmts = get_nft_stmts(&name, &name).await?;
close_cli(chans).await;
drop(bird);
bird.kill().await?;
remove_link(&handle, dummy_index).await?;

let table_index = 10000;

assert_eq!(nft_stmts, [vec![
prefix_stmt("daddr", "172.20.0.0/16".parse()?).unwrap(),
stmt::Statement::Mangle(stmt::Mangle { key: make_meta(expr::MetaKey::Mark), value: Number(table_index) }),
Expand Down
32 changes: 23 additions & 9 deletions src/kernel/linux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::rtnl::{RtNetlink, RtNetlinkArgs};
use super::{Kernel, Result};
use crate::bgp::flow::Flowspec;
use crate::bgp::route::RouteInfo;
use crate::util::grace;
use clap::Args;
use futures::future::OptionFuture;
use futures::join;
Expand Down Expand Up @@ -33,6 +34,17 @@ impl Linux {
rtnl_args: rtnl,
})
}

async fn remove_nft(&mut self, handle: &<Self as Kernel>::Handle) {
let mut batch = Batch::new();
for h in handle.iter().copied() {
batch.delete(self.nft.make_rule_handle(h));
}
grace(
self.nft.apply_ruleset(&batch.to_nftables()).await,
"failed to remove nft rules",
);
}
}

impl Kernel for Linux {
Expand Down Expand Up @@ -92,26 +104,28 @@ impl Kernel for Linux {

if let Some((next_hop, table_id)) = rt_info {
let rtnl = self.rtnl.as_mut().expect("RtNetlink should be initialized");
let real_table_id = rtnl.add(handle.clone(), spec, next_hop).await?;
let real_table_id = match rtnl.add(handle.clone(), spec, next_hop).await {
Ok(table_id) => table_id,
Err(error) => {
self.remove_nft(&handle).await;
return Err(error);
}
};
// TODO: remove this, table_id should be mark
assert_eq!(table_id, real_table_id, "table ID mismatch");
}

Ok(handle)
}

async fn remove(&mut self, handle: &Self::Handle) -> Result<()> {
let mut batch = Batch::new();
for h in handle.iter().copied() {
batch.delete(self.nft.make_rule_handle(h));
}
self.nft.apply_ruleset(&batch.to_nftables()).await?;
async fn remove(&mut self, handle: &Self::Handle) {
self.remove_nft(handle).await;
if let Some(rtnl) = &mut self.rtnl {
rtnl.del(handle).await?;
let _ = rtnl.del(handle).await;
if rtnl.is_empty() {
self.rtnl = None;
}
}
Ok(())
}

async fn process(&mut self) -> Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion src/kernel/linux/nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ impl TrafficFilterAction {
let new = RtNetlink::new(rtnl_args.clone()).unwrap();
rtnl.get_or_insert(new)
};
let table_id = rtnl.next_table_id();
let table_id = rtnl.next_table_id(); // TODO: bug, should not be next_table_id and should be next_mark (?)
// TODO: mark could be random, create first and then pass to rtnl
let result = smallvec_inline![stmt::Statement::Mangle(stmt::Mangle {
key: make_meta(expr::MetaKey::Mark),
value: NUM(table_id),
Expand Down
11 changes: 4 additions & 7 deletions src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait Kernel: Sized {
) -> impl Future<Output = Result<Self::Handle>>;

/// Remove a flowspec from kernel using previously returned handle.
fn remove(&mut self, handle: &Self::Handle) -> impl Future<Output = Result<()>>;
fn remove(&mut self, handle: &Self::Handle) -> impl Future<Output = ()>;

/// Process notifications from kernel, timers, etc.
fn process(&mut self) -> impl Future<Output = Result<()>> {
Expand Down Expand Up @@ -81,13 +81,13 @@ impl Kernel for KernelAdapter {
}
}

async fn remove(&mut self, handle: &Self::Handle) -> Result<()> {
async fn remove(&mut self, handle: &Self::Handle) {
match (self, handle) {
(Self::Noop, KernelHandle::Noop) => Ok(()),
(Self::Noop, KernelHandle::Noop) => {}
#[cfg(linux)]
(Self::Linux(linux), KernelHandle::Linux(handle)) => linux.remove(handle).await,
#[cfg(linux)]
_ => Err(Error::HandleMismatch),
_ => panic!("kernel handle mismatch"),
}
}

Expand Down Expand Up @@ -140,9 +140,6 @@ pub enum Error {

#[error("flowspec matches nothing")]
MatchNothing,

#[error("kernel handle mismatch")]
HandleMismatch,
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
12 changes: 11 additions & 1 deletion src/kernel/rtnl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ impl<K: Kernel> RtNetlink<K> {
.unwrap_or(self.args.init_table_id)
}

// TODO: add reserve table id for prefix

// TODO: make all delete action infallible (ignore errors)
pub async fn del(&mut self, id: &K::Handle) -> Result<()> {
let Some((prefix, _, table_id, _)) = self.routes.remove(id) else {
return Ok(());
};
self.del_route(table_id, prefix).await?;
// TODO: del_rule mark xxx lookup table xxx

let prefixes = self.rules.get_mut(&table_id).expect("route contains non-existent table??");
prefixes.remove(&prefix);
Expand All @@ -142,6 +146,9 @@ impl<K: Kernel> RtNetlink<K> {
Ok(())
}

// TODO: no need to explicitly delete rule
// when a route is removed, its `from all fwmark xxx lookup xxx` should be
// removed as well.
async fn del_rule(&self, table_id: u32) -> Result<()> {
// TODO: add RuleMessageBuilder to rtnetlink crate
let mut msg = RuleMessage::default();
Expand Down Expand Up @@ -236,7 +243,8 @@ impl<K: Kernel> RtNetlink<K> {
handle: &Handle,
iter: impl Iterator<Item = &mut (IpPrefix, IpAddr, u32, Vec<RouteAttribute>)>,
) -> Result<()> {
// TODO: remove route if next hop becomes unreachable
// TODO: ~~remove route~~ replace route with "unreachable" if next hop becomes
// unreachable
for (prefix, next_hop, table_id, attrs) in iter {
warn!("process {prefix}");
let new_attrs = Self::get_route2(handle, *next_hop).await?;
Expand All @@ -254,6 +262,8 @@ impl<K: Kernel> RtNetlink<K> {
Ok(())
}

// TODO: if network unreachable then insert unreachable route
// could be replaced with regular ones once the link is ready in `process_*`
async fn get_route(&self, ip: IpAddr) -> Result<Vec<RouteAttribute>> {
Self::get_route2(&self.handle, ip).await
}
Expand Down

0 comments on commit 8086d57

Please sign in to comment.