Skip to content

Commit

Permalink
server: make connection attempt behavior match config-template.toml c…
Browse files Browse the repository at this point in the history
…omments
  • Loading branch information
gustafla committed Dec 14, 2024
1 parent a55e648 commit 15398aa
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! This crate defines a custom client-server (relay agent to cluster) protocol.
//! In a final version of this software, this protocol should be replaced by the actual DHCP protocol.
#![deny(clippy::unwrap_used, clippy::allow_attributes_without_reason)]
#![warn(clippy::perf, clippy::complexity, clippy::pedantic, clippy::suspicious)]
#![allow(
Expand All @@ -10,9 +13,6 @@
reason = "There are no sufficient floating point types"
)]

//! This crate defines a custom client-server (relay agent to cluster) protocol.
//! In a final version of this software, this protocol should be replaced by the actual DHCP protocol.
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
fmt::Display,
Expand Down
2 changes: 1 addition & 1 deletion server-node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server-node"
version = "0.7.0"
version = "0.8.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
21 changes: 16 additions & 5 deletions server-node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
//! # Server Configuration and TOML File Loading
//! # DHCP Cluster - Server Implementation
//!
//! This crate contains a distributed DHCP server implementation, with a custom protocol between nodes.
//!
//! For the protocol definition, look into the [`server::peer::message`] module.
//!
//! The server architecture comprises of threads, which use blocking operations to communicate over [`std::net::TcpStream`]s.
//! There are two threads per active peer, one for receiving messages and one for sending messages.
//! There is also a server logic thread, handling bookkeeping for the peer- and client events.
//!
//! For the communication thread implementation, look into the [`server::peer`] module.
//!
//! This module contains the server configuration structure, along with facilities for
//! loading configuration files from the filesystem.
Expand Down Expand Up @@ -37,7 +49,7 @@ pub struct ClusterSection {
connect_timeout_seconds: Option<u64>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct Peer {
pub id: peer::Id,
pub host: String,
Expand Down Expand Up @@ -67,7 +79,7 @@ pub struct Config {

pub id: peer::Id,
pub heartbeat_timeout: Duration,
pub connect_timeout: Option<Duration>,
pub connect_timeout: Duration,
pub peers: Vec<Peer>,
}

Expand Down Expand Up @@ -108,9 +120,8 @@ impl From<File> for Config {
heartbeat_timeout: Duration::from_millis(cluster.heartbeat_timeout_millis),
connect_timeout: cluster
.connect_timeout_seconds
.map_or(Some(Duration::from_secs(10)), |sec| {
(sec != 0).then_some(Duration::from_secs(sec))
}),
.and_then(|sec| (sec != 0).then_some(Duration::from_secs(sec)))
.unwrap_or(Duration::from_secs(10)),
peers: file.peers,
}
}
Expand Down
23 changes: 11 additions & 12 deletions server-node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
#![deny(clippy::unwrap_used, clippy::allow_attributes_without_reason)]
#![warn(clippy::perf, clippy::complexity, clippy::pedantic, clippy::suspicious)]
#![allow(
clippy::missing_errors_doc,
clippy::missing_panics_doc,
reason = "We're not going to write comprehensive docs"
)]
#![allow(
clippy::cast_precision_loss,
reason = "There are no sufficient floating point types"
)]

//! # DHCP Cluster - Server Implementation
//!
//! This crate contains a distributed DHCP server implementation, with a custom protocol between nodes.
Expand All @@ -22,6 +10,17 @@
//!
//! For the communication thread implementation, look into the [`server::peer`] module.
#![deny(clippy::unwrap_used, clippy::allow_attributes_without_reason)]
#![warn(clippy::perf, clippy::complexity, clippy::pedantic, clippy::suspicious)]
#![allow(
clippy::missing_errors_doc,
clippy::missing_panics_doc,
reason = "We're not going to write comprehensive docs"
)]
#![allow(
clippy::cast_precision_loss,
reason = "There are no sufficient floating point types"
)]
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

use std::thread::{self, JoinHandle};
Expand Down
75 changes: 31 additions & 44 deletions server-node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod client;
pub mod peer;

