From b342449cdbda75f28073af9a48c91221d38999ae Mon Sep 17 00:00:00 2001 From: chrism Date: Sun, 14 Jan 2024 20:25:48 -0500 Subject: [PATCH] Try to fix udp subscription --- src/node/udp/idle.rs | 46 +++++++++++++++++++++------------------- tests/udp_integration.rs | 7 +----- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/src/node/udp/idle.rs b/src/node/udp/idle.rs index e4f08db..802b44f 100644 --- a/src/node/udp/idle.rs +++ b/src/node/udp/idle.rs @@ -58,39 +58,41 @@ impl Node { let mut buffer = vec![0u8; 10_000]; loop { - if let Ok(packet_as_bytes) = to_allocvec(&packet) { + // if let Ok(packet_as_bytes) = to_allocvec(&packet) { + let packet_as_bytes = match to_allocvec(&packet) { + Ok(packet_as_bytes) => packet_as_bytes, + Err(e) => { + error!("{:?}",e); + continue; + } + }; match udp::send_msg(&socket, packet_as_bytes.clone(), addr).await { Ok(n) => { info!(n); + match udp::await_response::(&socket, &mut buffer).await { + Ok(msg) => { + let delta = Utc::now() - msg.timestamp; + if delta <= chrono::Duration::zero() { + info!("Data is not newer, skipping to next subscription iteration"); + // continue; + } + + let mut data = data.lock().await; + *data = Some(msg); + sleep(rate).await; + } + Err(e) => { + error!("Subscription Error: {}", e); + } + }; } Err(e) => { let e = e.to_string(); info!(e); } }; - - match udp::await_response::(&socket, &mut buffer).await { - Ok(msg) => { - let delta = Utc::now() - msg.timestamp; - if delta <= chrono::Duration::zero() { - info!("Data is not newer, skipping to next subscription iteration"); - // continue; - } - - let mut data = data.lock().await; - *data = Some(msg); - } - Err(e) => { - error!("Subscription Error: {}", e); - } - }; } - else { - error!("Error creating UDP subscription packet"); - } - sleep(rate).await; } - } }); self.task_subscribe = Some(task_subscribe); diff --git a/tests/udp_integration.rs b/tests/udp_integration.rs index ff53265..3563e21 100644 --- a/tests/udp_integration.rs +++ b/tests/udp_integration.rs @@ -84,12 +84,7 @@ fn udp_subscription() { for i in 0..10 { let x = i as f32; - match node.publish(x) { - Ok(_) => (), - Err(e) => { - dbg!(e); - } - }; + node.publish(x).unwrap(); thread::sleep(Duration::from_millis(5)); let result = subscriber.get_subscribed_data().unwrap(); assert_eq!(x, result.data);