Skip to content

Commit

Permalink
Distributors can properly conenct to eachother, since connection take…
Browse files Browse the repository at this point in the history
… a long time it is handled in a seperate thread.
  • Loading branch information
NathanMcMillan54 committed Sep 25, 2024
1 parent 98bd64e commit f50b057
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// This is for handling external distributors

use std::{io::Write, net::TcpStream, thread::sleep, time::Duration};
use std::{io::Write, net::TcpStream, thread::{sleep, spawn}, time::Duration};

use lib_dldistributor::external::ExternalDistributorRW;
use lib_dldistributor::{external::ExternalDistributorRW, IDLE_SLEEP};
use tcp::TcpDistributor;

use crate::DISTRIBUTOR;

use super::DarkLightDistributor;

pub mod tcp;
Expand All @@ -28,37 +30,58 @@ impl DarkLightDistributor {
continue;
}

sleep(Duration::from_millis(600)); // Give external distributor time to move on
println!("Verifying {}...", self.info.config.tcp_connections[i].clone());
let verify_ret = tcp_distributor.verify_distributor();
if verify_ret == false {
continue;
} else {
println!("Failed to verify");
continue;
}

println!("Adding distirbutor");

self.tcp_distributors.push(tcp_distributor);
}
}

pub fn tcp_distributor_handler(&mut self) {
self.setup_tcp_distributors();
self.tcp_distributors.push(TcpDistributor::new(TcpStream::connect("127.0.0.1:6000").unwrap(), String::new()));

loop {
for i in 0..self.tcp_distributors.len() {
if self.tcp_distributors[i].msg == String::from("skp") {
continue;
} else if self.tcp_distributors[i].msg == String::from("rm") {
self.tcp_distributors.remove(i);
}

if self.tcp_distributors[i].msg == String::from("INIT-DIS-CONN") {
println!("Verifying external distirbutor connection...");
let verify_ret = self.tcp_distributors[i].verify_distributor();
if verify_ret == false {
self.tcp_distributors.remove(i);
break;
}
self.tcp_distributors[i].msg = String::from("skp"); // Skip this distributor while the thread below is running
// Uses global distributor definition so there isn't interference between threads
spawn(move || {
unsafe {
println!("Verifying external distirbutor connection...");
let verify_ret = DISTRIBUTOR.as_mut().unwrap().tcp_distributors[i].verify_distributor();
if verify_ret == false {
DISTRIBUTOR.as_mut().unwrap().tcp_distributors[i].msg = String::from("rm"); // Remove in the next iteration
return;
}

println!("Connecting to verified distirbutor...");
let conn_ret = self.tcp_distributors[i].attempt_connect();
if conn_ret == false {
self.tcp_distributors.remove(i);
}
println!("Connecting to verified distirbutor...");
let conn_ret = DISTRIBUTOR.as_mut().unwrap().tcp_distributors[i].attempt_connect();
if conn_ret == false {
DISTRIBUTOR.as_mut().unwrap().tcp_distributors[i].msg = String::from("rm");
} else {
DISTRIBUTOR.as_mut().unwrap().tcp_distributors[i].msg = String::new();
}
return;
}
});
continue;
}

sleep(IDLE_SLEEP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,13 @@ impl ExternalDistributorRW for TcpDistributor {
}
}

println!("done: {:?}", self.info);
// check magic num

true
}

fn attempt_connect(&mut self) -> bool {
self.stream.set_read_timeout(Some(Duration::from_millis(500)));
self.stream.write(DIST_INIT.as_bytes());
self.stream.flush();

let s1 = VS1.parse::<i32>().unwrap();
let s2 = VS2.parse::<i32>().unwrap();
Expand Down

0 comments on commit f50b057

Please sign in to comment.