Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat static routing #58

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/rust-build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ jobs:
- name: Run local dtntrigger test
run: ./tests/local_trigger_test.sh

- name: Run spray and wait routing test
run: ./tests/routing_saw.sh

- name: Run static routing test
run: ./tests/routing_static.sh

- name: Run external peer management (dtn) test
run: ./tests/ext_peer_management.sh

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Plus:
* [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01)
* A simple [HTTP Convergence Layer](doc/http-cl.md)
* A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md)
* An IP neighorhood discovery service
* An IP neighborhood discovery service
* Convenient command line tools to interact with the daemon
* A simple web interface for status information about `dtnd`
* A [web-socket interface](doc/http-client-api.md) for application agents
Expand All @@ -25,7 +25,7 @@ The actual BP7 implementation (encoding/decoding) is available as a separate [pr

Additional dtn extensions and a client library are also [available](https://crates.io/crates/dtn7-plus).

Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/sink-routing and restful/websocket command interfaces are implemented.
Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/static/sink-routing and restful/websocket command interfaces are implemented.
Both addressing schemes, *dtn* as well as *ipn* are supported.
Furthermore, some CLI tools are provided to easily integrate *dtn7* into shell scripts.

Expand Down
1 change: 1 addition & 0 deletions core/dtn7/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ dtn7-codegen = { path = "../codegen", version = "0.1.0" }
byteorder = "1.4.3"
reqwest = { version = "0.11.13", default-features = false, features = ["json"] }
sha1 = "0.10.5"
glob-match = "0.2.1"

[lib]
name = "dtn7"
Expand Down
39 changes: 39 additions & 0 deletions core/dtn7/src/dtnd/httpd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::core::peer::PeerType;
use crate::core::store::BundleStore;
use crate::peers_add;
use crate::peers_remove;
use crate::routing_cmd;
use crate::routing_get_data;
use crate::store_remove;
use crate::CONFIG;
use crate::DTNCORE;
Expand Down Expand Up @@ -329,6 +331,41 @@ async fn debug_rnd_peer() -> String {
res
}

async fn http_routing_cmd(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
if let Some(cmd) = params.get("c") {
if routing_cmd(cmd.to_string()).await.is_ok() {
Ok("Sent command to routing agent.".into())
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error sending cmd to routing agent",
))
}
} else {
//anyhow::bail!("missing filter criteria");
Err((
StatusCode::BAD_REQUEST,
"missing routing command parameter cmd",
))
}
}

async fn http_routing_getdata(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
let param = params.get("p").map_or("".to_string(), |f| f.to_string());
if let Ok(res) = routing_get_data(param).await {
Ok(res)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting data from routing agent",
))
}
}

async fn http_peers_add(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
Expand Down Expand Up @@ -699,6 +736,8 @@ pub async fn spawn_httpd() -> Result<()> {
let mut app_local_only = Router::new()
.route("/peers/add", get(http_peers_add))
.route("/peers/del", get(http_peers_delete))
.route("/routing/cmd", get(http_routing_cmd).post(http_routing_cmd))
.route("/routing/getdata", get(http_routing_getdata))
.route("/send", post(send_post))
.route("/delete", get(delete).delete(delete))
.route("/register", get(register))
Expand Down
20 changes: 20 additions & 0 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,26 @@ pub fn store_delete_expired() {
}
}
}
pub async fn routing_cmd(cmd: String) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
if let Err(err) = chan.send(RoutingCmd::Command(cmd)).await {
bail!("Error while sending notification: {}", err);
}
Ok(())
}

pub async fn routing_get_data(param: String) -> Result<String> {
let (reply_tx, reply_rx) = oneshot::channel();

let cmd_channel = DTNCORE.lock().routing_agent.channel();
if let Err(err) = cmd_channel.send(RoutingCmd::GetData(param, reply_tx)).await {
bail!("Error while sending command to routing agent: {}", err);
}
// wait for reply or timeout
let res = tokio::time::timeout(std::time::Duration::from_secs(1), reply_rx).await??;

Ok(res)
}

pub async fn routing_notify(notification: RoutingNotifcation) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/epidemic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => match notification {
RoutingNotifcation::SendingFailed(bid, cla_sender) => {
core.sending_failed(bid.as_str(), cla_sender.as_str());
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl ExternalRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(notification) => {
notify(notification);
}
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/flooding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl FloodingRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
Expand Down
17 changes: 15 additions & 2 deletions core/dtn7/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod external;
pub mod flooding;
pub mod sink;
pub mod sprayandwait;
pub mod static_routing;

use crate::cla::ClaSenderTask;
use crate::core::bundlepack::BundlePack;
Expand All @@ -17,6 +18,7 @@ use external::ExternalRoutingAgent;
use flooding::FloodingRoutingAgent;
use sink::SinkRoutingAgent;
use sprayandwait::SprayAndWaitRoutingAgent;
use static_routing::StaticRoutingAgent;
use std::fmt::Debug;
use std::fmt::Display;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -37,11 +39,14 @@ pub enum RoutingAgentsEnum {
SinkRoutingAgent,
ExternalRoutingAgent,
SprayAndWaitRoutingAgent,
StaticRoutingAgent,
}

pub enum RoutingCmd {
SenderForBundle(BundlePack, oneshot::Sender<(Vec<ClaSenderTask>, bool)>),
Notify(RoutingNotifcation),
Command(String),
GetData(String, oneshot::Sender<String>),
Shutdown,
}

Expand All @@ -60,18 +65,26 @@ pub trait RoutingAgent: Debug + Display {
}

pub fn routing_algorithms() -> Vec<&'static str> {
vec!["epidemic", "flooding", "sink", "external", "sprayandwait"]
vec![
"epidemic",
"flooding",
"sink",
"external",
"sprayandwait",
"static",
]
}

pub fn routing_options() -> Vec<&'static str> {
vec!["sprayandwait.num_copies=<int>"]
vec!["sprayandwait.num_copies=<int>", "static.routes=<file>"]
}

pub fn new(routingagent: &str) -> RoutingAgentsEnum {
match routingagent {
"flooding" => FloodingRoutingAgent::new().into(),
"epidemic" => EpidemicRoutingAgent::new().into(),
"sink" => SinkRoutingAgent::new().into(),
"static" => StaticRoutingAgent::new().into(),
"external" => ExternalRoutingAgent::new().into(),
"sprayandwait" => SprayAndWaitRoutingAgent::new().into(),
_ => panic!("Unknown routing agent {}", routingagent),
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl SinkRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/dtn7/src/routing/sprayandwait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct SprayAndWaitRoutingAgent {
tx: mpsc::Sender<super::RoutingCmd>,
}

#[derive(Debug)]
pub struct SaWBundleData {
/// the number of copies we have left to spread
remaining_copies: usize,
Expand Down Expand Up @@ -203,6 +204,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => {
handle_notification(&mut core, notification);
}
Expand Down
Loading
Loading