Skip to content

Commit

Permalink
Try to fix udp subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
quietlychris committed Jan 15, 2024
1 parent dd930c2 commit b342449
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
46 changes: 24 additions & 22 deletions src/node/udp/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,41 @@ impl<T: Message + 'static> Node<Udp, Idle, T> {
let mut buffer = vec![0u8; 10_000];

loop {
if let Ok(packet_as_bytes) = to_allocvec(&packet) {
// if let Ok(packet_as_bytes) = to_allocvec(&packet) {
let packet_as_bytes = match to_allocvec(&packet) {
Ok(packet_as_bytes) => packet_as_bytes,
Err(e) => {
error!("{:?}",e);
continue;
}
};
match udp::send_msg(&socket, packet_as_bytes.clone(), addr).await {
Ok(n) => {
info!(n);
match udp::await_response::<T>(&socket, &mut buffer).await {
Ok(msg) => {
let delta = Utc::now() - msg.timestamp;
if delta <= chrono::Duration::zero() {
info!("Data is not newer, skipping to next subscription iteration");
// continue;
}

let mut data = data.lock().await;
*data = Some(msg);
sleep(rate).await;
}
Err(e) => {
error!("Subscription Error: {}", e);
}
};
}
Err(e) => {
let e = e.to_string();
info!(e);
}
};

match udp::await_response::<T>(&socket, &mut buffer).await {
Ok(msg) => {
let delta = Utc::now() - msg.timestamp;
if delta <= chrono::Duration::zero() {
info!("Data is not newer, skipping to next subscription iteration");
// continue;
}

let mut data = data.lock().await;
*data = Some(msg);
}
Err(e) => {
error!("Subscription Error: {}", e);
}
};
}
else {
error!("Error creating UDP subscription packet");
}
sleep(rate).await;
}
}
});

self.task_subscribe = Some(task_subscribe);
Expand Down
7 changes: 1 addition & 6 deletions tests/udp_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,7 @@ fn udp_subscription() {
for i in 0..10 {
let x = i as f32;

match node.publish(x) {
Ok(_) => (),
Err(e) => {
dbg!(e);
}
};
node.publish(x).unwrap();
thread::sleep(Duration::from_millis(5));
let result = subscriber.get_subscribed_data().unwrap();
assert_eq!(x, result.data);
Expand Down

0 comments on commit b342449

Please sign in to comment.