use crate::{
config::{self, Config},
config::Config,
console,
dhcp::{self, Ipv4Range, Lease, LeaseOffer},
ThreadJoin,
Expand All @@ -16,7 +16,7 @@ use std::{
fmt::Display,
net::{Ipv4Addr, TcpListener, TcpStream},
sync::{
mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver, RecvTimeoutError, Sender},
Arc,
},
thread,
Expand Down Expand Up @@ -54,14 +54,6 @@ enum ElectionState {
Follower,
}

// Don't spam (re)connection, keep track of last attempt time
#[derive(Debug)]
enum ConnectAttempt {
Never,
Running,
Finished(SystemTime),
}

/// The distributed DHCP server
pub struct Server {
config: Arc<Config>,
Expand All @@ -72,7 +64,6 @@ pub struct Server {
peers: HashMap<peer::Id, Peer>,
election_state: ElectionState,
majority: bool,
last_connect_attempt: ConnectAttempt,
}

impl Server {
Expand Down Expand Up @@ -112,7 +103,6 @@ impl Server {
peers: HashMap::new(),
election_state: ElectionState::Follower,
majority: false,
last_connect_attempt: ConnectAttempt::Never,
};

let peer_listener_thread = {
Expand Down Expand Up @@ -146,26 +136,25 @@ impl Server {
}

fn run_event_loop(&mut self, server_rx: &Receiver<Event>) {
let mut last_connect_attempt = SystemTime::UNIX_EPOCH;

loop {
// Render pretty text representation if running in a terminal
console::update_state(format!("{self}"));

// Periodically check if server needs to connect to peers
match self.last_connect_attempt {
ConnectAttempt::Never => self.attempt_connect(),
ConnectAttempt::Finished(at) => {
if at.elapsed().unwrap_or(Duration::ZERO) > self.config.heartbeat_timeout {
self.attempt_connect();
}
}
ConnectAttempt::Running => {}
};
if last_connect_attempt.elapsed().unwrap_or(Duration::ZERO)
> self.config.connect_timeout
{
last_connect_attempt = SystemTime::now();
self.attempt_connect();
}

// Receive the next message from other threads (peer I/O, listeners, timers etc.)
let message = match server_rx.recv_timeout(self.config.heartbeat_timeout) {
let message = match server_rx.recv_timeout(self.config.connect_timeout) {
Ok(message) => message,
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
};

match message {
Expand Down Expand Up @@ -204,36 +193,34 @@ impl Server {
}

fn attempt_connect(&mut self) {
self.last_connect_attempt = ConnectAttempt::Running;
console::debug!("Connection attempt started");

for config::Peer { id, host } in &self.config.peers {
for peer in &self.config.peers {
// Only start connections with peers we don't have a connection with
// Always have the higher ID start connections to avoid race conditions with concurrent
// handshakes
if !self.peers.contains_key(id) && self.config.id > *id {
if !self.peers.contains_key(&peer.id) && self.config.id > peer.id {
let config = Arc::clone(&self.config);
let peer_id = *id;
let name = host.to_owned();
let peer = peer.clone();
let server_tx = self.tx.clone();
self.thread_pool
.execute(
move || match Peer::connect(&config, peer_id, &name, &server_tx) {
Ok(success) => {
server_tx
.send(Event::EstablishedPeerConnection(success))
.expect("Invariant violated: server_rx has been dropped");
}
Err(e) => {
console::error!(&e, "Can't connect to peer {peer_id} at {name}");
}
},
)
.execute(move || match Peer::connect(&config, &peer, &server_tx) {
Ok(success) => {
server_tx
.send(Event::EstablishedPeerConnection(success))
.expect("Invariant violated: server_rx has been dropped");
}
Err(e) => {
console::error!(
&e,
"Can't connect to peer {} at {}",
peer.id,
peer.host
);
}
})
.expect("Thread pool cannot spawn treads");
}
}

self.last_connect_attempt = ConnectAttempt::Finished(SystemTime::now());
console::debug!("Connection threads spawned");
}

Expand Down
25 changes: 15 additions & 10 deletions server-node/src/server/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
pub mod message;

use crate::{config::Config, console, dhcp::Lease, server::Event, ThreadJoin};
use crate::{
config::{self, Config},
console,
dhcp::Lease,
server::Event,
ThreadJoin,
};
use message::Message;
use protocol::{RecvCbor, SendCbor};
use std::{
Expand Down Expand Up @@ -79,30 +85,29 @@ impl Peer {
/// Initiate peer connection
pub fn connect(
config: &Arc<Config>,
peer_id: Id,
name: &str,
peer: &config::Peer,
server_tx: &Sender<Event>,
) -> Result<JoinSuccess, HandshakeError> {
let timeout = config.connect_timeout;
let peer_id = peer.id;
let name = &peer.host;

console::debug!("Connecting to {peer_id} at {name}");

// Resolve address
let mut addrs = name
let mut addrs = peer
.host
.to_socket_addrs()
.map_err(|e| HandshakeError::NameResolution {
peer_id,
name: name.to_owned(),
name: name.clone(),
source: e,
})?
.peekable();

// Try every result
while let Some(addr) = addrs.next() {
let stream = match if let Some(timeout) = timeout {
TcpStream::connect_timeout(&addr, timeout)
} else {
TcpStream::connect(addr)
} {
let stream = match TcpStream::connect_timeout(&addr, timeout) {
Ok(stream) => stream,
Err(e) => match addrs.peek() {
Some(_) => continue, // Retry next result if available
Expand Down

0 comments on commit 15398aa

Please sign in to comment.