diff --git a/.cargo/config.toml b/.cargo/config.toml
index db226da0c3..554887795e 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -8,7 +8,6 @@ xtask = "run --package xtask --"
 # when changing these please also change .github/workflows/steps/release-build-setup.yml
 rustflags = [
     "-C", "force-unwind-tables",       # Include full unwind tables when aborting on panic
-    "-C" , "debug-assertions",         # Enable debug assertions in release builds to have more safeguards in place
     "--cfg", "uuid_unstable",          # Enable unstable Uuid
     "--cfg", "tokio_unstable",         # Enable unstable tokio
 ]
@@ -16,7 +15,6 @@ rustflags = [
 [target.aarch64-unknown-linux-gnu]
 rustflags = [
     "-C", "force-unwind-tables",       # Include full unwind tables when aborting on panic
-    "-C" , "debug-assertions",         # Enable debug assertions in release builds to have more safeguards in place
     "--cfg", "uuid_unstable",          # Enable unstable Uuid
     "--cfg", "tokio_unstable",         # Enable unstable tokio
     "-C" , "force-frame-pointers=yes", # Enable frame pointers to support Parca (https://github.com/parca-dev/parca-agent/pull/1805)
@@ -25,7 +23,6 @@ rustflags = [
 [target.x86_64-unknown-linux-musl]
 rustflags = [
     "-C", "force-unwind-tables",       # Include full unwind tables when aborting on panic
-    "-C" , "debug-assertions",         # Enable debug assertions in release builds to have more safeguards in place
     "--cfg", "uuid_unstable",          # Enable unstable Uuid
     "--cfg", "tokio_unstable",         # Enable unstable tokio
     "-C", "link-self-contained=yes",   # Link statically
@@ -34,7 +31,6 @@ rustflags = [
 [target.aarch64-unknown-linux-musl]
 rustflags = [
     "-C", "force-unwind-tables",       # Include full unwind tables when aborting on panic
-    "-C" , "debug-assertions",         # Enable debug assertions in release builds to have more safeguards in place
     "--cfg", "uuid_unstable",          # Enable unstable Uuid
     "--cfg", "tokio_unstable",         # Enable unstable tokio
     "-C", "force-frame-pointers=yes",  # Enable frame pointers to support Parca (https://github.com/parca-dev/parca-agent/pull/1805)
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index b70960c323..5c511a620b 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -153,7 +153,7 @@ jobs:
         if: "${{ runner.os == 'macOS' }}"
         run: "echo MACOSX_DEPLOYMENT_TARGET=\"10.14.0\" >> \"$GITHUB_ENV\""
       - name: "Set up RUSTFLAGS"
-        run: "echo RUSTFLAGS=\"-C force-unwind-tables -C debug-assertions --cfg uuid_unstable --cfg tokio_unstable\" >> \"$GITHUB_ENV\""
+        run: "echo RUSTFLAGS=\"-C force-unwind-tables --cfg uuid_unstable --cfg tokio_unstable\" >> \"$GITHUB_ENV\""
       - name: Install dist
         run: ${{ matrix.install_dist.run }}
       # Get the dist-manifest
diff --git a/.github/workflows/steps/release-build-setup.yml b/.github/workflows/steps/release-build-setup.yml
index a036f89395..ebe3d6ba18 100644
--- a/.github/workflows/steps/release-build-setup.yml
+++ b/.github/workflows/steps/release-build-setup.yml
@@ -30,4 +30,4 @@
   # cargo-dist isn't currently able to take these from .cargo/config.toml
   # https://github.com/axodotdev/cargo-dist/issues/1571
   - name: Set up RUSTFLAGS
-    run: echo RUSTFLAGS="-C force-unwind-tables -C debug-assertions --cfg uuid_unstable --cfg tokio_unstable" >> "$GITHUB_ENV"
+    run: echo RUSTFLAGS="-C force-unwind-tables --cfg uuid_unstable --cfg tokio_unstable" >> "$GITHUB_ENV"
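Reviewer note (not part of the patch): `-C debug-assertions` gates more than explicit assertions. A minimal sketch of what release builds stop doing after this change, assuming only standard Rust semantics:

    // `debug_assert!` only executes when the build enables `-C debug-assertions`,
    // so after this change release binaries skip these checks entirely.
    fn apply_offset(base: u64, delta: u64) -> u64 {
        debug_assert!(delta < 1_000_000, "suspiciously large delta: {delta}");
        // integer overflow checks also default to the debug-assertions setting
        // unless `-C overflow-checks` is passed explicitly
        base + delta
    }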
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index a1de5eaea3..e0561c16f2 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -778,6 +778,7 @@ mod tests {
     use super::Service;
 
     use std::collections::BTreeSet;
+    use std::num::NonZero;
     use std::sync::atomic::{AtomicU64, Ordering};
     use std::sync::Arc;
     use std::time::Duration;
@@ -796,7 +797,7 @@ mod tests {
     use restate_core::test_env::NoOpMessageHandler;
     use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
    use restate_types::cluster::cluster_state::{NodeState, PartitionProcessorStatus};
-    use restate_types::config::{AdminOptions, BifrostOptions, Configuration};
+    use restate_types::config::{AdminOptions, BifrostOptions, Configuration, NetworkingOptions};
     use restate_types::health::HealthStatus;
     use restate_types::identifiers::PartitionId;
     use restate_types::live::Live;
@@ -898,7 +899,13 @@ mod tests {
         let mut admin_options = AdminOptions::default();
         let interval_duration = Duration::from_secs(10);
         admin_options.log_trim_interval = Some(interval_duration.into());
+        let networking = NetworkingOptions {
+            // we are using a failing transport, so we only want to use the mock connection we created.
+            num_concurrent_connections: NonZero::new(1).unwrap(),
+            ..Default::default()
+        };
         let config = Configuration {
+            networking,
             admin: admin_options,
             ..Default::default()
         };
@@ -966,10 +973,17 @@ mod tests {
     async fn auto_trim_log() -> anyhow::Result<()> {
         const LOG_ID: LogId = LogId::new(0);
 
+        let networking = NetworkingOptions {
+            // we are using a failing transport, so we only want to use the mock connection we created.
+            num_concurrent_connections: NonZero::new(1).unwrap(),
+            ..Default::default()
+        };
+
         let mut admin_options = AdminOptions::default();
         let interval_duration = Duration::from_secs(10);
         admin_options.log_trim_interval = Some(interval_duration.into());
         let config = Configuration {
+            networking,
             admin: admin_options,
             ..Default::default()
         };
@@ -1042,7 +1056,14 @@ mod tests {
         admin_options.log_trim_interval = Some(interval_duration.into());
         let mut bifrost_options = BifrostOptions::default();
         bifrost_options.default_provider = ProviderKind::InMemory;
+
+        let networking = NetworkingOptions {
+            // we are using a failing transport, so we only want to use the mock connection we created.
+            num_concurrent_connections: NonZero::new(1).unwrap(),
+            ..Default::default()
+        };
         let config = Configuration {
+            networking,
             admin: admin_options,
             bifrost: bifrost_options,
             ..Default::default()
@@ -1096,7 +1117,16 @@ mod tests {
         const LOG_ID: LogId = LogId::new(0);
         let interval_duration = Duration::from_secs(10);
 
-        let mut config: Configuration = Default::default();
+        let networking = NetworkingOptions {
+            // we are using a failing transport, so we only want to use the mock connection we created.
+            num_concurrent_connections: NonZero::new(1).unwrap(),
+            ..Default::default()
+        };
+
+        let mut config: Configuration = Configuration {
+            networking,
+            ..Default::default()
+        };
         config.admin.log_trim_interval = Some(interval_duration.into());
         config.bifrost.default_provider = ProviderKind::InMemory;
         config.worker.snapshots.destination = Some("a-repository-somewhere".to_string());
@@ -1253,7 +1283,14 @@ mod tests {
         admin_options.log_trim_interval = Some(interval_duration.into());
         let mut bifrost_options = BifrostOptions::default();
         bifrost_options.default_provider = ProviderKind::InMemory;
+        let networking = NetworkingOptions {
+            // we are using a failing transport, so we only want to use the mock connection we created.
+            num_concurrent_connections: NonZero::new(1).unwrap(),
+            ..Default::default()
+        };
+
         let config = Configuration {
+            networking,
             admin: admin_options,
             bifrost: bifrost_options,
             ..Default::default()
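Reviewer note (not part of the patch): the tests pin `num_concurrent_connections` to 1 so the randomly chosen connection is always the mock one. A minimal sketch of the `NonZero` pattern used here; `NonZeroUsize` is the same type as `std::num::NonZero<usize>`, and the helper name is hypothetical:

    use std::num::NonZero;

    // NonZero::new returns None for zero, so unwrap/expect is safe for any
    // hard-coded non-zero literal.
    fn single_connection_limit() -> NonZero<usize> {
        NonZero::new(1).expect("1 is non-zero")
    }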
diff --git a/crates/bifrost/src/providers/replicated_loglet/network.rs b/crates/bifrost/src/providers/replicated_loglet/network.rs
index 450b1e50c7..335b9a1d16 100644
--- a/crates/bifrost/src/providers/replicated_loglet/network.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/network.rs
@@ -85,7 +85,7 @@ impl RequestPump {
         router_builder: &mut MessageRouterBuilder,
     ) -> Self {
         // todo(asoli) read from opts
-        let queue_length = 10;
+        let queue_length = 128;
         let append_stream = router_builder.subscribe_to_stream(queue_length);
         let get_sequencer_state_stream = router_builder.subscribe_to_stream(queue_length);
         Self {
@@ -104,15 +104,16 @@ impl RequestPump {
         let mut cancel = std::pin::pin!(cancellation_watcher());
         loop {
             tokio::select! {
+                biased;
                 _ = &mut cancel => {
                     break;
                 }
-                Some(append) = self.append_stream.next() => {
-                    self.handle_append(&provider, append).await;
-                }
                 Some(get_sequencer_state) = self.get_sequencer_state_stream.next() => {
                     self.handle_get_sequencer_state(&provider, get_sequencer_state).await;
                 }
+                Some(append) = self.append_stream.next() => {
+                    self.handle_append(&provider, append).await;
+                }
             }
         }
@@ -158,41 +159,51 @@ impl RequestPump {
             return;
         }
 
-        let tail = if msg.force_seal_check {
-            match loglet
-                .find_tail_inner(FindTailOptions::ForceSealCheck)
-                .await
-            {
-                Ok(tail) => tail,
-                Err(err) => {
-                    let failure = SequencerState {
-                        header: CommonResponseHeader {
-                            known_global_tail: None,
-                            sealed: None,
-                            status: SequencerStatus::Error {
-                                retryable: true,
-                                message: err.to_string(),
+        if msg.force_seal_check {
+            let _ = TaskCenter::spawn(TaskKind::Disposable, "remote-check-seal", async move {
+                match loglet
+                    .find_tail_inner(FindTailOptions::ForceSealCheck)
+                    .await
+                {
+                    Ok(tail) => {
+                        let sequencer_state = SequencerState {
+                            header: CommonResponseHeader {
+                                known_global_tail: Some(tail.offset()),
+                                sealed: Some(tail.is_sealed()),
+                                status: SequencerStatus::Ok,
+                            },
+                        };
+                        let _ = reciprocal.prepare(sequencer_state).try_send();
+                    }
+                    Err(err) => {
+                        let failure = SequencerState {
+                            header: CommonResponseHeader {
+                                known_global_tail: None,
+                                sealed: None,
+                                status: SequencerStatus::Error {
+                                    retryable: true,
+                                    message: err.to_string(),
+                                },
                             },
-                        },
-                    };
-                    let _ = reciprocal.prepare(failure).try_send();
-                    return;
+                        };
+                        let _ = reciprocal.prepare(failure).try_send();
+                    }
                 }
-            }
+                Ok(())
+            });
         } else {
             // if we are not forced to check the seal, we can just return the last known tail from the
             // sequencer's view
-            loglet.last_known_global_tail()
-        };
-
-        let sequencer_state = SequencerState {
-            header: CommonResponseHeader {
-                known_global_tail: Some(tail.offset()),
-                sealed: Some(tail.is_sealed()),
-                status: SequencerStatus::Ok,
-            },
-        };
-        let _ = reciprocal.prepare(sequencer_state).try_send();
+            let tail = loglet.last_known_global_tail();
+            let sequencer_state = SequencerState {
+                header: CommonResponseHeader {
+                    known_global_tail: Some(tail.offset()),
+                    sealed: Some(tail.is_sealed()),
+                    status: SequencerStatus::Ok,
+                },
+            };
+            let _ = reciprocal.prepare(sequencer_state).try_send();
+        }
     }
 
     /// Infallible handle_append method
@@ -265,7 +276,7 @@ impl RequestPump {
             return Err(SequencerStatus::LogletIdMismatch);
         }
 
-        match self.create_loglet(provider, header).await {
+        match self.create_loglet(provider, header) {
             Ok(loglet) => return Ok(loglet),
             Err(SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex) => {
                 // possible outdated metadata
@@ -315,7 +326,7 @@ impl RequestPump {
         }
     }
 
-    async fn create_loglet(
+    fn create_loglet(
        &self,
        provider: &ReplicatedLogletProvider,
        header: &CommonRequestHeader,
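Reviewer note (not part of the patch): with `biased;`, `tokio::select!` polls branches top-down in declaration order instead of randomly, which is what lets cancellation and `GetSequencerState` requests win over the append fast path above. A minimal standalone sketch:

    use tokio::sync::mpsc;

    async fn pump(mut shutdown: mpsc::Receiver<()>, mut appends: mpsc::Receiver<u64>) {
        loop {
            tokio::select! {
                biased;
                // checked first on every iteration
                _ = shutdown.recv() => break,
                // only polled when the branches above are pending
                Some(append) = appends.recv() => { let _ = append; }
            }
        }
    }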
diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
index e2170d66dc..f492980bad 100644
--- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
@@ -515,9 +515,18 @@ impl Debug for NodeSetChecker {
 
 impl<Attr: Display> Display for NodeSetChecker<Attr> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use itertools::Position;
         write!(f, "[")?;
-        for (node, attr) in self.node_to_attr.iter() {
-            write!(f, "{node} => {attr}, ")?;
+        for (pos, (node_id, attr)) in self
+            .node_to_attr
+            .iter()
+            .sorted_by_key(|v| v.0)
+            .with_position()
+        {
+            match pos {
+                Position::Only | Position::Last => write!(f, "{node_id}({attr})")?,
+                Position::First | Position::Middle => write!(f, "{node_id}({attr}), ")?,
+            }
         }
         write!(f, "]")
     }
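Reviewer note (not part of the patch): `Itertools::with_position` (itertools >= 0.11) yields `(Position, item)` pairs, which is what makes the trailing-separator handling above work. A standalone sketch of the same formatting logic:

    use itertools::{Itertools, Position};

    fn render(values: &[(u32, &str)]) -> String {
        let mut out = String::from("[");
        // sort for stable output, then tag each item First/Middle/Last/Only
        for (pos, (id, attr)) in values.iter().sorted_by_key(|v| v.0).with_position() {
            match pos {
                Position::Only | Position::Last => out.push_str(&format!("{id}({attr})")),
                Position::First | Position::Middle => out.push_str(&format!("{id}({attr}), ")),
            }
        }
        out.push(']');
        out
    }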
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
index 380365c1cf..79d81d4234 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
@@ -11,6 +11,7 @@
 use std::sync::Weak;
 use std::time::Duration;
 
+use restate_types::retries::with_jitter;
 use tokio::time::Instant;
 use tracing::instrument;
 use tracing::{debug, trace};
@@ -89,7 +90,7 @@ impl PeriodicTailChecker {
                 );
             }
         }
-        tokio::time::sleep(duration).await;
+        tokio::time::sleep(with_jitter(duration, 0.5)).await;
     }
 }
diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs
index 08fde62e1c..8fccd07c8f 100644
--- a/crates/core/src/metadata.rs
+++ b/crates/core/src/metadata.rs
@@ -216,8 +216,9 @@ impl Metadata {
         min_version: Version,
     ) -> Result<Version, ShutdownError> {
         let mut recv = self.inner.write_watches[metadata_kind].receive.clone();
-        let v = recv
-            .wait_for(|v| *v >= min_version)
+        // If we are already at the requested metadata version, avoid tokio's cooperative
+        // yielding to improve tail latencies when this is used in latency-sensitive operations.
+        let v = tokio::task::unconstrained(recv.wait_for(|v| *v >= min_version))
             .await
             .map_err(|_| ShutdownError)?;
         Ok(*v)
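Reviewer note (not part of the patch): `with_jitter(duration, 0.5)` comes from `restate_types::retries`. The sketch below shows the assumed behavior — scale the period by a random factor in [0.5, 1.5] so tail checkers on different nodes don't fire in lockstep — and is not the actual restate implementation:

    use std::time::Duration;
    use rand::Rng;

    fn with_jitter(base: Duration, jitter: f64) -> Duration {
        // random factor in [1 - jitter, 1 + jitter], clamped at zero
        let factor = rand::rng().random_range(1.0 - jitter..=1.0 + jitter);
        base.mul_f64(factor.max(0.0))
    }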
diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs
index 53baa2bf2c..79d03af0f3 100644
--- a/crates/core/src/network/connection.rs
+++ b/crates/core/src/network/connection.rs
@@ -11,11 +11,11 @@
 use std::sync::Arc;
 use std::sync::Weak;
 use std::time::Duration;
-use std::time::Instant;
 
 use enum_map::{enum_map, EnumMap};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
+use tokio::time::Instant;
 use tracing::{debug, trace};
 
 use restate_types::net::codec::Targeted;
@@ -27,7 +27,6 @@ use restate_types::protobuf::node::Header;
 use restate_types::protobuf::node::Message;
 use restate_types::{GenerationalNodeId, Version};
 
-use super::metric_definitions::MESSAGE_SENT;
 use super::NetworkError;
 use super::Outgoing;
 use crate::Metadata;
@@ -84,7 +83,6 @@ impl SendPermit<'_, M> {
     /// associated with the message.
     pub(crate) fn send_raw(self, raw_message: Message) {
         self.permit.send(raw_message);
-        MESSAGE_SENT.increment(1);
     }
 }
 
@@ -313,7 +311,6 @@ pub mod test_util {
     use super::*;
 
     use std::sync::Arc;
-    use std::time::Instant;
 
     use async_trait::async_trait;
     use futures::stream::BoxStream;
diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs
index 0711d63639..0537cdc74c 100644
--- a/crates/core/src/network/connection_manager.rs
+++ b/crates/core/src/network/connection_manager.rs
@@ -9,14 +9,15 @@
 // by the Apache License, Version 2.0.
 
 use std::sync::{Arc, Weak};
-use std::time::Instant;
 
 use ahash::HashMap;
 use enum_map::EnumMap;
 use futures::{FutureExt, Stream, StreamExt};
+use metrics::{counter, histogram};
 use opentelemetry::global;
 use parking_lot::Mutex;
 use tokio::sync::mpsc;
+use tokio::time::Instant;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, info, instrument, trace, warn, Instrument, Span};
 
@@ -32,13 +33,15 @@
 use super::connection::{OwnedConnection, WeakConnection};
 use super::error::{NetworkError, ProtocolError};
 use super::handshake::wait_for_welcome;
 use super::metric_definitions::{
-    self, CONNECTION_DROPPED, INCOMING_CONNECTION, MESSAGE_PROCESSING_DURATION, MESSAGE_RECEIVED,
-    ONGOING_DRAIN, OUTGOING_CONNECTION,
+    self, CONNECTION_DROPPED, INCOMING_CONNECTION, OUTGOING_CONNECTION,
 };
 use super::transport_connector::TransportConnect;
 use super::{Handler, MessageRouter};
 use crate::metadata::Urgency;
 use crate::network::handshake::{negotiate_protocol_version, wait_for_hello};
+use crate::network::metric_definitions::{
+    NETWORK_MESSAGE_PROCESSING_DURATION, NETWORK_MESSAGE_RECEIVED, NETWORK_MESSAGE_RECEIVED_BYTES,
+};
 use crate::network::{Incoming, PeerMetadataVersion};
 use crate::{Metadata, TaskCenter, TaskContext, TaskId, TaskKind};
@@ -99,11 +102,20 @@ impl ConnectionManagerInner {
     fn get_random_connection(
         &self,
         peer_node_id: &GenerationalNodeId,
+        target_concurrency: usize,
     ) -> Option<Arc<OwnedConnection>> {
         use rand::prelude::IndexedRandom;
         self.connection_by_gen_id
             .get(peer_node_id)
-            .and_then(|connections| connections.choose(&mut rand::rng())?.upgrade())
+            .and_then(|connections| {
+                // Suggest creating a new connection if the number
+                // of connections is below the target
+                if connections.len() >= target_concurrency {
+                    connections.choose(&mut rand::rng())?.upgrade()
+                } else {
+                    None
+                }
+            })
     }
 }
@@ -141,13 +153,14 @@ impl Clone for ConnectionManager {
 /// used for testing. Accepts connections but can't establish new connections
 impl ConnectionManager<FailingConnector> {
     pub fn new_incoming_only(metadata: Metadata) -> Self {
+        use restate_types::config::Configuration;
         let inner = Arc::new(Mutex::new(ConnectionManagerInner::default()));
 
         Self {
             metadata,
             inner,
             transport_connector: Arc::new(super::FailingConnector::default()),
-            networking_options: NetworkingOptions::default(),
+            networking_options: Configuration::pinned().networking.clone(),
         }
     }
 }
@@ -294,10 +307,18 @@
         &self,
         node_id: GenerationalNodeId,
     ) -> Result<Arc<OwnedConnection>, NetworkError> {
+        // fail fast if we are connecting to our previous self
+        if self.metadata.my_node_id().is_same_but_different(&node_id) {
+            return Err(NetworkError::NodeIsGone(node_id));
+        }
+
         // find a connection by node_id
         let maybe_connection: Option<Arc<OwnedConnection>> = {
             let guard = self.inner.lock();
-            guard.get_random_connection(&node_id)
+            guard.get_random_connection(
+                &node_id,
+                self.networking_options.num_concurrent_connections(),
+            )
             // lock is dropped.
         };
@@ -407,7 +428,7 @@
     #[instrument(skip_all)]
     fn connect_loopback(&self) -> Result<Arc<OwnedConnection>, NetworkError> {
-        let (tx, rx) = mpsc::channel(self.networking_options.outbound_queue_length.into());
+        let (tx, rx) = mpsc::channel(self.networking_options.outbound_queue_length.get());
         let connection = OwnedConnection::new(
             self.metadata.my_node_id(),
             restate_types::net::CURRENT_PROTOCOL_VERSION,
@@ -507,8 +528,7 @@
         let connection_weak = Arc::downgrade(&connection);
         let span = tracing::error_span!(parent: None, "network-reactor",
             task_id = tracing::field::Empty,
-            peer_node_id = %peer_node_id,
-            protocol_version = ?connection.protocol_version() as i32,
+            peer = %peer_node_id,
         );
         let router = guard.router.clone();
@@ -526,7 +546,7 @@
         )?;
         if peer_node_id != self.metadata.my_node_id() {
             debug!(
-                peer_node_id = %peer_node_id,
+                peer = %peer_node_id,
                 task_id = %task_id,
                 "Incoming connection accepted from node {}", peer_node_id
             );
@@ -597,7 +617,6 @@ where
             }
         };
 
-        MESSAGE_RECEIVED.increment(1);
         let processing_started = Instant::now();
 
         // bodies are not allowed to be empty.
@@ -656,21 +675,21 @@ where
             }
         });
 
+        let encoded_len = body.encoded_len();
         match body.try_as_binary_body(connection.protocol_version) {
             Ok(msg) => {
-                trace!(
-                    peer = %connection.peer,
-                    ?header,
-                    target = ?msg.target(),
-                    "Message received"
-                );
+                let target = msg.target();
                 let parent_context = header.span_context.as_ref().map(|span_ctx| {
                     global::get_text_map_propagator(|propagator| propagator.extract(span_ctx))
                 });
 
-                if let Err(e) = router
-                    .call(
+                // unconstrained: we want to avoid yielding if the message router has capacity;
+                // this improves tail latency of message processing. We still give tokio
+                // a yielding point when reading the next message, but it would be excessive to
+                // introduce more than one yielding point in this reactor loop.
+                if let Err(e) = tokio::task::unconstrained(
+                    router.call(
                         Incoming::from_parts(
                             msg,
                             connection.downgrade(),
@@ -680,17 +699,29 @@ where
                         )
                         .with_parent_context(parent_context),
                         connection.protocol_version,
-                    )
-                    .await
+                    ),
+                )
+                .await
                 {
-                    warn!("Error processing message: {:?}", e);
+                    warn!(
+                        target = target.as_str_name(),
+                        "Error processing message: {e}"
+                    );
                 }
-                MESSAGE_PROCESSING_DURATION.record(processing_started.elapsed());
+                histogram!(NETWORK_MESSAGE_PROCESSING_DURATION, "target" => target.as_str_name())
+                    .record(processing_started.elapsed());
+                counter!(NETWORK_MESSAGE_RECEIVED, "target" => target.as_str_name()).increment(1);
+                counter!(NETWORK_MESSAGE_RECEIVED_BYTES, "target" => target.as_str_name())
+                    .increment(encoded_len as u64);
+                trace!(
+                    target = target.as_str_name(),
+                    "Processed message in {:?}",
+                    processing_started.elapsed()
+                );
             }
             Err(status) => {
                 // terminate the stream
-                info!("Error processing message, reporting error to peer: {status}");
-                MESSAGE_PROCESSING_DURATION.record(processing_started.elapsed());
+                warn!("Error processing message, reporting error to peer: {status}");
                 connection.send_control_frame(ConnectionControl::codec_error(status.to_string()));
                 break;
             }
@@ -698,7 +729,6 @@ where
         }
     }
 
     // remove from active set
-    ONGOING_DRAIN.increment(1.0);
     on_connection_draining(&connection, &connection_manager, is_peer_shutting_down);
     let protocol_version = connection.protocol_version;
     let peer_node_id = connection.peer;
@@ -752,7 +782,6 @@ where
     // We should also terminate response stream. This happens automatically when
     // the sender is dropped
     on_connection_terminated(&connection_manager);
-    ONGOING_DRAIN.decrement(1.0);
     CONNECTION_DROPPED.increment(1);
     debug!(
         "Connection terminated, drained {} messages in {:?}, total connection age is {:?}",
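Reviewer note (not part of the patch): the reactor now records per-target series through the `metrics` macros directly instead of pre-registered statics. A minimal sketch of the labeling pattern, using the metric names defined by the constants above:

    use std::time::Duration;
    use metrics::{counter, histogram};

    fn record_message(target: &'static str, encoded_len: usize, elapsed: Duration) {
        // each distinct (name, "target") pair becomes its own series in the
        // installed recorder; a Duration records as seconds
        counter!("restate.network.message_received.total", "target" => target).increment(1);
        counter!("restate.network.message_received_bytes.total", "target" => target)
            .increment(encoded_len as u64);
        histogram!("restate.network.message_processing_duration.seconds", "target" => target)
            .record(elapsed);
    }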
diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs
index dce926b482..334f524a91 100644
--- a/crates/core/src/network/message_router.rs
+++ b/crates/core/src/network/message_router.rs
@@ -216,7 +216,20 @@ where
         protocol_version: ProtocolVersion,
     ) -> Result<(), Self::Error> {
         let message = message.try_map(|mut m| {
-            <H::MessageType as WireDecode>::decode(&mut m.payload, protocol_version)
+            #[cfg(debug_assertions)]
+            let decode_start = tokio::time::Instant::now();
+
+            let res = <H::MessageType as WireDecode>::decode(&mut m.payload, protocol_version);
+            #[cfg(debug_assertions)]
+            {
+                use super::metric_definitions::NETWORK_MESSAGE_DECODE_DURATION;
+                metrics::histogram!(
+                    NETWORK_MESSAGE_DECODE_DURATION,
+                    "target" => H::MessageType::TARGET.as_str_name()
+                )
+                .record(decode_start.elapsed());
+            }
+            res
         })?;
         self.inner.on_message(message).await;
         Ok(())
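Reviewer note (not part of the patch): the decode timer above only exists in debug builds; both the `Instant` and the histogram write compile away when `debug_assertions` is off. A minimal standalone sketch of the pattern:

    use std::time::Instant;

    fn decode(payload: &[u8]) -> usize {
        #[cfg(debug_assertions)]
        let start = Instant::now();

        let decoded = payload.len(); // stand-in for the real decode step

        #[cfg(debug_assertions)]
        eprintln!("decode took {:?}", start.elapsed());

        decoded
    }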
diff --git a/crates/core/src/network/metric_definitions.rs b/crates/core/src/network/metric_definitions.rs
index e81896c86a..27d5362801 100644
--- a/crates/core/src/network/metric_definitions.rs
+++ b/crates/core/src/network/metric_definitions.rs
@@ -8,22 +8,22 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use metrics::{
-    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Counter,
-    Gauge, Histogram, Unit,
-};
+use metrics::{counter, describe_counter, describe_histogram, Counter, Unit};
 use std::sync::LazyLock;
 
-const NETWORK_CONNECTION_CREATED: &str = "restate.network.connection_created.total";
-const NETWORK_CONNECTION_DROPPED: &str = "restate.network.connection_dropped.total";
-const NETWORK_ONGOING_DRAINS: &str = "restate.network.ongoing_drains";
-const NETWORK_MESSAGE_SENT: &str = "restate.network.message_sent.total";
-const NETWORK_MESSAGE_RECEIVED: &str = "restate.network.message_received.total";
+pub(crate) const NETWORK_CONNECTION_CREATED: &str = "restate.network.connection_created.total";
+pub(crate) const NETWORK_CONNECTION_DROPPED: &str = "restate.network.connection_dropped.total";
+pub(crate) const NETWORK_MESSAGE_RECEIVED: &str = "restate.network.message_received.total";
+pub(crate) const NETWORK_MESSAGE_RECEIVED_BYTES: &str =
+    "restate.network.message_received_bytes.total";
 
-const NETWORK_CONNECTION_SEND_DURATION: &str = "restate.network.connection_send_duration.seconds";
-const NETWORK_MESSAGE_PROCESSING_DURATION: &str =
+pub(crate) const NETWORK_MESSAGE_PROCESSING_DURATION: &str =
     "restate.network.message_processing_duration.seconds";
 
+#[cfg(debug_assertions)]
+pub(crate) const NETWORK_MESSAGE_DECODE_DURATION: &str =
+    "restate.network.message_decode_duration.seconds";
+
 pub static INCOMING_CONNECTION: LazyLock<Counter> =
     LazyLock::new(|| counter!(NETWORK_CONNECTION_CREATED, "direction" => "incoming"));
 
@@ -32,17 +32,6 @@ pub static OUTGOING_CONNECTION: LazyLock<Counter> =
 pub static CONNECTION_DROPPED: LazyLock<Counter> =
     LazyLock::new(|| counter!(NETWORK_CONNECTION_DROPPED));
 
-pub static ONGOING_DRAIN: LazyLock<Gauge> = LazyLock::new(|| gauge!(NETWORK_ONGOING_DRAINS));
-
-pub static MESSAGE_SENT: LazyLock<Counter> = LazyLock::new(|| counter!(NETWORK_MESSAGE_SENT));
-pub static MESSAGE_RECEIVED: LazyLock<Counter> =
-    LazyLock::new(|| counter!(NETWORK_MESSAGE_RECEIVED));
-
-pub static CONNECTION_SEND_DURATION: LazyLock<Histogram> =
-    LazyLock::new(|| histogram!(NETWORK_CONNECTION_SEND_DURATION));
-
-pub static MESSAGE_PROCESSING_DURATION: LazyLock<Histogram> =
-    LazyLock::new(|| histogram!(NETWORK_MESSAGE_PROCESSING_DURATION));
 
 pub fn describe_metrics() {
     describe_counter!(
@@ -55,25 +44,18 @@ pub fn describe_metrics() {
         Unit::Count,
         "Number of connections dropped"
     );
-    describe_gauge!(
-        NETWORK_ONGOING_DRAINS,
-        Unit::Count,
-        "Number of connections currently being drained"
+    describe_counter!(
+        NETWORK_MESSAGE_RECEIVED_BYTES,
+        Unit::Bytes,
+        "Number of bytes received, by message target"
     );
-    describe_counter!(NETWORK_MESSAGE_SENT, Unit::Count, "Number of messages sent");
-
     describe_counter!(
         NETWORK_MESSAGE_RECEIVED,
         Unit::Count,
-        "Number of messages received"
+        "Number of messages received, by message target"
     );
-    describe_histogram!(
-        NETWORK_CONNECTION_SEND_DURATION,
-        Unit::Seconds,
-        "Latency of sending a message over a single connection stream"
-    );
     describe_histogram!(
         NETWORK_MESSAGE_PROCESSING_DURATION,
         Unit::Seconds,
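Reviewer note (not part of the patch): in the `metrics` crate, `describe_*` only attaches unit and help metadata; the series themselves are created lazily at the first `counter!`/`histogram!` call site, which is why the labeled constants can replace the `LazyLock` statics. A minimal sketch of that registration split, with a hypothetical call site:

    use metrics::{counter, describe_counter, Unit};

    const NETWORK_MESSAGE_RECEIVED: &str = "restate.network.message_received.total";

    fn describe() {
        // metadata only; safe to call before or after the metric is first used
        describe_counter!(NETWORK_MESSAGE_RECEIVED, Unit::Count, "Number of messages received");
    }

    fn on_message() {
        counter!(NETWORK_MESSAGE_RECEIVED, "target" => "Append").increment(1);
    }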
diff --git a/crates/core/src/network/transport_connector.rs b/crates/core/src/network/transport_connector.rs
index 953c9386dc..118eb9a4dd 100644
--- a/crates/core/src/network/transport_connector.rs
+++ b/crates/core/src/network/transport_connector.rs
@@ -11,11 +11,9 @@
 use std::future::Future;
 
 use futures::{Stream, StreamExt};
-use tonic::transport::Channel;
 use tracing::trace;
 
 use restate_types::config::NetworkingOptions;
-use restate_types::net::AdvertisedAddress;
 use restate_types::nodes_config::NodesConfiguration;
 use restate_types::protobuf::node::Message;
 use restate_types::GenerationalNodeId;
@@ -24,8 +22,6 @@ use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
 use super::{NetworkError, ProtocolError};
 use crate::network::net_util::create_tonic_channel;
 
-type DashMap<K, V> = dashmap::DashMap<K, V>;
-
 pub trait TransportConnect: Send + Sync + 'static {
     fn connect(
         &self,
@@ -42,15 +38,11 @@ pub trait TransportConnect: Send + Sync + 'static {
 
 pub struct GrpcConnector {
     networking_options: NetworkingOptions,
-    channel_cache: DashMap<AdvertisedAddress, Channel>,
 }
 
 impl GrpcConnector {
     pub fn new(networking_options: NetworkingOptions) -> Self {
-        Self {
-            networking_options,
-            channel_cache: DashMap::default(),
-        }
+        Self { networking_options }
     }
 }
@@ -67,15 +59,7 @@ impl TransportConnect for GrpcConnector {
         let address = nodes_config.find_node_by_id(node_id)?.address.clone();
 
         trace!("Attempting to connect to node {} at {}", node_id, address);
-        // Do we have a channel in cache for this address?
-        let channel = match self.channel_cache.get(&address) {
-            Some(channel) => channel.clone(),
-            None => self
-                .channel_cache
-                .entry(address.clone())
-                .or_insert_with(|| create_tonic_channel(address, &self.networking_options))
-                .clone(),
-        };
+        let channel = create_tonic_channel(address, &self.networking_options);
 
         // Establish the connection
         let mut client = CoreNodeSvcClient::new(channel)
@@ -91,11 +75,11 @@ pub mod test_util {
     use super::*;
 
     use std::sync::Arc;
-    use std::time::Instant;
 
     use futures::{Stream, StreamExt};
     use parking_lot::Mutex;
     use tokio::sync::mpsc;
+    use tokio::time::Instant;
     use tokio_stream::wrappers::ReceiverStream;
     use tracing::info;
diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs
index a0a9f7870e..f3faed18cb 100644
--- a/crates/core/src/network/types.rs
+++ b/crates/core/src/network/types.rs
@@ -11,7 +11,7 @@
 use std::marker::PhantomData;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use opentelemetry::Context;
 use tracing::Span;
@@ -25,7 +25,6 @@ use restate_types::{GenerationalNodeId, NodeId, Version};
 use crate::Metadata;
 
 use super::connection::OwnedConnection;
-use super::metric_definitions::CONNECTION_SEND_DURATION;
 use super::{NetworkError, NetworkSendError, WeakConnection};
 
 static NEXT_MSG_ID: AtomicU64 = const { AtomicU64::new(1) };
@@ -417,18 +416,16 @@ impl Outgoing {
     /// for retrying externally.
     // #[instrument(level = "trace", skip_all, fields(peer_node_id = %self.peer, target_service = ?message.target(), msg = ?message.kind()))]
     pub async fn send(self) -> Result<(), NetworkSendError> {
-        let send_start = Instant::now();
         let connection = bail_on_error!(self, self.try_upgrade());
         let permit = bail_on_none!(
             self,
-            connection.reserve().await,
+            tokio::task::unconstrained(connection.reserve()).await,
             NetworkError::ConnectionClosed(connection.peer())
         );
         Metadata::with_current(|metadata| {
             permit.send(self, metadata);
         });
-        CONNECTION_SEND_DURATION.record(send_start.elapsed());
         Ok(())
     }
 
@@ -436,9 +433,8 @@ impl Outgoing {
     /// on the assigned connection or returns [`NetworkError::ConnectionClosed`] immediately if the
     /// assigned connection is no longer valid.
     pub async fn send_timeout(self, timeout: Duration) -> Result<(), NetworkSendError> {
-        let send_start = Instant::now();
         let connection = bail_on_error!(self, self.try_upgrade());
-        let permit = match connection.reserve_timeout(timeout).await {
+        let permit = match tokio::task::unconstrained(connection.reserve_timeout(timeout)).await {
             Ok(permit) => permit,
             Err(e) => return Err(NetworkSendError::new(self, e)),
         };
 
         Metadata::with_current(|metadata| {
             permit.send(self, metadata);
         });
-        CONNECTION_SEND_DURATION.record(send_start.elapsed());
         Ok(())
     }
 
@@ -455,7 +450,6 @@ impl Outgoing {
     ///
     /// This fails immediately with [`NetworkError::Full`] if connection stream is out of capacity.
     pub fn try_send(self) -> Result<(), NetworkSendError> {
-        let send_start = Instant::now();
         let connection = bail_on_error!(self, self.try_upgrade());
         let permit = bail_on_error!(self, connection.try_reserve());
 
         Metadata::with_current(|metadata| {
             permit.send(self, metadata);
         });
 
-        CONNECTION_SEND_DURATION.record(send_start.elapsed());
         Ok(())
     }
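Reviewer note (not part of the patch): dropping the channel cache leans on the fact that a tonic `Channel` is a cheap, cloneable handle over a multiplexed HTTP/2 connection and can be created per call; restate's `create_tonic_channel` presumably builds one with its own options. A minimal sketch with plain tonic:

    use tonic::transport::{Channel, Endpoint};

    fn make_channel(uri: &str) -> Result<Channel, tonic::transport::Error> {
        // connect_lazy defers TCP/TLS setup until the first RPC on the channel
        Ok(Endpoint::from_shared(uri.to_string())?.connect_lazy())
    }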
diff --git a/crates/core/src/task_center/task_kind.rs b/crates/core/src/task_center/task_kind.rs
index dacbc32175..dd3e84a802 100644
--- a/crates/core/src/task_center/task_kind.rs
+++ b/crates/core/src/task_center/task_kind.rs
@@ -80,8 +80,9 @@ pub enum TaskKind {
     #[strum(props(OnCancel = "abort"))]
     MetadataBackgroundSync,
     RpcServer,
+    #[strum(props(runtime = "default"))]
     SocketHandler,
-    #[strum(props(OnError = "log"))]
+    #[strum(props(OnError = "log", runtime = "default"))]
     H2Stream,
     /// A task that handles a single RPC request. The task is executed on the default runtime to
     /// decouple it from the lifetime of the originating runtime. Use this task kind if you want to
@@ -104,7 +105,7 @@ pub enum TaskKind {
     /// Low-priority tasks responsible for partition snapshot-related I/O.
     #[strum(props(OnCancel = "abort", OnError = "log"))]
     PartitionSnapshotProducer,
-    #[strum(props(OnError = "log"))]
+    #[strum(props(OnError = "log", runtime = "default"))]
     ConnectionReactor,
     Shuffle,
     Cleaner,
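Reviewer note (not part of the patch): the `runtime = "default"` property is read back through strum's `EnumProperty` trait. A minimal sketch of how such props are declared and queried, assuming the same strum derive that `TaskKind` uses; the "inherit" fallback is hypothetical:

    use strum::EnumProperty as _;

    #[derive(strum::EnumProperty)]
    enum Kind {
        #[strum(props(OnError = "log", runtime = "default"))]
        ConnectionReactor,
        Shuffle,
    }

    fn runtime_of(kind: &Kind) -> &'static str {
        // get_str returns None when the property wasn't declared on the variant
        kind.get_str("runtime").unwrap_or("inherit")
    }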
diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs
index c58669e8e8..4408d4dea8 100644
--- a/crates/log-server/src/loglet_worker.rs
+++ b/crates/log-server/src/loglet_worker.rs
@@ -178,7 +178,7 @@ impl LogletWorker {
                     // this might include sending notifications of shutdown to allow graceful
                     // handoff
                     trace!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
-                    return;
+                    break;
                 }
                 // GET_DIGEST
                 Some(msg) = get_digest_rx.recv() => {
@@ -186,6 +186,17 @@ impl LogletWorker {
                     // digest responses are spawned as tasks
                     self.process_get_digest(msg);
                 }
+                // GET_LOGLET_INFO
+                Some(msg) = get_loglet_info_rx.recv() => {
+                    self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
+                    // drop response if connection is lost/congested
+                    let peer = msg.peer();
+                    if let Err(e) = msg.to_rpc_response(LogletInfo::new(self.loglet_state.local_tail(), self.loglet_state.trim_point(), self.loglet_state.known_global_tail())).try_send() {
+                        debug!(?e.source, peer = %peer, "Failed to respond to GetLogletInfo message due to peer channel capacity being full");
+                    } else {
+                        tracing::trace!(%peer, %self.loglet_id, local_tail = ?self.loglet_state.local_tail(), known_global_tail = %self.loglet_state.known_global_tail(), "GetLogletInfo response");
+                    }
+                }
                 Some(_) = in_flight_stores.next() => {}
                 // The in-flight seal (if any)
                 Some(Ok(_)) = &mut in_flight_seal => {
@@ -231,17 +242,6 @@ impl LogletWorker {
                         }
                     }
                 }
-                // GET_LOGLET_INFO
-                Some(msg) = get_loglet_info_rx.recv() => {
-                    self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
-                    // drop response if connection is lost/congested
-                    let peer = msg.peer();
-                    if let Err(e) = msg.to_rpc_response(LogletInfo::new(self.loglet_state.local_tail(), self.loglet_state.trim_point(), self.loglet_state.known_global_tail())).try_send() {
-                        debug!(?e.source, peer = %peer, "Failed to respond to GetLogletInfo message due to peer channel capacity being full");
-                    } else {
-                        tracing::trace!(%peer, %self.loglet_id, local_tail = ?self.loglet_state.local_tail(), known_global_tail = %self.loglet_state.known_global_tail(), "GetLogletInfo response");
-                    }
-                }
                 // GET_RECORDS
                 Some(msg) = get_records_rx.recv() => {
                     self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
@@ -302,6 +302,31 @@ impl LogletWorker {
                 }
             }
         }
+
+        // draining in-flight operations
+        drop(store_rx);
+        drop(release_rx);
+        drop(seal_rx);
+        drop(get_loglet_info_rx);
+        drop(get_records_rx);
+        drop(trim_rx);
+        drop(wait_for_tail_rx);
+        drop(get_digest_rx);
+        tracing::debug!(loglet_id = %self.loglet_id, "Draining loglet worker");
+        loop {
+            tokio::select! {
+                Some(Ok(_)) = &mut in_flight_seal => {
+                    self.loglet_state.get_local_tail_watch().notify_seal();
+                    debug!(loglet_id = %self.loglet_id, "Loglet is now sealed on this log-server node");
+                    in_flight_seal.set(None.into());
+                }
+                Some(_) = in_flight_stores.next() => {}
+                Some(_) = in_flight_network_sends.next() => {}
+                Some(_) = waiting_for_seal.next() => {}
+                else => break,
+            }
+        }
+        tracing::debug!(loglet_id = %self.loglet_id, "Loglet worker drained");
     }
 
     async fn process_store(
diff --git a/crates/types/src/config/networking.rs b/crates/types/src/config/networking.rs
index 13a76859e8..569820068e 100644
--- a/crates/types/src/config/networking.rs
+++ b/crates/types/src/config/networking.rs
@@ -63,6 +63,20 @@ pub struct NetworkingOptions {
     /// The number of messages that can be queued on the outbound stream of a single
     /// connection.
     pub outbound_queue_length: NonZeroUsize,
+
+    /// # Number of connections to each peer
+    ///
+    /// This is used as a guiding value for how many connections every node can
+    /// maintain with each peer. With more connections, concurrency of network message
+    /// processing increases, but it also increases the memory and CPU overhead.
+    pub num_concurrent_connections: NonZeroUsize,
+}
+
+impl NetworkingOptions {
+    #[inline(always)]
+    pub fn num_concurrent_connections(&self) -> usize {
+        self.num_concurrent_connections.get()
+    }
 }
 
 impl Default for NetworkingOptions {
@@ -80,6 +94,7 @@ impl Default for NetworkingOptions {
             http2_keep_alive_interval: Duration::from_secs(5).into(),
             http2_keep_alive_timeout: Duration::from_secs(5).into(),
             http2_adaptive_window: true,
+            num_concurrent_connections: NonZeroUsize::new(13).unwrap(),
         }
     }
 }
diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs
index f2fe02891b..1646ab5aff 100644
--- a/crates/types/src/node_id.rs
+++ b/crates/types/src/node_id.rs
@@ -106,6 +106,12 @@ impl GenerationalNodeId {
         self.encode(buf);
         buf.split()
     }
+
+    /// Returns true if `other` has the same plain node-id but a different generation
+    #[inline(always)]
+    pub fn is_same_but_different(&self, other: &GenerationalNodeId) -> bool {
+        self.0 == other.0 && self.1 != other.1
+    }
 }
 
 impl From<GenerationalNodeId> for u64 {
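Reviewer note (not part of the patch): `is_same_but_different` is what lets `get_or_connect` fail fast with `NodeIsGone` when dialing our own previous incarnation. A standalone sketch mirroring the tuple layout implied by `self.0`/`self.1` (field types assumed):

    #[derive(Clone, Copy, PartialEq, Eq)]
    struct GenId(u32, u32); // (plain node id, generation)

    impl GenId {
        fn is_same_but_different(&self, other: &Self) -> bool {
            self.0 == other.0 && self.1 != other.1
        }
    }

    fn main() {
        let previous = GenId(5, 3);
        let current = GenId(5, 4); // same node, restarted with a bumped generation
        assert!(current.is_same_but_different(&previous));
        assert!(!current.is_same_but_different(&current));
    }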
diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs
index 5340e9d89e..47560441cb 100644
--- a/crates/types/src/nodes_config.rs
+++ b/crates/types/src/nodes_config.rs
@@ -8,16 +8,14 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::collections::HashMap;
-
 use enumset::{EnumSet, EnumSetType};
 use serde_with::serde_as;
-use xxhash_rust::xxh3::Xxh3DefaultBuilder;
 
 use crate::locality::NodeLocation;
 use crate::net::AdvertisedAddress;
 use crate::{flexbuffers_storage_encode_decode, GenerationalNodeId, NodeId, PlainNodeId};
 use crate::{Version, Versioned};
+use ahash::HashMap;
 
 #[derive(Debug, thiserror::Error)]
 pub enum NodesConfigError {
@@ -57,14 +55,15 @@ pub enum Role {
 }
 
 #[serde_as]
-#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
+#[derive(derive_more::Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
 pub struct NodesConfiguration {
     version: Version,
     cluster_name: String,
     // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
     #[serde_as(as = "serde_with::Seq<(_, _)>")]
     nodes: HashMap<PlainNodeId, MaybeNode>,
-    name_lookup: HashMap<String, PlainNodeId, Xxh3DefaultBuilder>,
+    #[debug(skip)]
+    name_lookup: HashMap<String, PlainNodeId>,
 }
 
 impl Default for NodesConfiguration {
@@ -134,6 +133,14 @@ impl NodesConfiguration {
         }
     }
 
+    pub fn len(&self) -> usize {
+        self.name_lookup.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.name_lookup.is_empty()
+    }
+
     pub fn cluster_name(&self) -> &str {
         &self.cluster_name
     }
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index ed9a55e27a..648c5def0a 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -329,7 +329,7 @@ where
         debug!(
             last_applied_lsn = %last_applied_lsn,
             current_log_tail = %current_tail,
-            "PartitionProcessor creating log reader",
+            "Partition creating log reader",
         );
         if current_tail.offset() == last_applied_lsn.next() {
             if self.status.replay_status != ReplayStatus::Active {
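Reviewer note (not part of the patch): `#[debug(skip)]` relies on `derive_more::Debug`, which mirrors the std derive but allows omitting fields — here it keeps the redundant `name_lookup` index out of `NodesConfiguration`'s debug logs. A minimal sketch, assuming derive_more 1.x with the `debug` feature; the exact rendering of skipped fields may differ:

    use derive_more::Debug;

    #[derive(Debug)]
    struct Config {
        version: u32,
        #[debug(skip)]
        name_lookup: Vec<(String, u32)>, // omitted from Debug output
    }

    fn main() {
        let c = Config { version: 1, name_lookup: vec![("node-1".into(), 1)] };
        println!("{c:?}"); // prints something like: Config { version: 1, .. }
    }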