diff --git a/src/host/quic.rs b/src/host/quic.rs index fe173e8..cff9ab2 100644 --- a/src/host/quic.rs +++ b/src/host/quic.rs @@ -147,6 +147,12 @@ pub async fn process_quic( strings.push(name.to_string()); } } + // Remove default sled tree name + let index = strings + .iter() + .position(|x| *x == "__sled__default") + .unwrap(); + strings.remove(index); if let Ok(data) = to_allocvec(&strings) { let packet: GenericMsg = GenericMsg { msg_type: MsgType::TOPICS, diff --git a/src/host/udp.rs b/src/host/udp.rs index 9b2a0fd..e19adf6 100644 --- a/src/host/udp.rs +++ b/src/host/udp.rs @@ -21,7 +21,6 @@ pub async fn process_udp( count: Arc>, max_buffer_size: usize, ) { - // let mut buf = [0u8; 10_000]; let mut buf = vec![0u8; max_buffer_size]; // TO_DO_PART_B: Tried to with try_read_buf(), but seems to panic? // let mut buf = Vec::with_capacity(max_buffer_size); @@ -35,13 +34,12 @@ pub async fn process_udp( Ok(msg) => msg, Err(e) => { error!("Had received Msg of {} bytes: {:?}, Error: {}", n, bytes, e); - panic!("{}", e); + continue; } }; match msg.msg_type { MsgType::SET => { - // println!("received {} bytes, to be assigned to: {}", n, &msg.name); let tree = db .open_tree(msg.topic.as_bytes()) .expect("Error opening tree"); diff --git a/src/node/quic/active.rs b/src/node/quic/active.rs index c52a03e..fb4178f 100644 --- a/src/node/quic/active.rs +++ b/src/node/quic/active.rs @@ -96,7 +96,7 @@ impl Node { }) } - pub fn topics(&self) -> Result, Error> { + pub fn topics(&self) -> Result>, Error> { let packet = GenericMsg { msg_type: MsgType::TOPICS, timestamp: Utc::now(), @@ -110,23 +110,30 @@ impl Node { self.rt_handle.block_on(async { let mut buf = self.buffer.lock().await; - if let Some(connection) = self.connection.clone() { - let (mut send, mut recv) = connection.open_bi().await.map_err(ConnectionError)?; - debug!("Node succesfully opened stream from connection"); - send.write_all(&packet_as_bytes).await.map_err(WriteError)?; - send.finish().await.map_err(WriteError)?; - - if let Some(n) = recv.read(&mut buf).await.map_err(ReadError)? { - let bytes = &buf[..n]; - let reply = from_bytes::(bytes)?; - let topics = from_bytes::>(&reply.data)?; - Ok(topics) - } else { - Ok(Vec::new()) - } - } else { - Ok(Vec::new()) - } + let connection = self.connection.clone().ok_or(Connection)?; + + let (mut send, mut recv) = connection.open_bi().await.map_err(ConnectionError)?; + debug!("Node succesfully opened stream from connection"); + send.write_all(&packet_as_bytes).await.map_err(WriteError)?; + send.finish().await.map_err(WriteError)?; + + let n = recv + .read(&mut buf) + .await + .map_err(ReadError)? + .ok_or(Connection)?; + let bytes = &buf[..n]; + let reply = from_bytes::(bytes)?; + let topics: Msg> = reply.try_into()?; + Ok(topics) + + /* if let Some(n) = recv.read(&mut buf).await.map_err(ReadError)?? { + let bytes = &buf[..n]; + let reply = from_bytes::(bytes)?; + let topics = from_bytes::>(&reply.data)?; + // let topics: Msg> = reply.try_into()?; + Ok(topics) + } */ }) } } diff --git a/src/node/quic/subscription.rs b/src/node/quic/subscription.rs index b2ceb78..e270538 100644 --- a/src/node/quic/subscription.rs +++ b/src/node/quic/subscription.rs @@ -1,20 +1,18 @@ -// use crate::msg::Message; -// use crate::Error; -// use crate::Quic; use crate::error::Error; use crate::msg::Message; use crate::node::network_config::Quic; use crate::node::Subscription; +use crate::Msg; use crate::Node; impl Node { // Should actually return a - pub fn get_subscribed_data(&self) -> Result { + pub fn get_subscribed_data(&self) -> Result, Error> { let data = self.subscription_data.clone(); self.rt_handle.block_on(async { let data = data.lock().await; match data.clone() { - Some(value) => Ok(value.data), + Some(data) => Ok(data), None => Err(Error::NoSubscriptionValue), } }) diff --git a/tests/quic_integration.rs b/tests/quic_integration.rs index 11e280f..3643f1c 100644 --- a/tests/quic_integration.rs +++ b/tests/quic_integration.rs @@ -25,19 +25,18 @@ pub fn initialize() { }); } -#[test] #[cfg(feature = "quic")] +type N = Quic; + +#[cfg(feature = "quic")] +#[test] fn integrate_host_and_single_node_quic() { - initialize(); - let mut host: Host = HostConfig::default() - .with_udp_config(None) - .build() - .expect("Error building Host with default QUIC configuration"); - host.start().expect("Error starting QUIC-enabled Host"); + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); println!("Host should be running in the background"); // Get the host up and running - let node: Node = NodeConfig::new("pose").build().unwrap(); + let node: Node = NodeConfig::new("pose").build().unwrap(); let node = node.activate().unwrap(); for i in 0..5 { @@ -55,3 +54,147 @@ fn integrate_host_and_single_node_quic() { assert_eq!(pose, result.data); } } + +#[cfg(feature = "quic")] +#[test] +fn request_non_existent_topic_quic() { + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); + println!("Host should be running in the background"); + + // Get the host up and running + let node: Node = NodeConfig::new("doesnt_exist").build().unwrap(); + let node = node.activate().unwrap(); + + // Requesting a topic that doesn't exist should return a recoverable error + for i in 0..5 { + println!("on loop: {}", i); + let result = node.request(); + dbg!(&result); + thread::sleep(Duration::from_millis(50)); + } +} + +#[cfg(feature = "quic")] +#[test] +fn node_send_options_quic() { + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); + + // Get the host up and running + let node_a = NodeConfig::>::new("pose") + .build() + .unwrap() + .activate() + .unwrap(); + let node_b = NodeConfig::>::new("pose") + .build() + .unwrap() + .activate() + .unwrap(); + + // Send Option with `Some(value)` + node_a.publish(Some(1.0)).unwrap(); + let result = node_b.request().unwrap(); + dbg!(&result); + assert_eq!(result.data.unwrap(), 1.0); + + // Send option with `None` + node_a.publish(None).unwrap(); + let result = node_b.request(); + dbg!(&result); + assert_eq!(result.unwrap().data, None); +} + +#[cfg(feature = "quic")] +#[test] +fn subscription_usize_quic() { + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); + + // Get the host up and running + let writer = NodeConfig::::new("subscription") + .build() + .unwrap() + .activate() + .unwrap(); + + // Create a subscription node with a query rate of 100 Hz + let reader = writer + .cfg + .clone() + .build() + .unwrap() + .subscribe(Duration::from_millis(10)) + .unwrap(); + + for i in 0..5 { + let test_value = i as usize; + writer.publish(test_value).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + // let result = reader.get_subscribed_data(); + match reader.get_subscribed_data() { + Ok(result) => assert_eq!(test_value, result.data), + Err(e) => println!("{:?}", e), + } + // dbg!(result); + } +} + +#[cfg(feature = "quic")] +#[test] +#[should_panic] +fn no_subscribed_value_quic() { + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); + + // Create a subscription node with a query rate of 10 Hz + let reader = NodeConfig::::new("subscription") + .build() + .unwrap() + .subscribe(Duration::from_millis(100)) + .unwrap(); + + // Unwrapping on an error should lead to panic + let _result: usize = reader.get_subscribed_data().unwrap().data; +} + +#[cfg(feature = "quic")] +#[test] +fn topics_list_quic() { + let mut host: Host = HostConfig::default().with_udp_config(None).build().unwrap(); + host.start().unwrap(); + println!("Host should be running in the background"); + + // Get the host up and running + let topics: Vec = ["a", "b", "c", "d", "e", "f"] + .iter() + .map(|x| x.to_string()) + .collect(); + dbg!(&topics); + let mut nodes = Vec::with_capacity(topics.len()); + for topic in topics.clone() { + let node: Node = NodeConfig::new(topic).build().unwrap(); + let node = node.activate().unwrap(); + nodes.push(node); + } + + for i in 0..topics.len() { + nodes[i].publish(i).unwrap(); + thread::sleep(Duration::from_millis(1)); + assert_eq!(host.topics(), nodes[i].topics().unwrap().data); + let t = if i == 0 { + vec![topics[i].to_string()] + } else { + let mut t = topics[0..i + 1] + .iter() + .map(|x| x.to_string()) + .collect::>(); + t.sort(); + t + }; + let mut nt = nodes[i].topics().unwrap().data; + nt.sort(); + assert_eq!(t, nt); + } +} diff --git a/tests/tcp_integration.rs b/tests/tcp_integration.rs index 85ce86f..8d5dc01 100644 --- a/tests/tcp_integration.rs +++ b/tests/tcp_integration.rs @@ -7,14 +7,16 @@ use common::Pose; use std::thread; use std::time::Duration; +type N = Tcp; + #[test] -fn integrate_host_and_single_node() { +fn integrate_host_and_single_node_tcp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); println!("Host should be running in the background"); // Get the host up and running - let node: Node = NodeConfig::new("pose").build().unwrap(); + let node: Node = NodeConfig::new("pose").build().unwrap(); let node = node.activate().unwrap(); for i in 0..5 { @@ -34,13 +36,13 @@ fn integrate_host_and_single_node() { } #[test] -fn request_non_existent_topic() { +fn request_non_existent_topic_tcp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); println!("Host should be running in the background"); // Get the host up and running - let node: Node = NodeConfig::new("doesnt_exist").build().unwrap(); + let node: Node = NodeConfig::new("doesnt_exist").build().unwrap(); let node = node.activate().unwrap(); // Requesting a topic that doesn't exist should return a recoverable error @@ -53,17 +55,17 @@ fn request_non_existent_topic() { } #[test] -fn node_send_options() { +fn node_send_options_tcp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); // Get the host up and running - let node_a = NodeConfig::>::new("pose") + let node_a = NodeConfig::>::new("pose") .build() .unwrap() .activate() .unwrap(); - let node_b = NodeConfig::>::new("pose") + let node_b = NodeConfig::>::new("pose") .build() .unwrap() .activate() @@ -83,13 +85,12 @@ fn node_send_options() { } #[test] -fn subscription_usize() { +fn subscription_usize_tcp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); - println!("Host should be running in the background"); // Get the host up and running - let writer = NodeConfig::::new("subscription") + let writer = NodeConfig::::new("subscription") .build() .unwrap() .activate() @@ -119,12 +120,12 @@ fn subscription_usize() { #[test] #[should_panic] -fn no_subscribed_value() { +fn no_subscribed_value_tcp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); // Create a subscription node with a query rate of 10 Hz - let reader = NodeConfig::::new("subscription") + let reader = NodeConfig::::new("subscription") .build() .unwrap() .subscribe(Duration::from_millis(100)) @@ -136,8 +137,6 @@ fn no_subscribed_value() { #[test] fn topics_list_tcp() { - type N = Tcp; - let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); println!("Host should be running in the background"); diff --git a/tests/udp_integration.rs b/tests/udp_integration.rs index ab0fd8d..15b60bd 100644 --- a/tests/udp_integration.rs +++ b/tests/udp_integration.rs @@ -7,6 +7,8 @@ use common::Pose; use std::thread; use std::time::Duration; +type N = Udp; + #[test] fn integrate_host_and_single_node_udp() { let mut host: Host = HostConfig::default().build().unwrap(); @@ -14,7 +16,7 @@ fn integrate_host_and_single_node_udp() { println!("Host should be running in the background"); // Get the host up and running - let node: Node = NodeConfig::new("pose").build().unwrap(); + let node: Node = NodeConfig::new("pose").build().unwrap(); let node = node.activate().unwrap(); for i in 0..5 { @@ -34,68 +36,107 @@ fn integrate_host_and_single_node_udp() { } #[test] -fn simple_udp() { - let mut host = HostConfig::default().build().unwrap(); +fn request_non_existent_topic_udp() { + let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); - println!("Started host"); + println!("Host should be running in the background"); + + // Get the host up and running + let node: Node = NodeConfig::new("doesnt_exist").build().unwrap(); + let node = node.activate().unwrap(); - let node = NodeConfig::::new("num") + // Requesting a topic that doesn't exist should return a recoverable error + for i in 0..5 { + println!("on loop: {}", i); + let result = node.request(); + dbg!(&result); + thread::sleep(Duration::from_millis(50)); + } +} + +#[test] +fn node_send_options_udp() { + let mut host: Host = HostConfig::default().build().unwrap(); + host.start().unwrap(); + + // Get the host up and running + let node_a = NodeConfig::>::new("pose") + .build() + .unwrap() + .activate() + .unwrap(); + let node_b = NodeConfig::>::new("pose") .build() .unwrap() .activate() .unwrap(); - for i in 0..10 { - let x = i as f32; - - match node.publish(x) { - Ok(_) => (), - Err(e) => { - dbg!(e); - } - }; - thread::sleep(Duration::from_millis(1)); - let result = node.request().unwrap(); - assert_eq!(x, result.data); - } + // Send Option with `Some(value)` + node_a.publish(Some(1.0)).unwrap(); + let result = node_b.request().unwrap(); + dbg!(&result); + assert_eq!(result.data.unwrap(), 1.0); + + // Send option with `None` + node_a.publish(None).unwrap(); + let result = node_b.request(); + dbg!(&result); + assert_eq!(result.unwrap().data, None); } #[test] -fn udp_subscription() { - let mut host = HostConfig::default().build().unwrap(); +fn subscription_usize_udp() { + let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); - println!("Started host"); - let node = NodeConfig::::new("num") + // Get the host up and running + let writer = NodeConfig::::new("subscription") .build() .unwrap() .activate() .unwrap(); - let subscriber = NodeConfig::::new("num") + + // Create a subscription node with a query rate of 100 Hz + let reader = writer + .cfg + .clone() .build() .unwrap() - .subscribe(Duration::from_millis(1)) + .subscribe(Duration::from_millis(10)) .unwrap(); - for i in 0..10 { - let x = i as f32; - - match node.publish(x) { - Ok(_) => (), - Err(e) => { - dbg!(e); - } - }; - thread::sleep(Duration::from_millis(5)); - let result = subscriber.get_subscribed_data().unwrap(); - assert_eq!(x, result.data); + for i in 0..5 { + let test_value = i as usize; + writer.publish(test_value).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + // let result = reader.get_subscribed_data(); + match reader.get_subscribed_data() { + Ok(result) => assert_eq!(test_value, result.data), + Err(e) => println!("{:?}", e), + } + // dbg!(result); } } #[test] -fn topics_list_udp() { - type N = Udp; +#[should_panic] +fn no_subscribed_value_udp() { + let mut host: Host = HostConfig::default().build().unwrap(); + host.start().unwrap(); + + // Create a subscription node with a query rate of 10 Hz + let reader = NodeConfig::::new("subscription") + .build() + .unwrap() + .subscribe(Duration::from_millis(100)) + .unwrap(); + // Unwrapping on an error should lead to panic + let _result: usize = reader.get_subscribed_data().unwrap().data; +} + +#[test] +fn topics_list_udp() { let mut host: Host = HostConfig::default().build().unwrap(); host.start().unwrap(); println!("Host should be running in the background"); @@ -115,8 +156,6 @@ fn topics_list_udp() { for i in 0..topics.len() { nodes[i].publish(i).unwrap(); - dbg!(host.topics()); - // TO_DO: We really shouldn't need to sleep here thread::sleep(Duration::from_millis(1)); assert_eq!(host.topics(), nodes[i].topics().unwrap().data); let t = if i == 0 { @@ -129,7 +168,6 @@ fn topics_list_udp() { t.sort(); t }; - let mut nt = nodes[i].topics().unwrap().data; nt.sort(); assert_eq!(t, nt);