Skip to content

Commit

Permalink
Extract the network parameter to cut off connection based on max latency
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Jan 3, 2024
1 parent c463b25 commit ec9d64d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
6 changes: 6 additions & 0 deletions mysticeti-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub struct Parameters {
pub store_retain_rounds: u64,
pub enable_cleanup: bool,
pub synchronizer_parameters: SynchronizerParameters,
/// When detected connection latency is >= the `network_connection_max_latency`, then the connection breaks as best effort to fix any
/// transient connection issues.
pub network_connection_max_latency: Duration,
}

impl Default for Parameters {
Expand All @@ -61,6 +64,7 @@ impl Default for Parameters {
shutdown_grace_period: Self::DEFAULT_SHUTDOWN_GRACE_PERIOD,
number_of_leaders: Self::DEFAULT_NUMBER_OF_LEADERS,
store_retain_rounds: Self::DEFAULT_STORE_RETAIN_ROUNDS,
network_connection_max_latency: Self::DEFAULT_NETWORK_CONNECTION_MAX_LATENCY,
enable_pipelining: true,
enable_cleanup: true,
synchronizer_parameters: SynchronizerParameters::default(),
Expand Down Expand Up @@ -110,6 +114,8 @@ impl Parameters {

pub const DEFAULT_STORE_RETAIN_ROUNDS: u64 = 500;

pub const DEFAULT_NETWORK_CONNECTION_MAX_LATENCY: Duration = Duration::from_secs(5);

pub fn new_for_benchmarks(ips: Vec<IpAddr>) -> Self {
let benchmark_port_offset = ips.len() as u16;
let mut identifiers = Vec::new();
Expand Down
34 changes: 26 additions & 8 deletions mysticeti-core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ impl Network {
) -> Self {
let addresses = parameters.all_network_addresses().collect::<Vec<_>>();
print_network_address_table(&addresses);
Self::from_socket_addresses(&addresses, our_id as usize, local_addr, metrics).await
Self::from_socket_addresses(
&addresses,
our_id as usize,
local_addr,
metrics,
parameters.network_connection_max_latency,
)
.await
}

pub fn connection_receiver(&mut self) -> &mut mpsc::Receiver<Connection> {
Expand All @@ -88,6 +95,7 @@ impl Network {
our_id: usize,
local_addr: SocketAddr,
metrics: Arc<Metrics>,
network_connection_max_latency: Duration,
) -> Self {
if our_id >= addresses.len() {
panic!(
Expand Down Expand Up @@ -122,7 +130,8 @@ impl Network {
connection_sender: connection_sender.clone(),
bind_addr: translation_mode.bind_addr(local_addr),
active_immediately: id < our_id,
latency_sender: metrics.connection_latency_sender.get(id).expect("Can not locate connection_latency_sender metric - did you initialize metrics with correct committee?").clone()
latency_sender: metrics.connection_latency_sender.get(id).expect("Can not locate connection_latency_sender metric - did you initialize metrics with correct committee?").clone(),
network_connection_max_latency
}
.run(receiver),
);
Expand Down Expand Up @@ -190,6 +199,7 @@ struct Worker {
bind_addr: Option<SocketAddr>,
active_immediately: bool,
latency_sender: HistogramSender<Duration>,
network_connection_max_latency: Duration,
}

struct WorkerConnection {
Expand Down Expand Up @@ -263,7 +273,7 @@ impl Worker {
// todo - pass signal to break the main loop
return Ok(());
};
Self::handle_stream(stream, connection).await
Self::handle_stream(stream, connection, self.network_connection_max_latency).await
}

async fn handle_passive_stream(&self, mut stream: TcpStream) -> io::Result<()> {
Expand All @@ -278,10 +288,14 @@ impl Worker {
// todo - pass signal to break the main loop
return Ok(());
};
Self::handle_stream(stream, connection).await
Self::handle_stream(stream, connection, self.network_connection_max_latency).await
}

async fn handle_stream(stream: TcpStream, connection: WorkerConnection) -> io::Result<()> {
async fn handle_stream(
stream: TcpStream,
connection: WorkerConnection,
network_connection_max_latency: Duration,
) -> io::Result<()> {
let WorkerConnection {
sender,
receiver,
Expand All @@ -298,6 +312,7 @@ impl Worker {
pong_receiver,
latency_sender,
latency_last_value_sender,
network_connection_max_latency,
)
.boxed();
let read_fut = Self::handle_read_stream(reader, sender, pong_sender).boxed();
Expand All @@ -312,6 +327,7 @@ impl Worker {
mut pong_receiver: mpsc::Receiver<i64>,
latency_sender: HistogramSender<Duration>,
latency_last_value_sender: tokio::sync::watch::Sender<Duration>,
network_connection_max_latency: Duration,
) -> io::Result<()> {
let start = Instant::now();
let mut ping_deadline = start + PING_INTERVAL;
Expand Down Expand Up @@ -359,8 +375,7 @@ impl Worker {
latency_sender.observe(d);
latency_last_value_sender.send(d).ok();

static CUT_OFF_LATENCY: u128 = 5_000;
if d.as_millis() >= CUT_OFF_LATENCY {
if d >= network_connection_max_latency {
tracing::warn!("High latency connection: {:?}. Breaking now connection.", d);
return Ok(());
}
Expand Down Expand Up @@ -561,6 +576,7 @@ fn decode_ping(message: &[u8]) -> i64 {
#[cfg(test)]
mod test {
use crate::committee::Committee;
use crate::config::Parameters;
use crate::metrics::Metrics;
use crate::test_util::networks_and_addresses;
use prometheus::Registry;
Expand All @@ -574,7 +590,9 @@ mod test {
.authorities()
.map(|_| Metrics::new(&Registry::default(), Some(&committee)).0)
.collect();
let (networks, addresses) = networks_and_addresses(&metrics).await;
let (networks, addresses) =
networks_and_addresses(&metrics, Parameters::DEFAULT_NETWORK_CONNECTION_MAX_LATENCY)
.await;
for (mut network, address) in networks.into_iter().zip(addresses.iter()) {
let mut waiting_peers: HashSet<_> = HashSet::from_iter(addresses.iter().copied());
waiting_peers.remove(address);
Expand Down
19 changes: 15 additions & 4 deletions mysticeti-core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use rand::SeedableRng;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

pub fn test_metrics() -> Arc<Metrics> {
Metrics::new(&Registry::new(), None).0
Expand Down Expand Up @@ -172,7 +173,10 @@ pub fn committee_and_syncers(
)
}

pub async fn networks_and_addresses(metrics: &[Arc<Metrics>]) -> (Vec<Network>, Vec<SocketAddr>) {
pub async fn networks_and_addresses(
metrics: &[Arc<Metrics>],
network_connection_max_latency: Duration,
) -> (Vec<Network>, Vec<SocketAddr>) {
let host = Ipv4Addr::LOCALHOST;
let addresses: Vec<_> = (0..metrics.len())
.map(|i| SocketAddr::V4(SocketAddrV4::new(host, 5001 + i as u16)))
Expand All @@ -183,7 +187,13 @@ pub async fn networks_and_addresses(metrics: &[Arc<Metrics>]) -> (Vec<Network>,
.zip(metrics.iter())
.enumerate()
.map(|(i, (address, metrics))| {
Network::from_socket_addresses(&addresses, i, *address, metrics.clone())
Network::from_socket_addresses(
&addresses,
i,
*address,
metrics.clone(),
network_connection_max_latency,
)
});
let networks = join_all(networks).await;
(networks, addresses)
Expand Down Expand Up @@ -243,11 +253,12 @@ pub async fn network_syncers_with_epoch_duration(
n: usize,
rounds_in_epoch: RoundNumber,
) -> Vec<NetworkSyncer<TestBlockHandler, TestCommitObserver>> {
let parameters = Parameters::default();
let (_, cores, commit_observers, _) = committee_and_cores_epoch_duration(n, rounds_in_epoch);
let metrics: Vec<_> = cores.iter().map(|c| c.metrics.clone()).collect();
let (networks, _) = networks_and_addresses(&metrics).await;
let (networks, _) =
networks_and_addresses(&metrics, parameters.network_connection_max_latency).await;
let mut network_syncers = vec![];
let parameters = Parameters::default();
for ((network, core), commit_observer) in networks.into_iter().zip(cores).zip(commit_observers)
{
let network_syncer = NetworkSyncer::start(
Expand Down

0 comments on commit ec9d64d

Please sign in to comment.