Skip to content

Commit

Permalink
Fix UDP test with timing; this shouldn't be happening and has been gi…
Browse files Browse the repository at this point in the history
…ven a TO_DO
  • Loading branch information
quietlychris committed Jan 15, 2024
1 parent cd228fd commit 78dc8fe
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/error/host_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum HostError {
NonExistentTopic,
}

#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub enum HostOperation {
SUCCESS,
FAILURE,
Expand Down
47 changes: 29 additions & 18 deletions src/host/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ pub async fn process_udp(
panic!("{}", e);
}
};
// dbg!(&msg);

match msg.msg_type {
MsgType::SET => {
// println!("received {} bytes, to be assigned to: {}", n, &msg.name);
let tree = db
.open_tree(msg.topic.as_bytes())
.expect("Error opening tree");
let _db_result =

let _db_result = {
match tree.insert(msg.timestamp.to_string().as_bytes(), bytes) {
Ok(_prev_msg) => {
let mut count = count.lock().await;
*count += 1;
"SUCCESS".to_string()
info!("{:?}", msg.data);
crate::error::HostOperation::SUCCESS
}
Err(e) => e.to_string(),
};
Err(_e) => crate::error::HostOperation::FAILURE,
}
};
}
MsgType::GET => {
let tree = db
Expand All @@ -81,6 +81,7 @@ pub async fn process_udp(
}
MsgType::TOPICS => {
let names = db.tree_names();

let mut strings = Vec::new();
for name in names {
match std::str::from_utf8(&name[..]) {
Expand All @@ -99,20 +100,30 @@ pub async fn process_udp(
.unwrap();
strings.remove(index);

if let Ok(data) = to_allocvec(&strings) {
let packet: GenericMsg = GenericMsg {
msg_type: MsgType::TOPICS,
timestamp: Utc::now(),
topic: "".to_string(),
data_type: std::any::type_name::<Vec<String>>().to_string(),
data,
};
match to_allocvec(&strings) {
Ok(data) => {
let packet: GenericMsg = GenericMsg {
msg_type: MsgType::TOPICS,
timestamp: Utc::now(),
topic: "".to_string(),
data_type: std::any::type_name::<Vec<String>>().to_string(),
data,
};

if let Ok(bytes) = to_allocvec(&packet) {
if let Err(e) = socket.try_send_to(&bytes, return_addr) {
error!("Error sending data back on UDP/TOPICS: {:?}", e);
if let Ok(bytes) = to_allocvec(&packet) {
if let Ok(()) = socket.writable().await {
if let Err(e) = socket.try_send_to(&bytes, return_addr) {
error!(
"Error sending data back on UDP/TOPICS: {:?}",
e
);
}
}
}
}
Err(e) => {
error!("{:?}", e);
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions tests/udp_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ fn topics_list_udp() {

for i in 0..topics.len() {
nodes[i].publish(i).unwrap();
dbg!(host.topics());
// TO_DO: We really shouldn't need to sleep here
thread::sleep(Duration::from_millis(1));
assert_eq!(host.topics(), nodes[i].topics().unwrap().data);
let t = if i == 0 {
vec![topics[i].to_string()]
Expand All @@ -126,6 +129,7 @@ fn topics_list_udp() {
t.sort();
t
};

let mut nt = nodes[i].topics().unwrap().data;
nt.sort();
assert_eq!(t, nt);
Expand Down

0 comments on commit 78dc8fe

Please sign in to comment.