Skip to content

Commit

Permalink
WIP: test, Flowspec PartialEq
Browse files Browse the repository at this point in the history
  • Loading branch information
hack3ric committed Dec 30, 2024
1 parent 5f492d0 commit 93e6201
Showing 12 changed files with 185 additions and 86 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ test-case = "3.3.1"
version-compare = "0.2.0"
tokio = { version = "1.38.0", features = ["time"] }
macro_rules_attribute = "0.2.0"
map-macro = "0.3.0"

[profile.release]
opt-level = "z"
16 changes: 13 additions & 3 deletions src/bgp/flow.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use strum::{EnumDiscriminants, FromRepr};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[derive(Clone, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Flowspec {
afi: Afi,
inner: BTreeSet<ComponentStore>,
@@ -143,8 +143,18 @@ impl Display for Flowspec {
}
}

#[derive(Debug, Clone, Hash, Serialize, Deserialize)]
#[allow(clippy::derived_hash_with_manual_eq)]
impl PartialEq for Flowspec {
fn eq(&self, other: &Self) -> bool {
// ComponentStore's PartialEq only compares kind, so manually implement instead
self.afi == other.afi
&& self.inner.len() == other.inner.len()
&& self.inner.iter().zip(other.inner.iter()).all(|(a, b)| a.0 == b.0)
}
}

impl Eq for Flowspec {}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentStore(pub Component);

impl PartialEq for ComponentStore {
13 changes: 9 additions & 4 deletions src/bgp/mod.rs
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ use tokio::time::{interval, Duration, Instant, Interval};
use State::*;

#[cfg(test)]
use tokio::sync::mpsc;
use {crate::integration_tests::TestEvent, tokio::sync::mpsc};

/// A (currently passive only) BGP session.
///
@@ -60,11 +60,11 @@ pub struct Session<S: AsyncRead + AsyncWrite + Unpin> {
state: State<S>,
routes: Routes,
#[cfg(test)]
event_tx: mpsc::Sender<()>,
event_tx: mpsc::Sender<TestEvent>,
}

impl<S: AsyncRead + AsyncWrite + Unpin> Session<S> {
pub async fn new(config: RunArgs, #[cfg(test)] event_tx: mpsc::Sender<()>) -> Result<Self> {
pub async fn new(config: RunArgs, #[cfg(test)] event_tx: mpsc::Sender<TestEvent>) -> Result<Self> {
let kernel = if config.dry_run {
KernelAdapter::Noop
} else {
@@ -192,9 +192,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Session<S> {
Ok(Message::Update(msg)) => if let Some((afi, safi)) = msg.is_end_of_rib() {
debug!("received End-of-RIB of ({afi}, {safi:?})");
#[cfg(test)]
let _ = self.event_tx.send(()).await;
let _ = self.event_tx.send(TestEvent::EndOfRib(afi, safi)).await;

} else {
debug!("received update: {msg:?}");
#[cfg(test)]
let _ = self.event_tx.send(TestEvent::Update(msg.clone())).await;

// here `msg` is partially moved
if msg.nlri.is_some() || msg.old_nlri.is_some() {
let route_info = Rc::new(msg.route_info);
for n in msg.nlri.into_iter().chain(msg.old_nlri) {
4 changes: 2 additions & 2 deletions src/bgp/msg.rs
Original file line number Diff line number Diff line change
@@ -721,7 +721,7 @@ impl MessageSend for UpdateMessage<'_> {
/// test the correctness of the deserialization logic. With that said, it is
/// important to make this function correct as well.
fn write_data(&self, buf: &mut Vec<u8>) {
let old_nlri = match &self.old_nlri.as_ref().map(Nlri::kind) {
let old_nlri = match &self.old_nlri.as_ref().map(Nlri::content) {
Some(NlriContent::Unicast { prefixes, next_hop: NextHop::V4(next_hop), .. }) => Some((prefixes, next_hop)),
Some(_) => panic!("BGP-4 NLRI supports IPv4 only"),
None => None,
@@ -731,7 +731,7 @@ impl MessageSend for UpdateMessage<'_> {

// BGP-4 withdrawn routes
extend_with_u16_len(buf, |buf| {
let Some(NlriContent::Unicast { prefixes, .. }) = &self.old_withdrawn.as_ref().map(Nlri::kind) else {
let Some(NlriContent::Unicast { prefixes, .. }) = &self.old_withdrawn.as_ref().map(Nlri::content) else {
panic!("BGP-4 withdrawn routes support IPv4 only");
};
prefixes.iter().for_each(|p| {
8 changes: 6 additions & 2 deletions src/bgp/nlri.rs
Original file line number Diff line number Diff line change
@@ -54,10 +54,14 @@ impl Nlri {
Ok(Self { afi, content: NlriContent::Flow { specs } })
}

pub fn kind(&self) -> &NlriContent {
pub fn content(&self) -> &NlriContent {
&self.content
}

pub fn into_content(self) -> NlriContent {
self.content
}

pub fn write_mp_reach(&self, buf: &mut Vec<u8>) {
self.write_mp(buf, true);
}
@@ -74,7 +78,7 @@ impl Nlri {
} as u8,
]);
buf.extend(u16::to_be_bytes(self.afi as _));
match self.kind() {
match self.content() {
NlriContent::Unicast { prefixes, next_hop } => {
extend_with_u16_len(buf, |buf| {
buf.push(NlriKind::Unicast as u8);
126 changes: 114 additions & 12 deletions src/integration_tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,145 @@
use super::helpers::bird::ensure_bird_2;
use super::helpers::cli::run_cli_with_bird;
use super::helpers::kernel::{ensure_loopback_up, pick_port};
use super::helpers::str_to_file;
use super::test_local;
use super::{test_local, TestEvent};
use crate::args::Cli;
use crate::bgp::flow::Component::*;
use crate::bgp::flow::{Flowspec, Op, Ops};
use crate::bgp::nlri::NlriContent;
use anyhow::bail;
use clap::Parser;
use macro_rules_attribute::apply;
use map_macro::btree_map;
use std::collections::BTreeSet;
use std::time::Duration;
use tokio::select;
use tokio::time::sleep;

const BIRD_FILE: &str = "\
router id 10.234.56.78;
flow4 table myflow4;
flow6 table myflow6;
protocol static f4 {
flow4 { table myflow4; };
@@FLOW4@@;
}
protocol static f6 {
flow6 { table myflow6; };
@@FLOW6@@;
}
protocol bgp flow_test {
debug all;
connect delay time 1;
local ::1 port @@BIRD_PORT@@ as 65000;
neighbor ::1 port @@FLOW_PORT@@ as 65000;
multihop;
flow4 { table myflow4; import none; export all; };
flow6 { table myflow6; import none; export all; };
}";

#[apply(test_local!)]
async fn test_basic() -> anyhow::Result<()> {
async fn test_routes() -> anyhow::Result<()> {
ensure_bird_2()?;
ensure_loopback_up().await?;

let flows = btree_map! {
Flowspec::new_v4()
.with(DstPrefix("10.0.0.0/8".parse()?, 0))?
.with(PacketLen(Ops::new(Op::gt(1024))))? => "flow4 { dst 10.0.0.0/8; length > 1024; }",
Flowspec::new_v4()
.with(SrcPrefix("123.45.67.192/26".parse()?, 0))? => "flow4 { src 123.45.67.192/26; }",
Flowspec::new_v6()
.with(DstPrefix("fec0:1122:3344:5566:7788:99aa:bbcc:ddee/128".parse()?, 0))?
.with(TcpFlags(Op::all(0x3).and(Op::not_any(0x1)).and(Op::any(0xff)).or(Op::all(0x33))))?
.with(DstPort(Ops::new(Op::eq(6000))))?
.with(Fragment(Op::not_any(0b10).or(Op::not_any(0b100))))? => "flow6 { \
dst fec0:1122:3344:5566:7788:99aa:bbcc:ddee/128; \
tcp flags 0x03/0x0f && !0/0xff || 0x33/0x33; \
dport = 6000; \
fragment !is_fragment || !first_fragment; }"
};

let (flow4, flow6) = flows.iter().fold((String::new(), String::new()), |(v4, v6), (k, v)| {
if k.is_ipv4() {
(v4 + "route " + v + ";", v6)
} else {
(v4, v6 + "route " + v + ";")
}
});

let flow_port = pick_port().await?.to_string();
let cli = Cli::try_parse_from(["flow", "run", "-v", "--dry-run", &format!("--bind=[::1]:{flow_port}")])?;
let bird = include_str!("config/basic.bird.conf.in")
let bird = BIRD_FILE
.replace("@@BIRD_PORT@@", &pick_port().await?.to_string())
.replace("@@FLOW_PORT@@", &flow_port);
let bird = str_to_file(bird.as_bytes()).await?;
let (mut cli, mut bird, mut events, _temp_dir) = run_cli_with_bird(cli, bird.file_path()).await?;
.replace("@@FLOW_PORT@@", &flow_port)
.replace("@@FLOW4@@", &flow4)
.replace("@@FLOW6@@", &flow6);

let (mut cli, mut bird, mut events, _g) = run_cli_with_bird(cli, &bird).await?;

let mut end_of_rib_count = 0;
let mut visited = BTreeSet::new();
loop {
select! {
Some(()) = events.recv(), if !events.is_closed() => {
end_of_rib_count += 1;
if end_of_rib_count >= 2 {
break;
Some(event) = events.recv(), if !events.is_closed() => match event {
TestEvent::EndOfRib(_afi, _safi) => {
end_of_rib_count += 1;
if end_of_rib_count >= 2 {
break;
}
}
}
TestEvent::Update(msg) => {
for nlri in msg.nlri.into_iter().chain(msg.old_nlri) {
let NlriContent::Flow { specs } = nlri.into_content() else {
bail!("received NLRI other than flowspec");
};
for spec in specs {
if flows.contains_key(dbg!(&spec)) {
println!("contains");
if !visited.insert(spec.clone()) {
bail!("received duplicate flowspec: {spec}");
}
} else {
bail!("received unknown flowspec: {spec}");
}
}
}
}
},
_ = sleep(Duration::from_secs(10)) => bail!("timed out"),
code = &mut cli => bail!("CLI exited early with code {}", code??),
status = bird.wait() => bail!("BIRD exited early with {}", status?),
}
}

if flows.len() != visited.len() {
let orig_set = flows.into_iter().map(|x| x.0).collect::<BTreeSet<_>>();
let diff = orig_set.difference(&visited).collect::<BTreeSet<_>>();
bail!("some flowspecs not received: {diff:?}");
}

Ok(())
}

#[test]
fn test_test() -> anyhow::Result<()> {
assert_eq!(
Flowspec::new_v6()
.with(DstPrefix("fec0:1122:3344:5566:7788:99aa:bbcc:ddee/128".parse()?, 0))?
.with(TcpFlags(Op::all(0x3).and(Op::not_any(0x1)).and(Op::any(0xff)).or(Op::all(0x33))))?
.with(DstPort(Ops::new(Op::eq(6000))))?
.with(Fragment(Op::not_any(0b10).or(Op::not_any(0b100))))?,
Flowspec::new_v6()
.with(DstPrefix("fec0:1122:3344:5566:7788:99aa:bbcc:ddee/128".parse()?, 0))?
.with(TcpFlags(Op::all(0x3).and(Op::not_any(0xc)).and(Op::any(0xff)).or(Op::all(0x33))))?
.with(DstPort(Ops::new(Op::eq(6000))))?
.with(Fragment(Op::not_any(0b10).or(Op::not_any(0b100))))?
);
Ok(())
}
45 changes: 0 additions & 45 deletions src/integration_tests/config/basic.bird.conf.in

This file was deleted.

17 changes: 10 additions & 7 deletions src/integration_tests/helpers/cli.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::bird::run_bird;
use super::str_to_file;
use crate::args::Cli;
use crate::cli_entry;
use async_tempfile::TempDir;
use std::path::Path;
use crate::integration_tests::TestEvent;
use async_tempfile::{TempDir, TempFile};
use tokio::process::Child;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

pub type CliChild = JoinHandle<anyhow::Result<u8>>;

fn run_cli(options: Cli, event_tx: mpsc::Sender<()>) -> CliChild {
fn run_cli(options: Cli, event_tx: mpsc::Sender<TestEvent>) -> CliChild {
tokio::task::spawn_local(async {
let exit_code = cli_entry(options, event_tx).await;
anyhow::Ok(exit_code)
@@ -18,13 +19,15 @@ fn run_cli(options: Cli, event_tx: mpsc::Sender<()>) -> CliChild {

pub async fn run_cli_with_bird(
mut cli_opt: Cli,
bird_conf_path: impl AsRef<Path>,
) -> anyhow::Result<(CliChild, Child, mpsc::Receiver<()>, TempDir)> {
bird_conf: &str,
) -> anyhow::Result<(CliChild, Child, mpsc::Receiver<TestEvent>, (TempFile, TempDir))> {
let bird_conf_file = str_to_file(bird_conf.as_bytes()).await?;

let sock_dir = TempDir::new().await?;
cli_opt.run_dir = sock_dir.as_ref().into();
let bird = run_bird(bird_conf_path.as_ref(), sock_dir.join("bird.sock")).await?;
let bird = run_bird(bird_conf_file.file_path(), sock_dir.join("bird.sock")).await?;

let (event_tx, event_rx) = mpsc::channel(127);
let cli = run_cli(cli_opt, event_tx);
Ok((cli, bird, event_rx, sock_dir))
Ok((cli, bird, event_rx, (bird_conf_file, sock_dir)))
}
Loading

0 comments on commit 93e6201

Please sign in to comment.