Skip to content

Commit

Permalink
Update UDP and QUIC test suites to match TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
quietlychris committed Jan 15, 2024
1 parent 78dc8fe commit a87fa69
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 89 deletions.
6 changes: 6 additions & 0 deletions src/host/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ pub async fn process_quic(
strings.push(name.to_string());
}
}
// Remove default sled tree name
let index = strings
.iter()
.position(|x| *x == "__sled__default")
.unwrap();
strings.remove(index);
if let Ok(data) = to_allocvec(&strings) {
let packet: GenericMsg = GenericMsg {
msg_type: MsgType::TOPICS,
Expand Down
4 changes: 1 addition & 3 deletions src/host/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub async fn process_udp(
count: Arc<Mutex<usize>>,
max_buffer_size: usize,
) {
// let mut buf = [0u8; 10_000];
let mut buf = vec![0u8; max_buffer_size];
// TO_DO_PART_B: Tried to with try_read_buf(), but seems to panic?
// let mut buf = Vec::with_capacity(max_buffer_size);
Expand All @@ -35,13 +34,12 @@ pub async fn process_udp(
Ok(msg) => msg,
Err(e) => {
error!("Had received Msg of {} bytes: {:?}, Error: {}", n, bytes, e);
panic!("{}", e);
continue;
}
};

match msg.msg_type {
MsgType::SET => {
// println!("received {} bytes, to be assigned to: {}", n, &msg.name);
let tree = db
.open_tree(msg.topic.as_bytes())
.expect("Error opening tree");
Expand Down
43 changes: 25 additions & 18 deletions src/node/quic/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<T: Message + 'static> Node<Quic, Active, T> {
})
}

pub fn topics(&self) -> Result<Vec<String>, Error> {
pub fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
let packet = GenericMsg {
msg_type: MsgType::TOPICS,
timestamp: Utc::now(),
Expand All @@ -110,23 +110,30 @@ impl<T: Message + 'static> Node<Quic, Active, T> {
self.rt_handle.block_on(async {
let mut buf = self.buffer.lock().await;

if let Some(connection) = self.connection.clone() {
let (mut send, mut recv) = connection.open_bi().await.map_err(ConnectionError)?;
debug!("Node succesfully opened stream from connection");
send.write_all(&packet_as_bytes).await.map_err(WriteError)?;
send.finish().await.map_err(WriteError)?;

if let Some(n) = recv.read(&mut buf).await.map_err(ReadError)? {
let bytes = &buf[..n];
let reply = from_bytes::<GenericMsg>(bytes)?;
let topics = from_bytes::<Vec<String>>(&reply.data)?;
Ok(topics)
} else {
Ok(Vec::new())
}
} else {
Ok(Vec::new())
}
let connection = self.connection.clone().ok_or(Connection)?;

let (mut send, mut recv) = connection.open_bi().await.map_err(ConnectionError)?;
debug!("Node succesfully opened stream from connection");
send.write_all(&packet_as_bytes).await.map_err(WriteError)?;
send.finish().await.map_err(WriteError)?;

let n = recv
.read(&mut buf)
.await
.map_err(ReadError)?
.ok_or(Connection)?;
let bytes = &buf[..n];
let reply = from_bytes::<GenericMsg>(bytes)?;
let topics: Msg<Vec<String>> = reply.try_into()?;
Ok(topics)

/* if let Some(n) = recv.read(&mut buf).await.map_err(ReadError)?? {
let bytes = &buf[..n];
let reply = from_bytes::<GenericMsg>(bytes)?;
let topics = from_bytes::<Vec<String>>(&reply.data)?;
// let topics: Msg<Vec<String>> = reply.try_into()?;
Ok(topics)
} */
})
}
}
8 changes: 3 additions & 5 deletions src/node/quic/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
// use crate::msg::Message;
// use crate::Error;
// use crate::Quic;
use crate::error::Error;
use crate::msg::Message;
use crate::node::network_config::Quic;
use crate::node::Subscription;
use crate::Msg;
use crate::Node;

impl<T: Message + 'static> Node<Quic, Subscription, T> {
// Should actually return a <T>
pub fn get_subscribed_data(&self) -> Result<T, Error> {
pub fn get_subscribed_data(&self) -> Result<Msg<T>, Error> {
let data = self.subscription_data.clone();
self.rt_handle.block_on(async {
let data = data.lock().await;
match data.clone() {
Some(value) => Ok(value.data),
Some(data) => Ok(data),
None => Err(Error::NoSubscriptionValue),
}
})
Expand Down
159 changes: 151 additions & 8 deletions tests/quic_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ pub fn initialize() {
});
}

#[test]
#[cfg(feature = "quic")]
type N = Quic;

#[cfg(feature = "quic")]
#[test]
fn integrate_host_and_single_node_quic() {
initialize();
let mut host: Host = HostConfig::default()
.with_udp_config(None)
.build()
.expect("Error building Host with default QUIC configuration");
host.start().expect("Error starting QUIC-enabled Host");
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let node: Node<Quic, Idle, Pose> = NodeConfig::new("pose").build().unwrap();
let node: Node<N, Idle, Pose> = NodeConfig::new("pose").build().unwrap();
let node = node.activate().unwrap();

for i in 0..5 {
Expand All @@ -55,3 +54,147 @@ fn integrate_host_and_single_node_quic() {
assert_eq!(pose, result.data);
}
}

#[cfg(feature = "quic")]
#[test]
fn request_non_existent_topic_quic() {
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let node: Node<N, Idle, Pose> = NodeConfig::new("doesnt_exist").build().unwrap();
let node = node.activate().unwrap();

// Requesting a topic that doesn't exist should return a recoverable error
for i in 0..5 {
println!("on loop: {}", i);
let result = node.request();
dbg!(&result);
thread::sleep(Duration::from_millis(50));
}
}

#[cfg(feature = "quic")]
#[test]
fn node_send_options_quic() {
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();

// Get the host up and running
let node_a = NodeConfig::<N, Option<f32>>::new("pose")
.build()
.unwrap()
.activate()
.unwrap();
let node_b = NodeConfig::<N, Option<f32>>::new("pose")
.build()
.unwrap()
.activate()
.unwrap();

// Send Option with `Some(value)`
node_a.publish(Some(1.0)).unwrap();
let result = node_b.request().unwrap();
dbg!(&result);
assert_eq!(result.data.unwrap(), 1.0);

// Send option with `None`
node_a.publish(None).unwrap();
let result = node_b.request();
dbg!(&result);
assert_eq!(result.unwrap().data, None);
}

#[cfg(feature = "quic")]
#[test]
fn subscription_usize_quic() {
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();

// Get the host up and running
let writer = NodeConfig::<N, usize>::new("subscription")
.build()
.unwrap()
.activate()
.unwrap();

// Create a subscription node with a query rate of 100 Hz
let reader = writer
.cfg
.clone()
.build()
.unwrap()
.subscribe(Duration::from_millis(10))
.unwrap();

for i in 0..5 {
let test_value = i as usize;
writer.publish(test_value).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
// let result = reader.get_subscribed_data();
match reader.get_subscribed_data() {
Ok(result) => assert_eq!(test_value, result.data),
Err(e) => println!("{:?}", e),
}
// dbg!(result);
}
}

#[cfg(feature = "quic")]
#[test]
#[should_panic]
fn no_subscribed_value_quic() {
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();

// Create a subscription node with a query rate of 10 Hz
let reader = NodeConfig::<N, usize>::new("subscription")
.build()
.unwrap()
.subscribe(Duration::from_millis(100))
.unwrap();

// Unwrapping on an error should lead to panic
let _result: usize = reader.get_subscribed_data().unwrap().data;
}

#[cfg(feature = "quic")]
#[test]
fn topics_list_quic() {
let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let topics: Vec<String> = ["a", "b", "c", "d", "e", "f"]
.iter()
.map(|x| x.to_string())
.collect();
dbg!(&topics);
let mut nodes = Vec::with_capacity(topics.len());
for topic in topics.clone() {
let node: Node<N, Idle, usize> = NodeConfig::new(topic).build().unwrap();
let node = node.activate().unwrap();
nodes.push(node);
}

for i in 0..topics.len() {
nodes[i].publish(i).unwrap();
thread::sleep(Duration::from_millis(1));
assert_eq!(host.topics(), nodes[i].topics().unwrap().data);
let t = if i == 0 {
vec![topics[i].to_string()]
} else {
let mut t = topics[0..i + 1]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>();
t.sort();
t
};
let mut nt = nodes[i].topics().unwrap().data;
nt.sort();
assert_eq!(t, nt);
}
}
27 changes: 13 additions & 14 deletions tests/tcp_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ use common::Pose;
use std::thread;
use std::time::Duration;

type N = Tcp;

#[test]
fn integrate_host_and_single_node() {
fn integrate_host_and_single_node_tcp() {
let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let node: Node<Tcp, Idle, Pose> = NodeConfig::new("pose").build().unwrap();
let node: Node<N, Idle, Pose> = NodeConfig::new("pose").build().unwrap();
let node = node.activate().unwrap();

for i in 0..5 {
Expand All @@ -34,13 +36,13 @@ fn integrate_host_and_single_node() {
}

#[test]
fn request_non_existent_topic() {
fn request_non_existent_topic_tcp() {
let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let node: Node<Tcp, Idle, Pose> = NodeConfig::new("doesnt_exist").build().unwrap();
let node: Node<N, Idle, Pose> = NodeConfig::new("doesnt_exist").build().unwrap();
let node = node.activate().unwrap();

// Requesting a topic that doesn't exist should return a recoverable error
Expand All @@ -53,17 +55,17 @@ fn request_non_existent_topic() {
}

#[test]
fn node_send_options() {
fn node_send_options_tcp() {
let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();

// Get the host up and running
let node_a = NodeConfig::<Tcp, Option<f32>>::new("pose")
let node_a = NodeConfig::<N, Option<f32>>::new("pose")
.build()
.unwrap()
.activate()
.unwrap();
let node_b = NodeConfig::<Tcp, Option<f32>>::new("pose")
let node_b = NodeConfig::<N, Option<f32>>::new("pose")
.build()
.unwrap()
.activate()
Expand All @@ -83,13 +85,12 @@ fn node_send_options() {
}

#[test]
fn subscription_usize() {
fn subscription_usize_tcp() {
let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");

// Get the host up and running
let writer = NodeConfig::<Tcp, usize>::new("subscription")
let writer = NodeConfig::<N, usize>::new("subscription")
.build()
.unwrap()
.activate()
Expand Down Expand Up @@ -119,12 +120,12 @@ fn subscription_usize() {

#[test]
#[should_panic]
fn no_subscribed_value() {
fn no_subscribed_value_tcp() {
let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();

// Create a subscription node with a query rate of 10 Hz
let reader = NodeConfig::<Tcp, usize>::new("subscription")
let reader = NodeConfig::<N, usize>::new("subscription")
.build()
.unwrap()
.subscribe(Duration::from_millis(100))
Expand All @@ -136,8 +137,6 @@ fn no_subscribed_value() {

#[test]
fn topics_list_tcp() {
type N = Tcp;

let mut host: Host = HostConfig::default().build().unwrap();
host.start().unwrap();
println!("Host should be running in the background");
Expand Down
Loading

0 comments on commit a87fa69

Please sign in to comment.