diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e0561c16f2..eff28a83cf 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -797,7 +797,9 @@ mod tests { use restate_core::test_env::NoOpMessageHandler; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; use restate_types::cluster::cluster_state::{NodeState, PartitionProcessorStatus}; - use restate_types::config::{AdminOptions, BifrostOptions, Configuration, NetworkingOptions}; + use restate_types::config::{ + AdminOptionsBuilder, BifrostOptions, Configuration, NetworkingOptions, + }; use restate_types::health::HealthStatus; use restate_types::identifiers::PartitionId; use restate_types::live::Live; @@ -896,9 +898,11 @@ mod tests { async fn auto_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let mut admin_options = AdminOptions::default(); let interval_duration = Duration::from_secs(10); - admin_options.log_trim_interval = Some(interval_duration.into()); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); let networking = NetworkingOptions { // we are using failing transport so we only want to use the mock connection we created. num_concurrent_connections: NonZero::new(1).unwrap(), @@ -979,9 +983,12 @@ mod tests { ..Default::default() }; - let mut admin_options = AdminOptions::default(); let interval_duration = Duration::from_secs(10); - admin_options.log_trim_interval = Some(interval_duration.into()); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); + let interval_duration = Duration::from_secs(10); let config = Configuration { networking, admin: admin_options, @@ -1051,9 +1058,11 @@ mod tests { async fn do_not_trim_unless_all_nodes_report_persisted_lsn() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let mut admin_options = AdminOptions::default(); let interval_duration = Duration::from_secs(10); - admin_options.log_trim_interval = Some(interval_duration.into()); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); let mut bifrost_options = BifrostOptions::default(); bifrost_options.default_provider = ProviderKind::InMemory; @@ -1115,7 +1124,6 @@ mod tests { async fn do_not_trim_by_persisted_lsn_if_snapshot_repository_configured() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let interval_duration = Duration::from_secs(10); let networking = NetworkingOptions { // we are using failing transport so we only want to use the mock connection we created. 
@@ -1123,11 +1131,16 @@ mod tests { ..Default::default() }; + let interval_duration = Duration::from_secs(10); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); let mut config: Configuration = Configuration { networking, + admin: admin_options, ..Default::default() }; - config.admin.log_trim_interval = Some(interval_duration.into()); config.bifrost.default_provider = ProviderKind::InMemory; config.worker.snapshots.destination = Some("a-repository-somewhere".to_string()); @@ -1201,9 +1214,12 @@ mod tests { async fn auto_trim_by_archived_lsn_when_dead_nodes_present() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let mut admin_options = AdminOptions::default(); let interval_duration = Duration::from_secs(10); - admin_options.log_trim_interval = Some(interval_duration.into()); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); + let mut bifrost_options = BifrostOptions::default(); bifrost_options.default_provider = ProviderKind::InMemory; let config = Configuration { @@ -1278,9 +1294,12 @@ mod tests { async fn do_not_trim_by_archived_lsn_if_slow_nodes_present() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let mut admin_options = AdminOptions::default(); let interval_duration = Duration::from_secs(10); - admin_options.log_trim_interval = Some(interval_duration.into()); + let admin_options = AdminOptionsBuilder::default() + .log_trim_interval(interval_duration.into()) + .build() + .unwrap(); + let mut bifrost_options = BifrostOptions::default(); bifrost_options.default_provider = ProviderKind::InMemory; let networking = NetworkingOptions { diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index d70902dd08..c5ede867a5 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -11,11 +11,14 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::{Add, Deref}; use std::sync::Arc; -use std::time::Duration; use futures::future::OptionFuture; use itertools::Itertools; -use rand::Rng; +use tokio::sync::watch; +use tokio::time; +use tokio::time::{Interval, MissedTickBehavior}; +use tracing::{debug, info, instrument, trace, warn}; + use restate_bifrost::Bifrost; use restate_core::network::TransportConnect; use restate_core::{my_node_id, Metadata}; @@ -26,11 +29,8 @@ use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::net::metadata::MetadataKind; +use restate_types::retries::with_jitter; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; -use tokio::sync::watch; -use tokio::time; -use tokio::time::{Interval, MissedTickBehavior}; -use tracing::{debug, info, instrument, trace, warn}; use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher; use crate::cluster_controller::logs_controller::{ @@ -230,7 +230,7 @@ where _ = self.find_logs_tail_interval.tick() => { self.logs_controller.find_logs_tail(); } - _ = OptionFuture::from(self.log_trim_check_interval.as_mut().map(|interval| interval.tick())) => { + Some(_) = OptionFuture::from(self.log_trim_check_interval.as_mut().map(|interval| interval.tick())) => { return Ok(LeaderEvent::TrimLogs); } result = self.logs_controller.run_async_operations() => { @@ -354,13 +354,13 @@ fn 
create_log_trim_check_interval(options: &AdminOptions) -> Option<Interval> { .log_trim_threshold .inspect(|_| info!("The log trim threshold setting is deprecated and will be ignored")); - options.log_trim_interval.map(|interval| { - // delay the initial trim check, and add a small amount of jitter to avoid synchronization + options.log_trim_interval().map(|interval| { + // delay the initial trim check, and introduce a small amount of jitter (+/-10%) to avoid synchronization // among partition leaders in case of coordinated cluster restarts - let jitter = rand::rng().random_range(Duration::ZERO..interval.mul_f32(0.1)); - let start_at = time::Instant::now().add(interval.into()).add(jitter); + let effective_interval = with_jitter(interval, 0.1); + let start_at = time::Instant::now().add(effective_interval); - let mut interval = time::interval_at(start_at, interval.into()); + let mut interval = time::interval_at(start_at, effective_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); interval }) diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 9b2801b940..a1c02e7247 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -110,12 +110,12 @@ impl Appender { }; match loglet.append_batch(batch.clone()).await { Ok(lsn) => return Ok(lsn), - Err(AppendError::Sealed) => { + Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => { debug!( log_id = %self.log_id, attempt = attempt, segment_index = %loglet.segment_index(), - "Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete" + "Batch append failed but will be retried ({err}). Waiting for reconfiguration to complete" ); let new_loglet = Self::on_sealed_loglet( self.log_id, diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs index 0f3f160320..bf92ea03c1 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet.rs @@ -16,19 +16,19 @@ pub mod util; // exports pub use error::*; -use futures::stream::BoxStream; pub use provider::{LogletProvider, LogletProviderFactory}; -use restate_types::logs::metadata::ProviderKind; -use tokio::sync::oneshot; +use std::borrow::Cow; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Poll}; use async_trait::async_trait; +use futures::stream::BoxStream; use futures::{FutureExt, Stream}; +use tokio::sync::oneshot; -use restate_core::ShutdownError; +use restate_types::logs::metadata::ProviderKind; use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState}; use crate::LogEntry; @@ -190,6 +190,12 @@ impl LogletCommit { Self { rx } } + pub fn reconfiguration_needed(reason: impl Into<Cow<'static, str>>) -> Self { + let (tx, rx) = oneshot::channel(); + let _ = tx.send(Err(AppendError::ReconfigurationNeeded(reason.into()))); + Self { rx } + } + pub fn resolved(offset: LogletOffset) -> Self { let (tx, rx) = oneshot::channel(); let _ = tx.send(Ok(offset)); @@ -211,7 +217,9 @@ impl std::future::Future for LogletCommit { ) -> Poll<Self::Output> { match ready!(self.rx.poll_unpin(cx)) { Ok(res) => Poll::Ready(res), - Err(_) => Poll::Ready(Err(AppendError::Shutdown(ShutdownError))), + Err(_) => Poll::Ready(Err(AppendError::ReconfigurationNeeded( + "loglet gave up on this batch".into(), + ))), } } } diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs index d70b20fe75..da0c3e0a0a 100644 --- a/crates/bifrost/src/loglet/error.rs +++ b/crates/bifrost/src/loglet/error.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this
software will be governed // by the Apache License, Version 2.0. +use std::borrow::Cow; use std::fmt::Debug; use std::sync::Arc; @@ -16,8 +17,10 @@ use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError}; #[derive(Debug, Clone, thiserror::Error)] pub enum AppendError { - #[error("Loglet is sealed")] + #[error("Loglet has been sealed")] Sealed, + #[error("Loglet needs reconfiguration; {0}")] + ReconfigurationNeeded(Cow<'static, str>), #[error(transparent)] Shutdown(#[from] ShutdownError), #[error(transparent)] diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 1f9486c928..b43b77b971 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -510,6 +510,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc) -> googletest println!("append failed({i}) => SEALED"); break; } + Err(AppendError::ReconfigurationNeeded(reason)) => { + println!("append failed({i}) => ReconfigurationNeeded({reason})"); + break; + } Err(AppendError::Shutdown(_)) => { break; } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index 6fe8dce7c7..eed202756e 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -188,7 +188,15 @@ impl LogletWrapper { pub async fn get_trim_point(&self) -> Result, OperationError> { let offset = self.loglet.get_trim_point().await?; - Ok(offset.map(|o| self.base_lsn.offset_by(o))) + Ok(offset + .map(|o| self.base_lsn.offset_by(o)) + .map(|actual_trim_point| { + // If this loglet is sealed, the reported trim-point must fall within its boundaries + match self.tail_lsn { + Some(tail) => actual_trim_point.min(tail.prev()), + None => actual_trim_point, + } + })) } // trim_point is inclusive. diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index f12edb5ae6..a5916ec49e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -219,7 +219,7 @@ impl ReplicatedLoglet { } CheckSealOutcome::FullySealed => { // already fully sealed, just make sure the sequencer is drained. - handle.drain().await?; + handle.drain().await; // note that we can only do that if we are the sequencer because // our known_global_tail is authoritative. We have no doubt about // whether the tail needs to be repaired or not. @@ -370,8 +370,6 @@ impl Loglet for ReplicatedLoglet { /// trim_point is inclusive (will be trimmed) async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError> { trace!("trim() called"); - let trim_point = trim_point.min(self.known_global_tail.latest_offset().prev_unchecked()); - TrimTask::new( &self.my_params, self.logservers_rpc.clone(), @@ -420,7 +418,7 @@ impl Loglet for ReplicatedLoglet { .await?; // If we are the sequencer, we need to wait until the sequencer is drained. 
if let SequencerAccess::Local { handle } = &self.sequencer { - handle.drain().await?; + handle.drain().await; self.known_global_tail.notify_seal(); }; // Primarily useful for remote sequencer to enforce seal check on the next find_tail() call diff --git a/crates/bifrost/src/providers/replicated_loglet/network.rs b/crates/bifrost/src/providers/replicated_loglet/network.rs index 335b9a1d16..f21a273abd 100644 --- a/crates/bifrost/src/providers/replicated_loglet/network.rs +++ b/crates/bifrost/src/providers/replicated_loglet/network.rs @@ -395,6 +395,14 @@ impl WaitForCommitTask { }, last_offset: LogletOffset::INVALID, }, + Err(AppendError::ReconfigurationNeeded(_)) => Appended { + header: CommonResponseHeader { + known_global_tail: Some(self.global_tail.latest_offset()), + sealed: Some(self.global_tail.is_sealed()), // this must be true + status: SequencerStatus::Gone, + }, + last_offset: LogletOffset::INVALID, + }, Err(AppendError::Shutdown(_)) => Appended { header: CommonResponseHeader { known_global_tail: Some(self.global_tail.latest_offset()), diff --git a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs index 2d65ea8725..3bd870f2b2 100644 --- a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs +++ b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs @@ -545,7 +545,7 @@ impl ReadStreamTask { loglet_id = %self.my_params.loglet_id, from_offset = %self.read_pointer, %to_offset, - ?e, + %e, "Could not request record batch from node {}", server ); Ok(ServerReadResult::Skip) diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs index 1101fbdc0b..13d04e9b3c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs @@ -30,6 +30,7 @@ use restate_types::{ errors::MaybeRetryableError, logs::{metadata::SegmentIndex, LogId, Record}, net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus}, + nodes_config::NodesConfigError, replicated_loglet::ReplicatedLogletParams, GenerationalNodeId, }; @@ -140,7 +141,10 @@ where .await .unwrap(); - let mut connection = self.get_connection().await?; + let mut connection = match self.get_connection().await { + Ok(connection) => connection, + Err(err) => return self.on_handle_network_error(err), + }; let mut msg = Append { header: CommonRequestHeader { @@ -163,8 +167,10 @@ where | NetworkError::ConnectionClosed(_) | NetworkError::Timeout(_) => { // we retry to re-connect one time - connection = self.renew_connection(connection).await?; - + connection = match self.renew_connection(connection).await { + Ok(connection) => connection, + Err(err) => return self.on_handle_network_error(err), + }; msg = err.original; continue; } @@ -180,6 +186,27 @@ where Ok(commit_token) } + fn on_handle_network_error(&self, err: NetworkError) -> Result { + match err { + err @ NetworkError::OldPeerGeneration(_) | err @ NetworkError::NodeIsGone(_) => { + // means that the sequencer is gone, we need reconfiguration. + Ok(LogletCommit::reconfiguration_needed(format!( + "sequencer is gone; {err}" + ))) + } + NetworkError::UnknownNode(NodesConfigError::GenerationMismatch { found, .. }) + if found.is_newer_than(self.params.sequencer) => + { + // means that the sequencer is gone, we need reconfiguration. 
+ Ok(LogletCommit::reconfiguration_needed(format!( + "sequencer is gone; {err}" + ))) + } + // probably retryable + err => Err(err.into()), + } + } + /// Gets or starts a new remote sequencer connection async fn get_connection(&self) -> Result { let mut guard = self.connection.lock().await; @@ -375,11 +402,16 @@ impl RemoteSequencerConnection { commit_resolver.sealed(); break AppendError::Sealed; } + SequencerStatus::Gone | SequencerStatus::Shutdown => { + // this sequencer is not coming back + commit_resolver.error(AppendError::ReconfigurationNeeded( + format!("sequencer at {} is terminating", connection.peer()).into(), + )); + } SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex | SequencerStatus::LogletIdMismatch | SequencerStatus::NotSequencer - | SequencerStatus::Shutdown | SequencerStatus::Error { .. } => { let err = RemoteSequencerError::try_from(appended.header.status).unwrap(); // While the UnknownLoglet status is non-terminal for the connection @@ -428,8 +460,6 @@ pub enum RemoteSequencerError { LogletIdMismatch, #[error("Remote node is not a sequencer")] NotSequencer, - #[error("Sequencer shutdown")] - Shutdown, #[error("Unknown remote error: {message}")] Error { retryable: bool, message: String }, } @@ -441,7 +471,6 @@ impl MaybeRetryableError for RemoteSequencerError { Self::UnknownSegmentIndex => false, Self::LogletIdMismatch => false, Self::NotSequencer => false, - Self::Shutdown => false, Self::Error { retryable, .. } => *retryable, } } @@ -455,12 +484,14 @@ impl TryFrom for RemoteSequencerError { SequencerStatus::UnknownSegmentIndex => RemoteSequencerError::UnknownSegmentIndex, SequencerStatus::LogletIdMismatch => RemoteSequencerError::LogletIdMismatch, SequencerStatus::NotSequencer => RemoteSequencerError::NotSequencer, - SequencerStatus::Shutdown => RemoteSequencerError::Shutdown, SequencerStatus::Error { retryable, message } => { RemoteSequencerError::Error { retryable, message } } - SequencerStatus::Ok | SequencerStatus::Sealed => { - return Err("not a failure status"); + SequencerStatus::Ok + | SequencerStatus::Sealed + | SequencerStatus::Shutdown + | SequencerStatus::Gone => { + unreachable!("not a failure status") } }; diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index f492980bad..5364046e1a 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -15,11 +15,12 @@ use std::hash::{Hash, Hasher}; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; -use restate_types::replication::DecoratedNodeSet; -use tracing::warn; +use tracing::trace; use restate_types::locality::{LocationScope, NodeLocation}; +use restate_types::nodes_config::NodesConfigError; use restate_types::nodes_config::{NodesConfiguration, StorageState}; +use restate_types::replication::DecoratedNodeSet; use restate_types::replication::{NodeSet, ReplicationProperty}; use restate_types::Merge; use restate_types::PlainNodeId; @@ -108,13 +109,13 @@ impl FMajorityResult { /// /// The concept of "f-majority" can be defined as a set of nodes that intersects every possible /// legal write quorum. Nodes are assigned a [`StorageState`] that define their authoritative-ness. -/// An authoritative node is one that has not lost data and is not empty. 
Empty in this context -/// means that it's guaranteed that it has never received data and was never a participant in -/// previous writes. A node is considered non-authoritative if it's in [`StorageState::DataLoss`] -/// which means it has lost data and it cannot be considered a reliable participant in some quorum -/// checks. That said, dataloss doesn't include "corruption". It only means that if a node responds -/// negatively to a read request that we cannot confidently assume that the data was never written. -/// Conversely, if the node responds with a record/data, we can use this data safely. +/// An authoritative node is one that has not lost data. Empty nodes are those that are guaranteed +/// to have never received any writes and that will never participate in future writes. +/// A node is considered non-authoritative if it's in [`StorageState::DataLoss`] which means it +/// has lost data and it cannot be considered a reliable participant in some quorum checks. That +/// said, dataloss doesn't include "corruption". It only means that if a node responds negatively +/// to a read request that we cannot confidently assume that the data was never written. Conversely, +/// if the node responds with a record/data, we can use this data safely. pub struct NodeSetChecker { // This is a btree-map to leverage its great cache-locality on small sets like this one. // this could also be Box<[(PlainNodeId, LocationScopeState)]> but btreemap is nicer to @@ -123,8 +124,8 @@ pub struct NodeSetChecker { node_to_attr: HashMap, /// Mapping between node-id and its log-server storage state. Note that we keep all nodes even /// unreadable ones here because they might become readable after a nodes configuration - /// refresh. That said. Node-ids that have been deleted in nodes-configuration will not appear - /// here. + /// refresh. That said, node-ids that have been deleted from the nodes-configuration or that have + /// storage state `Disabled` will not appear here. node_to_storage_state: HashMap, } @@ -160,12 +161,24 @@ impl NodeSetChecker { }; for node_id in nodeset.iter() { - if let Ok(config) = nodes_config.find_node_by_id(*node_id) { - checker.add_node( + match nodes_config.find_node_by_id(*node_id) { + Ok(config) => checker.add_node( *node_id, config.log_server_config.storage_state, &config.location, - ); + ), + Err(NodesConfigError::Deleted(_)) => { + // deleted nodes are implicitly considered disabled (auth empty) + } + Err(NodesConfigError::UnknownNodeId(_)) => { + // Unknown nodes must also be considered as Provisioning because we might be + // operating on a nodeset that's out of sync with the nodes configuration + // (nodeset is newer). + checker.add_node(*node_id, StorageState::Provisioning, &NodeLocation::new()); + } + Err( + NodesConfigError::InvalidUri(_) | NodesConfigError::GenerationMismatch { .. }, + ) => unreachable!("impossible nodes-configuration errors"), } } @@ -259,7 +272,7 @@ impl NodeSetChecker { .get(&node_id) .copied() .unwrap_or(StorageState::Disabled); - if !storage_state.should_read_from() { + if storage_state.empty() { return; } let old_attribute = self.node_to_attr.insert(node_id, attribute.clone()); @@ -286,7 +299,7 @@ impl NodeSetChecker { .get(&node_id) .copied() .unwrap_or(StorageState::Disabled); - if !storage_state.should_read_from() { + if storage_state.empty() { return; } let existing = self.node_to_attr.get(&node_id); @@ -321,7 +334,7 @@ impl NodeSetChecker { self.node_to_attr.values().all(predicate) } - // Does any node matches the predicate?
+ /// Does any node match the predicate? pub fn filter( &self, predicate: impl Fn(&Attr) -> bool, @@ -367,7 +380,8 @@ impl NodeSetChecker { // we have an f-majority if at least one scope has f-majority let mut f_majority = false; let mut sufficient_auth_domains = false; - // `for (scope, scope_state) in self.scopes.iter()` if you need the scope for debugging. + // Use the following if you need the scope for debugging: + // for (scope, scope_state) in self.scopes.iter() { for scope_state in self.scopes.values() { let candidate_domains = u32::try_from(scope_state.failure_domains.len()) .expect("total domains must fit into u32"); @@ -411,11 +425,11 @@ impl NodeSetChecker { storage_state: StorageState, location: &NodeLocation, ) { - self.node_to_storage_state.insert(node_id, storage_state); // we only consider nodes that might have data, everything else is ignored. - if !storage_state.should_read_from() { + if storage_state.empty() { return; } + self.node_to_storage_state.insert(node_id, storage_state); for (scope, scope_state) in self.scopes.iter_mut() { let domain_name = if *scope == LocationScope::Node { @@ -426,7 +440,9 @@ impl NodeSetChecker { location.domain_string(*scope, None) } else { // node doesn't have location information at this scope level - warn!( + // this is trace because it can be very noisy if a single node in the cluster is in this + // state. We don't have a good way of avoiding log-spamming yet. + trace!( "Node {} doesn't have location information at location scope {:?} although replication is configured at this scope", node_id, scope ); @@ -556,7 +572,7 @@ impl Count { /// Increments the corresponding counters according to this storage state fn increment(&mut self, storage_state: StorageState) { - if storage_state.should_read_from() { + if storage_state.can_read_from() { self.num_readable_nodes += 1; } @@ -571,7 +587,7 @@ impl Count { return; } - if storage_state.should_read_from() { + if storage_state.can_read_from() { self.num_readable_nodes -= 1; } if storage_state.is_authoritative() { @@ -836,6 +852,51 @@ mod tests { Ok(()) } + #[test] + fn nodeset_checker_non_existent_nodes() -> Result<()> { + const LOCATION: &str = ""; + // all_authoritative, all flat structure (no location) + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // add 2 nodes to config N1, N2 + for i in 1..=2 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + LOCATION, + )); + } + + // NodeSet has 3 nodes N1, N2, N3. (N3 doesn't exist in config) + let nodeset: NodeSet = (1..=3).collect(); + let replication = ReplicationProperty::new(2.try_into().unwrap()); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + // all nodes in the nodeset are authoritative + assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(3)); + // all nodes are false by default. Can't establish write quorum. + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + + // Because N3 is not found, we should not consider it as auth empty; it might show + // up in a future nodes configuration update, so we *must* consider it as Provisioning (fully + // auth). This means that we must acquire 2 nodes to achieve f-majority.
+ checker.set_attribute_on_each([1], true); + assert_that!( + checker.check_fmajority(|attr| !(*attr)), + eq(FMajorityResult::None) + ); + + checker.set_attribute_on_each([1, 2], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // 2 nodes is also what we need to write quorum + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); + Ok(()) + } + #[test] fn nodeset_checker_mixed() -> Result<()> { const LOCATION: &str = ""; @@ -1083,7 +1144,7 @@ mod tests { // .az2 [N4, N5, N6] // region2 // .az1 [N7, N8, N9] - // - [N10] N11 - in Provisioning + // - [N10(Provisioning/Not In Config) N11(D)] - D = Disabled let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); // all authoritative // region1.az1 @@ -1110,13 +1171,12 @@ mod tests { "region2.az1", )); } - // N10 - nodes_config.upsert_node(generate_logserver_node(10, StorageState::ReadWrite, "")); - // N11 - nodes_config.upsert_node(generate_logserver_node(11, StorageState::Provisioning, "")); + // N10 - Provisioning - Not in config + // N11 - Disabled + nodes_config.upsert_node(generate_logserver_node(11, StorageState::Disabled, "")); // # Scenario 1 - // - Nodeset with all nodes = 10 nodes + // - Nodeset with all nodes = 11 nodes (N11 is effectively discounted), effective=10 // - Replication is {node: 3} let nodeset: NodeSet = (1..=11).collect(); let replication = ReplicationProperty::new(3.try_into().unwrap()); @@ -1126,21 +1186,22 @@ mod tests { assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(10)); // all nodes are false by default. Can't establish write quorum. assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); - // node 11 is provisioning, it won't fulfill our write quorum requirements. - checker.set_attribute_on_each([1, 2, 11], true); + // node 10 is provisioning, we can mark it but we shouldn't if we didn't actually write + // records to it. + checker.set_attribute_on_each([1, 2], true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); - // trying 3 instead + // having 3 will satisfy our quorum needs checker.set_attribute(3, true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); - // We need 8 nodes (anywhere) for f-majority, current [1,2,3] are marked. + // We need 8 authoritative non-empty nodes (anywhere) for f-majority, current [1,2,3] are marked. assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) ); - // total 7 nodes marked - checker.set_attribute_on_each([4, 5, 7, 8], true); + // total 8 nodes marked but 11 is empty/disabled, not enough. 
+ checker.set_attribute_on_each([4, 5, 7, 8, 11], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) @@ -1257,7 +1318,7 @@ mod tests { // .az2 [N4, N5(DL), N6] // region2 // .az1 [N7, N8, N9] - // - [N10] N11 - in Provisioning + // - [N10, N11(P)] - P = Provisioning let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); // region1.az1 nodes_config.upsert_node(generate_logserver_node( @@ -1310,7 +1371,7 @@ mod tests { NodeSetChecker::new(&nodeset, &nodes_config, &replication); // marked every possible node - checker.set_attribute_on_each([4, 6, 7, 8, 9, 10], true); + checker.set_attribute_on_each([4, 6, 7, 8, 9, 10, 11], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::BestEffort) @@ -1417,10 +1478,10 @@ mod tests { #[test] fn nodeset_checker_fd_empty_domains() -> Result<()> { - // total = 11 nodes across 3 regions. + // total = 12 nodes across 4 regions (effectively 3 regions, region4 is disabled) // replication = {region: 2, zone: 3} // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1487,7 +1548,7 @@ mod tests { // # Write-quorum checks // Can we write to N2, N5, N11? No. // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1503,7 +1564,7 @@ mod tests { // but adding any node from region2 should fix it checker.set_attribute_on_each([10], true); // region1 - // .az1 [N1(x), N2(D), N3(D), N4(D)] + // .az1 [N1(x), N2(x), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10(x)] @@ -1526,7 +1587,7 @@ mod tests { // In other words, have f-majority if we have one full zone, or any 2 non-empty regions. // // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1535,6 +1596,14 @@ mod tests { // region4 // .az1 [N12(D)] + checker.set_attribute_on_each([1, 2], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + // revert that. + checker.set_attribute_on_each([1, 2], false); + // N10 on region2.az1 is not sufficient since other nodes on the same zone are not marked checker.set_attribute_on_each([10], true); assert_that!( @@ -1549,26 +1618,27 @@ mod tests { eq(FMajorityResult::Success) ); - // Another more stringent example is region1.az1. Only N1 should be sufficient for + // Another more stringent example is region3.az1. Only N11 should be sufficient for // f-majority in this setup. checker.fill_with_default(); - checker.set_attribute_on_each([1], true); + checker.set_attribute_on_each([11], true); // Flexible quorums FTW. 
assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::Success) ); - // same for N11 + + // but not for N9 checker.fill_with_default(); - checker.set_attribute_on_each([11], true); + checker.set_attribute_on_each([9], true); assert_that!( checker.check_fmajority(|attr| *attr), - eq(FMajorityResult::Success) + eq(FMajorityResult::None) ); - // but not for N9 + // nor N12 because it's empty (not member of the nodeset) checker.fill_with_default(); - checker.set_attribute_on_each([9], true); + checker.set_attribute_on_each([12], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs index a2118637bb..3a163bc00d 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs @@ -17,7 +17,7 @@ use std::sync::{ use crossbeam_utils::CachePadded; use tokio::sync::Semaphore; -use tokio_util::task::TaskTracker; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{debug, instrument, trace}; use restate_core::{ @@ -111,6 +111,8 @@ pub struct Sequencer { record_permits: Arc, in_flight_appends: TaskTracker, record_cache: RecordCache, + /// this is the parent token for all appenders. + cancellation_token: CancellationToken, } impl Sequencer { @@ -175,6 +177,7 @@ impl Sequencer { record_cache, max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config), in_flight_appends: TaskTracker::default(), + cancellation_token: CancellationToken::default(), } } @@ -186,16 +189,17 @@ impl Sequencer { /// observed global_tail with is_sealed=true) /// /// This method is cancellation safe. - pub async fn drain(&self) -> Result<(), ShutdownError> { + pub async fn drain(&self) { // stop issuing new permits self.record_permits.close(); // required to allow in_flight.wait() to finish. self.in_flight_appends.close(); + self.cancellation_token.cancel(); // we are assuming here that seal has been already executed on majority of nodes. This is // important since in_flight.close() doesn't prevent new tasks from being spawned. if self.sequencer_shared_state.known_global_tail.is_sealed() { - return Ok(()); + return; } // wait for in-flight tasks to complete before returning @@ -210,8 +214,6 @@ impl Sequencer { loglet_id = %self.sequencer_shared_state.my_params.loglet_id, "Sequencer drained", ); - - Ok(()) } fn ensure_enough_permits(&self, required: usize) { @@ -288,7 +290,9 @@ impl Sequencer { commit_resolver, ); - let fut = self.in_flight_appends.track_future(appender.run()); + let fut = self + .in_flight_appends + .track_future(appender.run(self.cancellation_token.child_token())); // Why not managed tasks, because managed tasks are not designed to manage a potentially // very large number of tasks, they also require a lock acquistion on start and that might // be a contention point. 
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index 5cb4b76007..385d5668db 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -12,13 +12,15 @@ use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration}; use tokio::time::Instant; use tokio::{sync::OwnedSemaphorePermit, task::JoinSet}; +use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn}; use restate_core::{ - cancellation_token, network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect}, - ShutdownError, TaskCenterFutureExt, + TaskCenterFutureExt, }; +use restate_types::replicated_loglet::Spread; +use restate_types::retries::with_jitter; use restate_types::{ config::Configuration, live::Live, @@ -31,6 +33,7 @@ use restate_types::{ use super::{RecordsExt, SequencerSharedState}; use crate::providers::replicated_loglet::metric_definitions::BIFROST_SEQ_APPEND_DURATION; +use crate::providers::replicated_loglet::replication::spread_selector::SpreadSelectorError; use crate::{ loglet::{AppendError, LogletCommitResolver}, providers::replicated_loglet::{ @@ -46,10 +49,7 @@ const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000); const TONE_ESCALATION_THRESHOLD: usize = 5; enum State { - Wave { - // nodes that should be avoided by the spread selector - graylist: NodeSet, - }, + Wave, Backoff, Done, Sealed, @@ -72,6 +72,8 @@ pub(crate) struct SequencerAppender { permit: Option, commit_resolver: Option, configuration: Live, + // nodes that should be avoided by the spread selector + graylist: NodeSet, } impl SequencerAppender { @@ -109,6 +111,7 @@ impl SequencerAppender { permit: Some(permit), commit_resolver: Some(commit_resolver), configuration: Configuration::updateable(), + graylist: NodeSet::default(), } } @@ -123,14 +126,10 @@ impl SequencerAppender { otel.name="replicated_loglet::sequencer::appender: run" ) )] - pub async fn run(mut self) { + pub async fn run(mut self, cancellation_token: CancellationToken) { let start = Instant::now(); // initial wave has 0 replicated and 0 gray listed node - let mut state = State::Wave { - graylist: NodeSet::default(), - }; - - let cancellation = cancellation_token(); + let mut state = State::Wave; let retry_policy = self .configuration @@ -147,10 +146,20 @@ impl SequencerAppender { state = match state { // termination conditions State::Done | State::Cancelled | State::Sealed => break state, - State::Wave { graylist } => { + State::Wave => { self.current_wave += 1; - let Some(next_state) = cancellation - .run_until_cancelled(self.send_wave(graylist)) + // # Why is this cancellation safe? + // Because we don't await any futures inside the join_next() loop, we are + // confident that we cancel before resolving the commit token. + // We want to make sure we don't cancel _after_ updating the global offset and *then* report Cancelled. + // This is because we don't want appenders after our offset to make progress, + // thereby (potentially) dropping records in the writer prefix. Even if a store was + // fully replicated and we cancelled before updating the tail, that's an acceptable + // and safe result because we didn't acknowledge the append to the writer and from + // their perspective it has failed, and because the global tail was not moved, all + // appends after this one cannot move the global tail either.
+ let Some(next_state) = cancellation_token + .run_until_cancelled(self.send_wave()) .await else { break State::Cancelled; @@ -160,7 +169,9 @@ impl SequencerAppender { State::Backoff => { // since backoff can be None, or run out of iterations, // but appender should never give up we fall back to fixed backoff - let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME); + let delay = retry + .next() + .unwrap_or(with_jitter(DEFAULT_BACKOFF_TIME, 0.5)); if self.current_wave >= TONE_ESCALATION_THRESHOLD { warn!( wave = %self.current_wave, @@ -173,7 +184,7 @@ impl SequencerAppender { ); } - if cancellation + if cancellation_token .run_until_cancelled(tokio::time::sleep(delay)) .await .is_none() @@ -181,9 +192,7 @@ impl SequencerAppender { break State::Cancelled; }; - State::Wave { - graylist: NodeSet::default(), - } + State::Wave } } }; @@ -208,7 +217,9 @@ impl SequencerAppender { State::Cancelled => { trace!("Append cancelled"); if let Some(commit_resolver) = self.commit_resolver.take() { - commit_resolver.error(AppendError::Shutdown(ShutdownError)); + commit_resolver.error(AppendError::ReconfigurationNeeded( + "sequencer is draining".into(), + )); } } State::Sealed => { @@ -223,26 +234,55 @@ impl SequencerAppender { } } + fn reset_graylist(&mut self) { + self.graylist.clear(); + // add back the sealed nodes to the gray list, those will never be writeable again. + self.graylist.extend( + self.checker + .filter(|attr| attr.sealed) + .map(|(node_id, _)| *node_id), + ); + } + + fn generate_spread(&mut self) -> Result { + let rng = &mut rand::rng(); + let nodes_config = &self.networking.metadata().nodes_config_ref(); + match self + .sequencer_shared_state + .selector + .select(rng, nodes_config, &self.graylist) + { + Ok(spread) => Ok(spread), + Err(err) => { + trace!( + nodeset_status = %self.nodeset_status, + graylist = %self.graylist, + %err, + "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + ); + self.reset_graylist(); + self.sequencer_shared_state + .selector + .select(rng, nodes_config, &self.graylist) + } + } + } + #[instrument(skip_all, fields(wave = %self.current_wave))] - async fn send_wave(&mut self, mut graylist: NodeSet) -> State { + async fn send_wave(&mut self) -> State { // select the spread - let spread = match self.sequencer_shared_state.selector.select( - &mut rand::rng(), - &self.networking.metadata().nodes_config_ref(), - &graylist, - ) { + let spread = match self.generate_spread() { Ok(spread) => spread, - Err(_) => { - graylist.clear(); + Err(err) => { trace!( - %graylist, - "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + nodeset_status = %self.nodeset_status, + "Cannot select a spread: {err}" ); return State::Backoff; } }; - trace!(%graylist, %spread, "Sending append wave"); + trace!(graylist = %self.graylist, %spread, wave = %self.current_wave, nodeset_status = %self.nodeset_status, "Sending append wave"); let last_offset = self.records.last_offset(self.first_offset).unwrap(); // todo: should be exponential backoff @@ -265,20 +305,26 @@ impl SequencerAppender { continue; } } - store_tasks.spawn({ - let store_task = LogServerStoreTask { - node_id, - sequencer_shared_state: self.sequencer_shared_state.clone(), - networking: self.networking.clone(), - first_offset: self.first_offset, - records: self.records.clone(), - rpc_router: self.store_router.clone(), - store_timeout, - }; - async move { (node_id, store_task.run().await) }.in_current_tc() - }); + store_tasks + 
.build_task() + .name(&format!("store-to-{}", node_id)) + .spawn({ + let store_task = LogServerStoreTask { + node_id, + sequencer_shared_state: self.sequencer_shared_state.clone(), + networking: self.networking.clone(), + first_offset: self.first_offset, + records: self.records.clone(), + rpc_router: self.store_router.clone(), + store_timeout, + }; + async move { (node_id, store_task.run().await) }.in_current_tc() + }) + .unwrap(); } + // NOTE: It's very important to keep this loop cancellation safe. If the appender future + // was cancelled, we don't want to move the global commit offset. while let Some(store_result) = store_tasks.join_next().await { // unlikely to happen, but it's there for completeness if self.sequencer_shared_state.known_global_tail.is_sealed() { @@ -303,7 +349,7 @@ impl SequencerAppender { trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); } self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); - graylist.insert(node_id); + self.graylist.insert(node_id); continue; } StoreTaskStatus::Error(err) => { @@ -314,7 +360,7 @@ impl SequencerAppender { trace!(peer = %node_id, %err, "Failed to send batch to node"); } self.nodeset_status.merge(node_id, PerNodeStatus::failed()); - graylist.insert(node_id); + self.graylist.insert(node_id); continue; } StoreTaskStatus::Sealed => { @@ -343,20 +389,20 @@ impl SequencerAppender { Status::Sealed | Status::Sealing => { self.checker .set_attribute(node_id, NodeAttributes::sealed()); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::Dropped => { // Overloaded, or request expired debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding"); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::Disabled => { debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer's log-store is disabled"); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => { warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate"); - graylist.insert(node_id); + self.graylist.insert(node_id); } } @@ -377,10 +423,7 @@ impl SequencerAppender { if self.checker.check_fmajority(|attr| attr.sealed).passed() { State::Sealed } else { - // We couldn't achieve write quorum with this wave. We will try again, as fast as - // possible until the graylist eats up enough nodes such that we won't be able to - // generate node nodesets. Only then we backoff. 
- State::Wave { graylist } + State::Backoff } } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs index 23c70bc287..028c277421 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs @@ -20,7 +20,7 @@ use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams}; use crate::loglet::util::TailOffsetWatch; use crate::loglet::OperationError; -use crate::providers::replicated_loglet::replication::NodeSetChecker; +use crate::providers::replicated_loglet::replication::{FMajorityResult, NodeSetChecker}; use crate::providers::replicated_loglet::rpc_routers::LogServersRpc; use crate::providers::replicated_loglet::tasks::util::{Disposition, RunOnSingleNode}; @@ -31,8 +31,8 @@ struct GetTrimPointError; /// Find the trim point for a loglet /// /// The trim point doesn't require a quorum-check, the maximum observed trim-point on -/// log-servers can be used, but we wait for write-quorum (or) f-majority whichever happens first -/// before we respond to increase the chances of getting a reliable trim point. +/// log-servers can be used, but we wait for f-majority before we respond to increase +/// the chances of getting a reliable trim point. /// /// We don't provide a guarantee that `get_trim_point` return an always increasing offset, /// and it should not be used in gap-detection in read streams. In read-streams, the @@ -78,6 +78,14 @@ impl<'a> GetTrimPointTask<'a> { .nodeset .to_effective(&networking.metadata().nodes_config_ref()); + if effective_nodeset.is_empty() { + // all nodes are disabled, we can't determine the trim point but we know that this + // loglet is impossible to read. Control plane transitioning our nodeset into disabled + // must have trimmed all records. In all cases, it's safe to return the maximum + // possible trim point. + return Ok(Some(LogletOffset::MAX)); + } + trace!( loglet_id = %self.my_params.loglet_id, known_global_tail = %self.known_global_tail, @@ -137,11 +145,10 @@ impl<'a> GetTrimPointTask<'a> { continue; }; - // either f-majority or write-quorum is enough + // wait for f-majority, best effort is acceptable since it includes all authoritative + // nodes in the nodeset. 
nodeset_checker.set_attribute(node_id, Some(res)); - if nodeset_checker.check_write_quorum(predicate) - || nodeset_checker.check_fmajority(predicate).passed() - { + if nodeset_checker.check_fmajority(predicate) >= FMajorityResult::BestEffort { break; } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs index bf470ac065..5b878d99aa 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs @@ -319,9 +319,8 @@ impl RepairTail { info!( loglet_id = %self.my_params.loglet_id, known_global_tail = %self.known_global_tail.latest_offset(), - target_tail = %self.digests.target_tail(), elapsed = ?start.elapsed(), - "Repair task completed, {} records have been repaired", + "Repair task completed, {} record(s) have been repaired", self.digests.num_fixups(), ); return RepairTailResult::Completed; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs index 0eba4c8ac5..028f5cfe73 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs @@ -176,7 +176,7 @@ impl<'a> TrimTask<'a> { // Not enough nodes have successful responses warn!( loglet_id = %self.my_params.loglet_id, - trim_point = %trim_point, + trim_point = %trim_point, known_global_tail = %self.known_global_tail, effective_nodeset = %effective_nodeset, "Could not trim the loglet, since we could not confirm the new trim point with write-quorum nodes. Nodes that have confirmed are {}", diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 0537cdc74c..e775cb0dc3 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -638,8 +638,8 @@ where if let message::Body::ConnectionControl(ctrl_msg) = &body { // do something info!( - "Terminating connection based on signal from peer: {:?} {}", - ctrl_msg.signal(), + "Terminating connection based on signal from {}: {}", + connection.peer(), ctrl_msg.message ); if ctrl_msg.signal() == message::Signal::Shutdown { diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 4408d4dea8..84244797e2 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -385,22 +385,11 @@ impl LogletWorker { } if body.flags.contains(StoreFlags::IgnoreSeal) { - // We must be sealed (sanity check) - // Accept repair store only on sealed loglet. - if !self.loglet_state.is_sealed() { - warn!( - loglet_id = %self.loglet_id, - %peer, - first_offset = %body.first_offset, - "Ignoring repair store on unsealed loglet, repair should only happen on sealed loglets" - ); - return (Status::Malformed, None); - } trace!( loglet_id = %self.loglet_id, %peer, first_offset = %body.first_offset, - "Admitting a repair store for sealed loglet to restore replication" + "Admitting a repair store loglet to restore replication" ); } diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs index c95fe8d538..29018e02ef 100644 --- a/crates/types/src/config/admin.rs +++ b/crates/types/src/config/admin.rs @@ -49,10 +49,10 @@ pub struct AdminOptions { /// # Log trim interval /// /// Controls the interval at which cluster controller tries to trim the logs. 
Log trimming - /// can be disabled by setting it to "". - #[serde(with = "serde_with::As::>")] - #[cfg_attr(feature = "schemars", schemars(with = "Option"))] - pub log_trim_interval: Option, + /// can be disabled by setting it to "0s". + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + log_trim_interval: humantime::Duration, /// # Log trim threshold (deprecated) /// @@ -111,6 +111,14 @@ impl AdminOptions { #[cfg(any(test, feature = "test-util"))] return !self.disable_cluster_controller; } + + pub fn log_trim_interval(&self) -> Option { + if self.log_trim_interval.is_zero() { + None + } else { + Some(*self.log_trim_interval) + } + } } impl Default for AdminOptions { @@ -122,7 +130,7 @@ impl Default for AdminOptions { query_engine: Default::default(), heartbeat_interval: Duration::from_millis(1500).into(), // try to trim the log every hour - log_trim_interval: Some(Duration::from_secs(60 * 60).into()), + log_trim_interval: Duration::from_secs(60 * 60).into(), log_trim_threshold: None, default_partition_replication: PartitionReplication::default(), #[cfg(any(test, feature = "test-util"))] diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 2b29e7e0b5..fefaf6295f 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -275,10 +275,10 @@ pub struct StorageOptions { /// # Persist lsn interval /// /// Controls the interval at which worker tries to persist the last applied lsn. Lsn persisting - /// can be disabled by setting it to "". - #[serde(with = "serde_with::As::>")] - #[cfg_attr(feature = "schemars", schemars(with = "Option"))] - pub persist_lsn_interval: Option, + /// can be disabled by setting it to "0s". + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + persist_lsn_interval: humantime::Duration, /// # Persist lsn threshold /// @@ -341,6 +341,14 @@ impl StorageOptions { pub fn snapshots_staging_dir(&self) -> PathBuf { super::data_dir("pp-snapshots") } + + pub fn persist_lsn_interval(&self) -> Option { + if self.persist_lsn_interval.is_zero() { + None + } else { + Some(*self.persist_lsn_interval) + } + } } impl Default for StorageOptions { @@ -357,7 +365,7 @@ impl Default for StorageOptions { rocksdb_memory_budget: None, rocksdb_memory_ratio: 0.49, // persist the lsn every hour - persist_lsn_interval: Some(Duration::from_secs(60 * 60).into()), + persist_lsn_interval: Duration::from_secs(60 * 60).into(), persist_lsn_threshold: 1000, always_commit_in_background: false, } diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index 35e637270f..68a15d3ddb 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -44,6 +44,8 @@ pub enum SequencerStatus { /// Sealed is returned when the sequencer cannot accept more /// [`Append`] requests because it's sealed Sealed, + /// Local sequencer is not available anymore, reconfiguration is needed + Gone, /// LogletID does not match Segment LogletIdMismatch, /// Invalid LogId diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 47560441cb..b727c30001 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -215,7 +215,7 @@ impl NodesConfiguration { pub fn get_log_server_storage_state(&self, node_id: &PlainNodeId) -> StorageState { let maybe = self.nodes.get(node_id); let Some(maybe) = maybe else { - return 
StorageState::Disabled; + return StorageState::Provisioning; }; match maybe { MaybeNode::Tombstone => StorageState::Disabled, @@ -325,10 +325,21 @@ pub enum StorageState { /// /// The node can never transition back to `Provisioning` once it has transitioned out of it. /// - /// should read from: no - /// can write to: no + /// [authoritative] + /// For all intents and purposes, this is equivalent to a `ReadOnly` state, except that it's + /// excluded from nodeset generation. The difference + /// between this and `ReadOnly` is that if a node is in Provisioning state, we are confident + /// that it has not been added to nodesets and we can safely remove it from the cluster without + /// checking the log-chain or trim points of loglets. Note that if this node happens to be in a + /// nodeset (although the control plane shouldn't add it), spread selectors will not write any data + /// to it until they observe a state transition to ReadWrite. This behaviour matches `ReadOnly` + /// state. + /// + /// can read from: yes (likely to not have data, but it might if it transitioned into RW) + /// can write to: yes - but excluded from new nodesets. If you see it in a nodeset, try writing to it. #[default] Provisioning, + /// [authoritative empty] /// Node's storage is not expected to be accessed in reads nor write. The node is not /// considered as part of the replicated log cluster. Node can be safely decommissioned. /// /// should read from: no /// can write to: no Disabled, + /// [authoritative] /// Node is not picked in new write sets, but it may still accept writes on existing nodeset /// and it's included in critical metadata updates (seal, release, etc.) /// should read from: yes /// **should write to: no** /// **excluded from new nodesets** ReadOnly, + /// [authoritative] /// Can be picked up in new write sets and accepts writes in existing write sets. /// /// should read from: yes /// can write to: yes ReadWrite, + /// **[non-authoritative]** /// Node detected that some/all of its local storage has been deleted and it cannot be used /// as authoritative source for quorum-dependent queries. Some data might have permanently been - /// lost. + /// lost. It behaves like ReadOnly in spread selectors, but participates unauthoritatively in + /// f-majority checks. This node can transition back to ReadWrite if it has been repaired.
/// /// should read from: yes (non-quorum reads) /// can write to: no @@ -367,19 +382,20 @@ impl StorageState { } } - pub fn should_read_from(&self) -> bool { + // Nodes that may have data or might become readable in the future + pub fn can_read_from(&self) -> bool { use StorageState::*; match self { - ReadOnly | ReadWrite | DataLoss => true, - Provisioning | Disabled => false, + Provisioning | ReadOnly | ReadWrite | DataLoss => true, + Disabled => false, } } pub fn is_authoritative(&self) -> bool { use StorageState::*; match self { - Provisioning | Disabled | DataLoss => false, - ReadOnly | ReadWrite => true, + DataLoss => false, + Disabled | Provisioning | ReadOnly | ReadWrite => true, } } diff --git a/crates/worker/src/partition_processor_manager/persisted_lsn_watchdog.rs b/crates/worker/src/partition_processor_manager/persisted_lsn_watchdog.rs index f976272321..b684085c4f 100644 --- a/crates/worker/src/partition_processor_manager/persisted_lsn_watchdog.rs +++ b/crates/worker/src/partition_processor_manager/persisted_lsn_watchdog.rs @@ -8,7 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeMap; + use futures::future::OptionFuture; +use tokio::sync::watch; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, warn}; + use restate_core::cancellation_watcher; use restate_partition_store::PartitionStoreManager; use restate_storage_api::fsm_table::ReadOnlyFsmTable; @@ -17,11 +24,6 @@ use restate_types::config::{Configuration, StorageOptions}; use restate_types::identifiers::PartitionId; use restate_types::live::LiveLoad; use restate_types::logs::{Lsn, SequenceNumber}; -use std::collections::BTreeMap; -use tokio::sync::watch; -use tokio::time; -use tokio::time::MissedTickBehavior; -use tracing::{debug, trace, warn}; /// Monitors the persisted log lsns and notifies the partition processor manager about it. The /// current approach requires flushing the memtables to make sure that data has been persisted. @@ -58,15 +60,16 @@ impl PersistedLogLsnWatchdog { } fn create_persist_lsn(options: &StorageOptions) -> (Option, Lsn) { - let persist_lsn_interval = options.persist_lsn_interval.map(|duration| { - let mut interval = time::interval(duration.into()); + let persist_lsn_interval = options.persist_lsn_interval().map(|duration| { + let mut interval = time::interval(duration); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); interval }); - let persist_lsn_threshold = Lsn::from(options.persist_lsn_threshold); - - (persist_lsn_interval, persist_lsn_threshold) + ( + persist_lsn_interval, + Lsn::from(options.persist_lsn_threshold), + ) } pub async fn run(mut self) -> anyhow::Result<()> { @@ -80,10 +83,8 @@ impl PersistedLogLsnWatchdog { _ = &mut shutdown => { break; }, - _ = OptionFuture::from(self.persist_lsn_interval.as_mut().map(|interval| interval.tick())) => { - let result = self.update_persisted_lsns().await; - - if let Err(err) = result { + Some(_) = OptionFuture::from(self.persist_lsn_interval.as_mut().map(|interval| interval.tick())) => { + if let Err(err) = self.update_persisted_lsns().await { warn!("Failed updating the persisted applied lsns. 
This might prevent the log from being trimmed: {err}"); } } @@ -129,7 +130,7 @@ impl PersistedLogLsnWatchdog { if applied_lsn >= previously_applied_lsn + self.persist_lsn_threshold { // since we cannot be sure that we have read the applied lsn from disk, we need // to flush the memtables to be sure that it is persisted - trace!( + info!( partition_id = %partition_id, applied_lsn = %applied_lsn, "Flush partition store to persist applied lsn" @@ -223,9 +224,8 @@ mod tests { watch_rx.changed().await?; assert_eq!(watch_rx.borrow().get(&PartitionId::MIN), Some(&lsn)); let persist_lsn_interval: Duration = storage_options - .persist_lsn_interval - .expect("should be enabled") - .into(); + .persist_lsn_interval() + .expect("should be enabled"); assert!(now.elapsed() >= persist_lsn_interval); // we are short by one to hit the persist lsn threshold diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index daceda694e..017c115ea3 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -128,8 +128,8 @@ where loglet_id: LogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), sequencer, replication, - // node 1 is the metadata, 2..=count+1 are logservers - nodeset: (2..=log_server_count + 1).collect(), + // all nodes are log-servers + nodeset: (1..=log_server_count).collect(), }; let loglet_params = loglet_params.serialize()?; diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 38cc5e21ba..30e238ffca 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -22,9 +22,11 @@ mod tests { use futures_util::StreamExt; use googletest::prelude::*; use restate_bifrost::{loglet::AppendError, ErrorRecoveryStrategy}; - use restate_core::{cancellation_token, Metadata, TaskCenterFutureExt}; use test_log::test; + use tokio::task::{JoinHandle, JoinSet}; + use tokio_util::sync::CancellationToken; + use restate_core::{Metadata, TaskCenterFutureExt}; use restate_types::{ config::Configuration, logs::{ @@ -37,8 +39,6 @@ mod tests { time::NanosSinceEpoch, GenerationalNodeId, Version, }; - use tokio::task::{JoinHandle, JoinSet}; - use tokio_util::sync::CancellationToken; use super::common::replicated_loglet::run_in_test_env; @@ -209,7 +209,7 @@ mod tests { async fn bifrost_append_and_seal_concurrent() -> googletest::Result<()> { const TEST_DURATION: Duration = Duration::from_secs(10); const SEAL_PERIOD: Duration = Duration::from_secs(1); - const CONCURRENT_APPENDERS: usize = 20; + const CONCURRENT_APPENDERS: usize = 400; run_in_test_env( Configuration::default(), @@ -250,15 +250,13 @@ mod tests { let mut sealer_handle: JoinHandle> = tokio::task::spawn({ let bifrost = test_env.bifrost.clone(); - async move { - let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); let mut last_loglet_id = None; - while !cancellation_token.is_cancelled() { + loop { tokio::time::sleep(SEAL_PERIOD).await; let mut params = ReplicatedLogletParams::deserialize_from( @@ -282,7 +280,6 @@ mod tests { ) .await?; } - Ok(()) }.in_current_tc() }); diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 08d11a90fc..c091c45d8c 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -8,26 +8,27 @@ // the Business Source License, use of 
this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; +use std::collections::HashMap; + use cling::prelude::*; -use itertools::Itertools; use tonic::codec::CompressionEncoding; use tracing::{info, warn}; use restate_bifrost::loglet::util::TailOffsetWatch; use restate_bifrost::providers::replicated_loglet::replication::NodeSetChecker; use restate_cli_util::_comfy_table::{Cell, Color, Table}; +use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_cli_util::{c_eprintln, c_println}; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; -use restate_log_server::protobuf::GetDigestRequest; +use restate_log_server::protobuf::{GetDigestRequest, GetLogletInfoRequest}; use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::net::log_server::RecordStatus; use restate_types::nodes_config::Role; use restate_types::replicated_loglet::LogNodeSetExt; -use restate_types::Versioned; +use restate_types::PlainNodeId; use crate::commands::replicated_loglet::digest_util::DigestsHelper; +use crate::commands::replicated_loglet::info::gen_loglet_info_table; use crate::connection::ConnectionInfo; use crate::util::grpc_channel; @@ -37,77 +38,116 @@ pub struct DigestOpts { /// The replicated loglet id loglet_id: LogletId, - /// From offset. Requests from oldest if unset. - #[arg(long, default_value = "1")] - from: u32, - /// to offset + /// From offset (inclusive) + #[arg(long)] + from: Option, + /// to offset (inclusive) #[arg(long)] - to: u32, + to: Option, + /// Only print under-replicated offsets + #[arg(long, short)] + under_replicated_only: bool, } async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::Result<()> { let logs = connection.get_logs().await?; - c_println!("Log Configuration ({})", logs.version()); let Some(loglet_ref) = logs.get_replicated_loglet(&opts.loglet_id) else { return Err(anyhow::anyhow!("loglet {} not found", opts.loglet_id)); }; - let nodes_config = connection.get_nodes_configuration().await?; - c_println!("Nodes Configuration ({})", nodes_config.version()); - c_println!(); - let nodeset = loglet_ref.params.nodeset.to_effective(&nodes_config); - c_println!("Loglet Id: {}", opts.loglet_id); - c_println!("Nodeset: {nodeset}"); - c_println!("Replication: {}", loglet_ref.params.replication); + let mut nodeset = loglet_ref.params.nodeset.to_effective(&nodes_config); + nodeset.sort(); let known_global_tail = TailOffsetWatch::new(TailState::new(false, LogletOffset::INVALID)); - let mut digests = DigestsHelper::new(&loglet_ref.params, opts.from.into(), opts.to.into()); - for node_id in nodeset.iter() { - let node = nodes_config.find_node_by_id(*node_id).with_context(|| { - format!("Node {node_id} doesn't seem to exist in nodes configuration") - })?; - info!( - "Requesting digest from node {} at {}", - node_id, node.address - ); - if !node.has_role(Role::LogServer) { - warn!( - "Node {} is not running the log-server role, will not connect to it", - node_id + let nodeset_channels: HashMap = nodeset + .iter() + .copied() + .filter_map(|node_id| { + let node = nodes_config.find_node_by_id(node_id).unwrap_or_else(|_| { + panic!("Node {node_id} doesn't seem to exist in nodes configuration"); + }); + info!( + "Requesting digest from node {} at {}", + node_id, node.address ); + if !node.has_role(Role::LogServer) { + warn!( + "Node {} is not running the log-server role, will not connect to it", + node_id + ); + return None; + } + 
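+ // Keep the channel keyed by node id so the loglet-info and digest requests below can reuse it.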
Some((node_id, grpc_channel(node.address.clone()))) + }) + .collect(); + + // get loglet info + let mut loglet_infos: HashMap = HashMap::default(); + for (node_id, channel) in nodeset_channels.iter() { + let mut client = LogServerSvcClient::new(channel.clone()); + let Ok(Some(loglet_info)) = client + .get_loglet_info(GetLogletInfoRequest { + loglet_id: opts.loglet_id.into(), + }) + .await + .map(|resp| resp.into_inner().info) + else { continue; - } + }; + loglet_infos.insert(*node_id, loglet_info); + } + // we want to request data that's within trim points -> max-tail to avoid blowing up Digest's + // internal btree + let min_trim_point = loglet_infos + .values() + .map(|info| info.trim_point) + .min() + .unwrap_or(0); + // clamp from_offset at next point after the smallest trim point + let from_offset = (min_trim_point + 1).max(opts.from.unwrap_or(1)); + let Some(max_local_tail) = loglet_infos + .values() + .map(|info| info.header.unwrap().local_tail) + .max() + else { + return Err(anyhow::anyhow!( + "Couldn't determine local-tail of any node in the nodeset" + )); + }; + // clamp to-offset to max-local-tail - 1; + let to_offset = max_local_tail + .saturating_sub(1) + .min(opts.to.unwrap_or(u32::MAX - 1)); - let channel = grpc_channel(node.address.clone()); + // digests + let mut digests = DigestsHelper::new( + &loglet_ref.params, + from_offset.into(), + // target tail is one offset after the inclusive to_offset arg. + (to_offset + 1).into(), + ); + for (node_id, channel) in nodeset_channels.iter() { let mut client = - LogServerSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + LogServerSvcClient::new(channel.clone()).accept_compressed(CompressionEncoding::Gzip); let req = GetDigestRequest { loglet_id: opts.loglet_id.into(), - from_offset: opts.from, - to_offset: opts.to, + from_offset, + to_offset, }; let digest = match client.get_digest(req).await { Ok(response) => response.into_inner().digest.expect("always set by servers"), Err(err) => { - c_eprintln!("Couldn't get digest from {}: {}", node_id, err); + warn!("Couldn't get digest from {}: {}", node_id, err); continue; } }; digests.add_digest_message(*node_id, digest.into(), &known_global_tail); } - c_println!( - "Max Returned Global Tail: {}", - *known_global_tail.get().offset() - ); - c_println!("Seal Observed?: {}", known_global_tail.get().is_sealed()); - c_println!("Max Observed Trim Point: {}", digests.max_trim_point()); - c_println!("Max Local Tail: {}", digests.max_local_tail()); - c_println!(); let mut records_table = Table::new_styled(); - let node_ids = nodeset.iter().sorted().map(|n| { + let node_ids = nodeset.iter().map(|n| { match digests.is_sealed(n) { Some(true) => Cell::new(format!("{n}(S)")).fg(Color::Magenta), Some(false) => Cell::new(n.to_string()), @@ -120,7 +160,9 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R heading.extend(vec![Cell::new("ISSUES")]); records_table.set_header(heading); + let mut checker = NodeSetChecker::new(&nodeset, &nodes_config, &loglet_ref.params.replication); for (offset, responses) in digests.iter() { + checker.fill_with_default(); if *offset >= digests.max_local_tail() { break; } @@ -128,11 +170,9 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R // divider to indicate that everything after global tail records_table.add_row(std::iter::repeat("────").take(nodeset.len() + 2)); } - let mut checker = - NodeSetChecker::new(&nodeset, &nodes_config, &loglet_ref.params.replication); let mut status_row = 
Vec::with_capacity(nodeset.len() + 2); status_row.push(Cell::new(offset.to_string())); - for node in nodeset.iter().sorted() { + for node in nodeset.iter() { if let Some(status) = responses.get(node) { status_row.push(Cell::new(status.to_string())); if let RecordStatus::Exists = status { @@ -149,9 +189,65 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R if !checker.check_write_quorum(|t| *t) { // record is under-replicated status_row.push(Cell::new("U").fg(Color::Red)); + records_table.add_row(status_row); + } else if !opts.under_replicated_only { + records_table.add_row(status_row); } - records_table.add_row(status_row); } + // empty separator + records_table.add_row(vec![""]); + records_table.add_row(std::iter::repeat("═════════").take(nodeset.len() + 1)); + // append the node-level info at the end + { + let mut row = Vec::with_capacity(nodeset.len() + 2); + row.push(Cell::new("LOCAL TAIL")); + for node in nodeset.iter() { + if let Some(header) = digests.get_response_header(node) { + let color = if header.sealed { + Color::Magenta + } else { + Color::Reset + }; + row.push(Cell::new(header.local_tail.to_string()).fg(color)); + } else { + row.push(Cell::new("?").fg(Color::Red)); + } + } + records_table.add_row(row); + } + { + let mut row = Vec::with_capacity(nodeset.len() + 2); + row.push(Cell::new("GLOBAL TAIL")); + for node in nodeset.iter() { + if let Some(header) = digests.get_response_header(node) { + row.push(Cell::new(header.known_global_tail.to_string())); + } else { + row.push(Cell::new("?").fg(Color::Red)); + } + } + records_table.add_row(row); + } + { + let mut row = Vec::with_capacity(nodeset.len() + 2); + row.push(Cell::new("TRIM POINT")); + for node in nodeset.iter() { + if let Some(header) = loglet_infos.get(node) { + row.push(Cell::new(header.trim_point.to_string())); + } else { + row.push(Cell::new("?").fg(Color::Red)); + } + } + records_table.add_row(row); + } + c_println!("{}", records_table); + c_println!(); + + let mut info_table = gen_loglet_info_table(&logs, loglet_ref, &nodes_config); + info_table.add_kv_row( + "Observed Global Tail:", + known_global_tail.latest_offset().to_string(), + ); + c_println!("{}", info_table); Ok(()) } diff --git a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs index 5e1da75e35..c32833bb06 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs @@ -95,14 +95,14 @@ impl DigestsHelper { self.known_nodes.contains_key(node_id) } - pub fn max_trim_point(&self) -> LogletOffset { - self.max_trim_point - } - pub fn max_local_tail(&self) -> LogletOffset { self.max_local_tail } + pub fn get_response_header(&self, node_id: &PlainNodeId) -> Option { + self.known_nodes.get(node_id).cloned() + } + pub fn iter( &self, ) -> impl Iterator)> { diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 12c0fb011b..ff448312e2 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -8,13 +8,25 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
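// A self-contained sketch of the per-offset under-replication check that the digest command
// above performs with NodeSetChecker: for each offset, count the nodes that report the record
// as existing and test whether those copies satisfy the write quorum; offsets that fail are
// flagged (and, with --under-replicated-only, are the only rows printed). `SimpleQuorumChecker`
// is a hypothetical count-based stand-in, not restate's real replication-property checker.
use std::collections::{BTreeMap, BTreeSet};

type NodeId = u32;
type Offset = u64;

struct SimpleQuorumChecker {
    nodeset_size: usize,
    // minimum number of copies required by the (assumed count-based) replication property
    required_copies: usize,
}

impl SimpleQuorumChecker {
    // A record meets write quorum if enough nodes in the nodeset hold a copy.
    fn check_write_quorum(&self, copies: usize) -> bool {
        copies >= self.required_copies.min(self.nodeset_size)
    }
}

fn under_replicated_offsets(
    // per-offset set of nodes that reported the record as existing
    digests: &BTreeMap<Offset, BTreeSet<NodeId>>,
    checker: &SimpleQuorumChecker,
) -> Vec<Offset> {
    digests
        .iter()
        .filter(|(_, holders)| !checker.check_write_quorum(holders.len()))
        .map(|(offset, _)| *offset)
        .collect()
}

fn main() {
    let checker = SimpleQuorumChecker { nodeset_size: 3, required_copies: 2 };
    let mut digests: BTreeMap<Offset, BTreeSet<NodeId>> = BTreeMap::new();
    digests.insert(10, BTreeSet::from([1, 2])); // two copies -> meets write quorum
    digests.insert(11, BTreeSet::from([3])); // single copy -> under-replicated
    assert_eq!(under_replicated_offsets(&digests, &checker), vec![11]);
}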
+use std::collections::HashMap; + use cling::prelude::*; +use tracing::warn; -use restate_cli_util::{c_indentln, c_println}; +use restate_cli_util::_comfy_table::{Attribute, Cell, Color, Table}; +use restate_cli_util::c_println; +use restate_cli_util::ui::console::{Styled, StyledTable}; +use restate_cli_util::ui::stylesheet::Style; +use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; +use restate_log_server::protobuf::GetLogletInfoRequest; +use restate_types::logs::metadata::{LogletRef, Logs}; use restate_types::logs::LogletId; -use restate_types::Versioned; +use restate_types::nodes_config::{NodesConfigError, NodesConfiguration, Role}; +use restate_types::replicated_loglet::ReplicatedLogletParams; +use restate_types::PlainNodeId; use crate::connection::ConnectionInfo; +use crate::util::grpc_channel; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "get_info")] @@ -23,29 +35,138 @@ pub struct InfoOpts { loglet_id: LogletId, } +pub(super) fn gen_loglet_info_table( + logs: &Logs, + loglet: &LogletRef, + nodes_config: &NodesConfiguration, +) -> Table { + let references = loglet.references.iter().map(|(log_id, segment)| { + let is_sealed_segment = logs + .chain(log_id) + .map(|chain| chain.tail_index() > *segment) + .unwrap_or(false); + + let note = if is_sealed_segment { + Styled(Style::Success, "(SEGMENT IS SEALED)") + } else { + Styled(Style::Normal, "") + }; + format!("LogId={} at Segment={} {}", log_id, segment, note) + }); + let sequencer = loglet.params.sequencer; + let sequencer_note_style = Style::Warn; + let sequencer_note = match nodes_config.find_node_by_id(sequencer.as_plain()) { + Ok(config) if config.current_generation.is_newer_than(sequencer) => Styled( + sequencer_note_style, + format!( + "(gone; current generation is {})", + config.current_generation + ), + ), + Ok(config) if config.current_generation.is_same_but_different(&sequencer) => Styled( + sequencer_note_style, + format!( + "(newer than nodes config, config has {})", + config.current_generation + ), + ), + // node is the same generation as config + Ok(_) => Styled(Style::Normal, String::default()), + Err(err @ NodesConfigError::Deleted(_)) => { + Styled(sequencer_note_style, format!("(gone; {err})")) + } + Err(err) => Styled(sequencer_note_style, format!("({err})")), + }; + + let mut table = Table::new_styled(); + table.add_kv_row( + "Loglet Id:", + format!( + "{} (raw={})", + loglet.params.loglet_id, *loglet.params.loglet_id + ), + ); + table.add_kv_row("Used in:", itertools::join(references, "\n")); + table.add_kv_row( + "Sequencer:", + Cell::new(format!("{sequencer} {sequencer_note}")), + ); + table.add_kv_row("Replication:", loglet.params.replication.clone()); + table.add_kv_row("Nodeset:", format!("{:#}", loglet.params.nodeset)); + table +} + async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Result<()> { let logs = connection.get_logs().await?; - c_println!("Log Configuration ({})", logs.version()); - - let Some(loglet_ref) = logs.get_replicated_loglet(&opts.loglet_id) else { + let Some(loglet) = logs.get_replicated_loglet(&opts.loglet_id) else { return Err(anyhow::anyhow!("loglet {} not found", opts.loglet_id)); }; - c_println!("Loglet Referenced in: "); - for (log_id, segment) in &loglet_ref.references { - c_indentln!(2, "- LogId={}, Segment={}", log_id, segment); - } + let nodes_config = connection.get_nodes_configuration().await?; + let mut nodeset = loglet.params.nodeset.clone(); + // display nodes sorted + nodeset.sort(); + + c_println!("{}", 
gen_loglet_info_table(&logs, loglet, &nodes_config)); c_println!(); - c_println!("Loglet Parameters:"); - c_indentln!( - 2, - "Loglet Id: {} ({})", - loglet_ref.params.loglet_id, - *loglet_ref.params.loglet_id - ); - c_indentln!(2, "Sequencer: {}", loglet_ref.params.sequencer); - c_indentln!(2, "Replication: {}", loglet_ref.params.replication); - c_indentln!(2, "Node Set: {}", loglet_ref.params.nodeset); + + let mut loglet_infos: HashMap = HashMap::default(); + for node_id in nodeset.iter().copied() { + let node = nodes_config.find_node_by_id(node_id).unwrap_or_else(|_| { + panic!("Node {node_id} doesn't seem to exist in nodes configuration"); + }); + if !node.has_role(Role::LogServer) { + warn!( + "Node {} is not running the log-server role, will not connect to it", + node_id + ); + continue; + } + let mut client = LogServerSvcClient::new(grpc_channel(node.address.clone())); + let Ok(Some(loglet_info)) = client + .get_loglet_info(GetLogletInfoRequest { + loglet_id: opts.loglet_id.into(), + }) + .await + .map(|resp| resp.into_inner().info) + else { + continue; + }; + loglet_infos.insert(node_id, loglet_info); + } + + let mut info_table = Table::new_styled(); + info_table.set_styled_header(vec![ + "", + "LOCAL TAIL", + "GLOBAL TAIL", + "TRIM POINT", + "SEALED", + ]); + + for node_id in nodeset.iter() { + let mut row = Vec::with_capacity(5); + row.push(Cell::new(node_id.to_string()).add_attribute(Attribute::Bold)); + if let Some(info) = loglet_infos.get(node_id) { + let header = info.header.unwrap(); + row.push(Cell::new(header.local_tail.to_string())); + row.push(Cell::new(header.known_global_tail.to_string())); + row.push(Cell::new(info.trim_point.to_string())); + row.push(if header.sealed { + Cell::new("YES").fg(Color::Magenta) + } else { + Cell::new("NO") + }); + } else { + row.push(Cell::new("?").fg(Color::Red)); + row.push(Cell::new("?").fg(Color::Red)); + row.push(Cell::new("?").fg(Color::Red)); + row.push(Cell::new("?").fg(Color::Red)); + } + info_table.add_row(row); + } + + c_println!("{}", info_table); Ok(()) }
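// A standalone sketch, with hypothetical simplified types, of the sequencer annotation logic in
// gen_loglet_info_table above: the sequencer recorded in the loglet params is compared against
// the node's current generation from the nodes configuration, and a warning note is produced
// when the recorded generation is stale, ahead of the configuration, or the node is gone. The
// struct and method bodies here are illustrative assumptions, not restate's real API.
#[derive(Clone, Copy)]
struct GenerationalNodeId {
    id: u32,
    generation: u32,
}

impl GenerationalNodeId {
    fn is_newer_than(self, other: GenerationalNodeId) -> bool {
        self.id == other.id && self.generation > other.generation
    }
    fn is_same_but_different(self, other: GenerationalNodeId) -> bool {
        self.id == other.id && self.generation != other.generation
    }
}

// Produce the note shown next to the sequencer in the info table.
fn sequencer_note(sequencer: GenerationalNodeId, in_config: Option<GenerationalNodeId>) -> String {
    match in_config {
        // The node restarted since the loglet was configured: the recorded sequencer is gone.
        Some(current) if current.is_newer_than(sequencer) => {
            format!("(gone; current generation is N{}:{})", current.id, current.generation)
        }
        // The recorded generation is ahead of what the nodes configuration knows about.
        Some(current) if current.is_same_but_different(sequencer) => {
            format!("(newer than nodes config, config has N{}:{})", current.id, current.generation)
        }
        Some(_) => String::new(),
        None => "(gone; node was deleted from the nodes configuration)".to_string(),
    }
}

fn main() {
    let sequencer = GenerationalNodeId { id: 1, generation: 3 };
    let current = GenerationalNodeId { id: 1, generation: 5 };
    assert!(sequencer_note(sequencer, Some(current)).starts_with("(gone"));
}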