Skip to content

Commit

Permalink
Seperated libdistributor and distributor bin, trying to prevent error…
Browse files Browse the repository at this point in the history
…s from StreamFile in dlwp.
  • Loading branch information
NathanMcMillan54 committed Nov 21, 2024
1 parent 187ac25 commit a697e78
Show file tree
Hide file tree
Showing 27 changed files with 66 additions and 70 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ signal-hook = "0.3.17"
members = [
"darklight_driver/",
"dl_instance/",
"dl_instance/distributor/distributor/",
"dl_instance/distributors/posix_distributor/",
"dl_instance/libdistributor/",
"dlwp/",
"tools/client",
"tools/dlcmd",
Expand Down
8 changes: 0 additions & 8 deletions darklight_driver/src/cns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub fn setup_cns() -> ([i32; 6], String, String) {
s2 = rng.gen_range(0..ALPHABET_LEN);
encoder.change_setting(s1, s2);
encoder.set_alphabet();
println!("Running");
}

// In the future this value can (and should) be increased to i32::MAX
Expand Down Expand Up @@ -134,13 +133,6 @@ pub fn cns_add(input: Vec<&str>) {
if recv_first == false {
let read = stream.read();
if read.is_empty() {
attempts += 1;

if attempts > 500 {
attempts = 0;
send_first = false;
}

continue;
}

Expand Down
25 changes: 0 additions & 25 deletions dl_instance/distributor/distributor/Cargo.toml

This file was deleted.

12 changes: 12 additions & 0 deletions dl_instance/distributors/posix_distributor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "posix_distributor"
description = "A TCP and serial distributor intended to run on POSIX-like systems"
version = "0.1.0"
edition = "2021"

[dependencies]
dlwp = { path = "../../../dlwp/", features = ["use_io", "include_chrono", "include_serde"] }
lazy_static = { version = "1.5.0" }
libdistributor = { path = "../../libdistributor/" }
reqwest = { version = "0.11.24" }
tokio = { version = "1.36.0", features = ["full"] }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ fn bin() {
}

fn main() {
#[cfg(feature = "bin")]
bin()
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,16 @@ use std::{
time::Duration,
};

use distributors::tcp::TcpDistributor;
use crate::distributors::tcp::TcpDistributor;
use dlwp::{config::DistributorConfig, encryption::EncryptionInfo};
use dlwp::serde_json;
use input::check_user_input;
use lib_dldistributor::{
use crate::input::check_user_input;
use libdistributor::{
connections::{LocalConnections, PendingMessage, PendingMessages}, encryption::DistributorEncryption, get_a_magic_num, info::DistributorInfo
};

use crate::{DISTRIBUTOR, DISTRIBUTOR_ID, DISTRIBUTOR_UID};

pub(crate) mod distributors;
pub(crate) mod input;
pub(crate) mod users;
pub(crate) mod verify_server;

// "Magic number" information
#[cfg(debug_assertions)]
#[path = "magicn_debug.rs"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{io::Write, net::TcpStream, thread::{sleep, spawn}, time::Duration};

use dlwp::{codes::{Code, READ_TIMEDOUT, STATUS_OK}, distributor::READ_AVAILABLE, message::{valid_message_string, ReceiveInfo, MSG_END, MSG_INIT}};
use lib_dldistributor::{connections::PendingMessage, external::ExternalDistributorRW, IDLE_SLEEP};
use libdistributor::{connections::PendingMessage, external::ExternalDistributorRW, IDLE_SLEEP};
use tcp::TcpDistributor;

use crate::DISTRIBUTOR;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dlwp::{cerpton::{libcerpton_decode, libcerpton_encode}, codes::{INVALID_RR, READ_FAILED, STATUS_OK, WRITE_FAILED}, distributor::DIST_INIT};
use lib_dldistributor::external::{ExternalDistributorInfo, ExternalDistributorRW};
use libdistributor::external::{ExternalDistributorInfo, ExternalDistributorRW};
use std::{borrow::Borrow, io::{Read, Write}, net::TcpStream, time::Duration};

use crate::{distributor::{encrpytion::current_encryption, magicn::get_my_magic_num}, DISTRIBUTOR_ID, VS1, VS2, VS3};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ use std::{
time::Duration,
};

use distributor::{users, DarkLightDistributor};
use dlwp::encryption::EncryptionInfo;
use lib_dldistributor::encryption::DistributorEncryption;

extern crate dlwp;
#[macro_use]
extern crate lib_dldistributor;
extern crate libdistributor;
#[macro_use]
extern crate tokio;

Expand All @@ -32,7 +28,13 @@ const DISTRIBUTOR_UID: &str = env!("DIST_UID");
// Path to config file
const CONFIG_PATH: &str = "distributor_config.json";

mod distributor;
pub(crate) mod distributor;
pub(crate) mod distributors;
pub(crate) mod input;
pub(crate) mod users;
pub(crate) mod verify_server;

pub(crate) use distributor::DarkLightDistributor;

static mut DISTRIBUTOR: Option<DarkLightDistributor> = None;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dlwp::{
id::LId,
message::{ReceiveInfo, MSG_END, MSG_INIT},
};
use lib_dldistributor::connections::PendingMessage;
use libdistributor::connections::PendingMessage;

use super::DarkLightDistributor;
use crate::DISTRIBUTOR;
Expand Down Expand Up @@ -45,7 +45,7 @@ impl DarkLightDistributor {
return None;
}

sleep(Duration::from_micros(100));
sleep(Duration::from_micros(50));
}

let read_err = io_err_check!(stream.read(&mut buf));
Expand Down Expand Up @@ -81,16 +81,9 @@ impl DarkLightDistributor {

pub fn tcp_user_handler(&mut self) {
loop {
//sleep_condition!(self.user_connections.tcp_connections.len() <= 1); // Loop delays then continues if there <= 1 users
sleep_condition!(self.user_connections.tcp_connections.len() <= 1); // Loop delays then continues if there <= 1 users

for (id, mut stream) in self.user_connections.tcp_connections.iter() {
/*if self.local_pending_messages.contains_key(id) {
if self.local_pending_messages[id].can_send == true {
println!("i have a message: {}", self.local_pending_messages[id].message_str);
self.tcp_user_write(stream, self.local_pending_messages[id].message_str.clone());
}
}*/

if self.local_pending_messages.contains_key(id) {
if self.local_pending_messages[id].message_str == String::new() {
self.local_pending_messages.remove(id);
Expand Down Expand Up @@ -152,11 +145,18 @@ impl DarkLightDistributor {
}
}

let mut removed = vec![];
for (id, msg) in self.local_pending_messages.iter() {
if msg.can_send == false && msg.message_str == String::from("rm") { // Remove users
println!("removing user: {}", id);
self.user_connections.tcp_connections.remove(id);
removed.push(id.clone());
}
}

for i in 0..removed.len() {
self.local_pending_messages.remove(&removed[i]);
}
}
}
}
8 changes: 8 additions & 0 deletions dl_instance/libdistributor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "libdistributor"
version = "0.1.0"
edition = "2021"
description = "A TCP and serial distributor intended to run on POSIX-like systems"

[dependencies]
dlwp = { path = "../../dlwp/", features = ["use_io", "include_chrono", "include_serde"] }
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
30 changes: 21 additions & 9 deletions dlwp/src/stream/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl StreamFile {
};

file.create();
file.write();
return file;
}

Expand All @@ -51,7 +52,8 @@ impl StreamFile {
while self.exists(pr) == false {
sleep(Duration::from_micros(100));
}
sleep(Duration::from_micros(200));

sleep(Duration::from_millis(150));
}

pub fn create(&self) {
Expand All @@ -60,11 +62,11 @@ impl StreamFile {
}

pub fn create_pending(&self) {
File::options().read(true).write(true).create(true).open(&format!("{}P", self.path())).expect("Failed to create file").write_fmt(format_args!("{}", serde_json::to_string(&self.pending).expect("Failed to parse"))).expect("Failed to write to stream file");
File::options().read(true).write(true).create(true).open(&format!("{}P", self.path())).expect("Failed to create file");
}

pub fn create_recieved(&self) {
File::options().read(true).write(true).create(true).open(&format!("{}R", self.path())).expect("Failed to create file").write_fmt(format_args!("{}", serde_json::to_string(&self.received).expect("Failed to parse"))).expect("Failed to write to stream file");
File::options().read(true).write(true).create(true).open(&format!("{}R", self.path())).expect("Failed to create file");
}

pub fn remove_pending(&self) {
Expand All @@ -83,17 +85,27 @@ impl StreamFile {
pub fn read_pending(&mut self) {
self.wait_for_file("P");

let pending_contents = read_to_string(&format!("{}P", self.path())).unwrap();
let parsed_pending: Vec<String> = serde_json::from_str(&pending_contents).unwrap();
let mut try_pending_contents = read_to_string(&format!("{}P", self.path()));

while try_pending_contents.is_err() {
try_pending_contents = read_to_string(&format!("{}P", self.path()));
}

let parsed_pending: Vec<String> = serde_json::from_str(&try_pending_contents.unwrap()).unwrap();

self.pending = parsed_pending.clone();
}

pub fn read_recieved(&mut self) {
self.wait_for_file("R");

let recieved_contents = read_to_string(&format!("{}R", self.path())).unwrap();
let parsed_recieved: Vec<ReceivedMessage> = serde_json::from_str(&recieved_contents).unwrap();
let mut try_recieved_contents = read_to_string(&format!("{}R", self.path()));

while try_recieved_contents.is_err() {
try_recieved_contents = read_to_string(&format!("{}R", self.path()));
}

let parsed_recieved: Vec<ReceivedMessage> = serde_json::from_str(&try_recieved_contents.unwrap()).unwrap();

self.received = parsed_recieved.clone();
}
Expand All @@ -105,7 +117,7 @@ impl StreamFile {

let mut file = File::options().write(true).open(&format!("{}P", self.path())).unwrap();

file.write_fmt(format_args!("{}", serde_json::to_string_pretty(&self.pending).unwrap())).unwrap();
file.write_fmt(format_args!("{}", serde_json::to_string(&self.pending).unwrap())).unwrap();
file.flush().unwrap();
}

Expand All @@ -115,7 +127,7 @@ impl StreamFile {
self.wait_for_file("R");
let mut file = File::options().write(true).open(&format!("{}R", self.path())).unwrap();

file.write_fmt(format_args!("{}", serde_json::to_string_pretty(&self.received).unwrap())).unwrap();
file.write_fmt(format_args!("{}", serde_json::to_string(&self.received).unwrap())).unwrap();
file.flush().unwrap();
}

Expand Down

0 comments on commit a697e78

Please sign in to comment.