From ecf966a6cbe51920905742fc044967550aafff2c Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Sun, 9 Feb 2025 09:43:35 +0000
Subject: [PATCH] Fix treatment of not-found nodes and update provisioning semantics

This fixes the mishandling of nodes that are deleted or unknown in the nodes
configuration during f-majority checks. The integration tests were
misconfigured: the nodeset was [N2..N4] while N4 didn't actually exist in the
config. In that case we should not accept an f-majority seal when only one
node is sealed (replication=2).

Although this bug wouldn't impact us immediately, it's best to fix this
condition, and I took it as an opportunity to update the semantics of the
provisioning state to match the latest design direction.

Documentation has also been updated to reflect the correct semantics.

Summary:
- Nodes observed in the node-set but not in the nodes-config are treated as
  "provisioning" rather than "disabled"
- Nodes that are "deleted" in the config (a tombstone exists) are treated as
  "disabled"
- Nodes in "provisioning" are fully authoritative, but are automatically
  excluded from new nodesets (already filtered out by the candidacy filter in
  the nodeset selector)
- If provisioning nodes were added to a nodeset anyway, they are treated as
  fully authoritative and are required to participate in f-majority (see the
  illustrative sketch appended at the end of this patch)
---
 .../read_path/read_stream_task.rs             |   2 +-
 .../replicated_loglet/replication/checker.rs  | 165 +++++++++++++-----
 crates/types/src/nodes_config.rs              |  34 +++-
 server/tests/common/replicated_loglet.rs      |   4 +-
 4 files changed, 146 insertions(+), 59 deletions(-)

diff --git a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs
index 2d65ea872..3bd870f2b 100644
--- a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs
@@ -545,7 +545,7 @@ impl ReadStreamTask {
                         loglet_id = %self.my_params.loglet_id,
                         from_offset = %self.read_pointer,
                         %to_offset,
-                        ?e,
+                        %e,
                         "Could not request record batch from node {}", server
                     );
                     Ok(ServerReadResult::Skip)
diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
index f492980ba..8e3d235dc 100644
--- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
@@ -15,11 +15,12 @@ use std::hash::{Hash, Hasher};
 
 use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
 use itertools::Itertools;
-use restate_types::replication::DecoratedNodeSet;
-use tracing::warn;
+use tracing::trace;
 
 use restate_types::locality::{LocationScope, NodeLocation};
+use restate_types::nodes_config::NodesConfigError;
 use restate_types::nodes_config::{NodesConfiguration, StorageState};
+use restate_types::replication::DecoratedNodeSet;
 use restate_types::replication::{NodeSet, ReplicationProperty};
 use restate_types::Merge;
 use restate_types::PlainNodeId;
@@ -110,11 +111,12 @@ impl FMajorityResult {
 /// legal write quorum. Nodes are assigned a [`StorageState`] that define their authoritative-ness.
 /// An authoritative node is one that has not lost data and is not empty. Empty in this context
 /// means that it's guaranteed that it has never received data and was never a participant in
-/// previous writes. 
A node is considered non-authoritative if it's in [`StorageState::DataLoss`]
-/// which means it has lost data and it cannot be considered a reliable participant in some quorum
-/// checks. That said, dataloss doesn't include "corruption". It only means that if a node responds
-/// negatively to a read request that we cannot confidently assume that the data was never written.
-/// Conversely, if the node responds with a record/data, we can use this data safely.
+/// previous writes and will never receive new writes. A node is considered non-authoritative if
+/// it's in [`StorageState::DataLoss`] which means it has lost data and it cannot be considered a
+/// reliable participant in some quorum checks. That said, dataloss doesn't include "corruption".
+/// It only means that if a node responds negatively to a read request that we cannot confidently
+/// assume that the data was never written. Conversely, if the node responds with a record/data,
+/// we can use this data safely.
 pub struct NodeSetChecker<Attr> {
     // This is a btree-map to leverage its great cache-locality on small sets like this one.
     // this could also be Box<[(PlainNodeId, LocationScopeState)]> but btreemap is nicer to
@@ -123,8 +125,8 @@
     node_to_attr: HashMap<PlainNodeId, Attr>,
     /// Mapping between node-id and its log-server storage state. Note that we keep all nodes even
     /// unreadable ones here because they might become readable after a nodes configuration
-    /// refresh. That said. Node-ids that have been deleted in nodes-configuration will not appear
-    /// here.
+    /// refresh. That said, node-ids that have been deleted in nodes-configuration or that have
+    /// storage state `Disabled` will not appear here.
     node_to_storage_state: HashMap<PlainNodeId, StorageState>,
 }
 
@@ -160,12 +162,24 @@
         };
 
         for node_id in nodeset.iter() {
-            if let Ok(config) = nodes_config.find_node_by_id(*node_id) {
-                checker.add_node(
+            match nodes_config.find_node_by_id(*node_id) {
+                Ok(config) => checker.add_node(
                     *node_id,
                     config.log_server_config.storage_state,
                     &config.location,
-                );
+                ),
+                Err(NodesConfigError::Deleted(_)) => {
+                    // deleted nodes are implicitly considered disabled (auth empty)
+                }
+                Err(NodesConfigError::UnknownNodeId(_)) => {
+                    // Unknown nodes must also be considered as Provisioning because we might be
+                    // operating on a nodeset that's out of sync with the nodes configuration
+                    // (nodeset is newer).
+                    checker.add_node(*node_id, StorageState::Provisioning, &NodeLocation::new());
+                }
+                Err(
+                    NodesConfigError::InvalidUri(_) | NodesConfigError::GenerationMismatch { .. },
+                ) => unreachable!("impossible nodes-configuration errors"),
             }
         }
 
@@ -259,7 +273,7 @@
             .get(&node_id)
             .copied()
             .unwrap_or(StorageState::Disabled);
-        if !storage_state.should_read_from() {
+        if storage_state.empty() {
             return;
         }
         let old_attribute = self.node_to_attr.insert(node_id, attribute.clone());
@@ -286,7 +300,7 @@
             .get(&node_id)
             .copied()
             .unwrap_or(StorageState::Disabled);
-        if !storage_state.should_read_from() {
+        if storage_state.empty() {
            return;
         }
         let existing = self.node_to_attr.get(&node_id);
@@ -321,7 +335,7 @@
         self.node_to_attr.values().all(predicate)
     }
 
-    // Does any node matches the predicate?
+    /// Does any node match the predicate? 
pub fn filter( &self, predicate: impl Fn(&Attr) -> bool, @@ -367,7 +381,8 @@ impl NodeSetChecker { // we have an f-majority if at least one scope has f-majority let mut f_majority = false; let mut sufficient_auth_domains = false; - // `for (scope, scope_state) in self.scopes.iter()` if you need the scope for debugging. + //` if you need the scope for debugging. + // for (scope, scope_state) in self.scopes.iter() { for scope_state in self.scopes.values() { let candidate_domains = u32::try_from(scope_state.failure_domains.len()) .expect("total domains must fit into u32"); @@ -411,11 +426,11 @@ impl NodeSetChecker { storage_state: StorageState, location: &NodeLocation, ) { - self.node_to_storage_state.insert(node_id, storage_state); // we only consider nodes that might have data, everything else is ignored. - if !storage_state.should_read_from() { + if storage_state.empty() { return; } + self.node_to_storage_state.insert(node_id, storage_state); for (scope, scope_state) in self.scopes.iter_mut() { let domain_name = if *scope == LocationScope::Node { @@ -426,7 +441,9 @@ impl NodeSetChecker { location.domain_string(*scope, None) } else { // node doesn't have location information at this scope level - warn!( + // this is trace because it can very noisy if a single node in the cluster in this + // state. We don't have a good way of avoiding log-spamming yet. + trace!( "Node {} doesn't have location information at location scope {:?} although replication is configured at this scope", node_id, scope ); @@ -556,7 +573,7 @@ impl Count { /// Increments the corresponding counters according to this storage state fn increment(&mut self, storage_state: StorageState) { - if storage_state.should_read_from() { + if storage_state.can_read_from() { self.num_readable_nodes += 1; } @@ -571,7 +588,7 @@ impl Count { return; } - if storage_state.should_read_from() { + if storage_state.can_read_from() { self.num_readable_nodes -= 1; } if storage_state.is_authoritative() { @@ -836,6 +853,51 @@ mod tests { Ok(()) } + #[test] + fn nodeset_checker_non_existent_nodes() -> Result<()> { + const LOCATION: &str = ""; + // all_authoritative, all flat structure (no location) + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // add 2 nodes to config N1, N2 + for i in 1..=2 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + LOCATION, + )); + } + + // NodeSet has 3 nodes N1, N2, N3. (N3 doesn't exist in config) + let nodeset: NodeSet = (1..=3).collect(); + let replication = ReplicationProperty::new(2.try_into().unwrap()); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + // all nodes in the nodeset are authoritative + assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(3)); + // all nodes are false by default. Can't establish write quorum. + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + + // Because N3 is not found, we should not consider it as auth empty, because it might show + // up in future nodes configuration update, we *must* consider it as Provisioning (fully + // auth). This means that we must acquire 2 nodes to achieve f-majority. 
+ checker.set_attribute_on_each([1], true); + assert_that!( + checker.check_fmajority(|attr| !(*attr)), + eq(FMajorityResult::None) + ); + + checker.set_attribute_on_each([1, 2], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // 2 nodes is also what we need to write quorum + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); + Ok(()) + } + #[test] fn nodeset_checker_mixed() -> Result<()> { const LOCATION: &str = ""; @@ -1083,7 +1145,7 @@ mod tests { // .az2 [N4, N5, N6] // region2 // .az1 [N7, N8, N9] - // - [N10] N11 - in Provisioning + // - [N10(Provisioning/Not In Config) N11(D)] - D = Disabled let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); // all authoritative // region1.az1 @@ -1110,13 +1172,12 @@ mod tests { "region2.az1", )); } - // N10 - nodes_config.upsert_node(generate_logserver_node(10, StorageState::ReadWrite, "")); - // N11 - nodes_config.upsert_node(generate_logserver_node(11, StorageState::Provisioning, "")); + // N10 - Provisioning - Not in config + // N11 - Disabled + nodes_config.upsert_node(generate_logserver_node(11, StorageState::Disabled, "")); // # Scenario 1 - // - Nodeset with all nodes = 10 nodes + // - Nodeset with all nodes = 11 nodes (N12 is effectively discounted), effective=10 // - Replication is {node: 3} let nodeset: NodeSet = (1..=11).collect(); let replication = ReplicationProperty::new(3.try_into().unwrap()); @@ -1126,21 +1187,22 @@ mod tests { assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(10)); // all nodes are false by default. Can't establish write quorum. assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); - // node 11 is provisioning, it won't fulfill our write quorum requirements. - checker.set_attribute_on_each([1, 2, 11], true); + // node 11 is provisioning, we can mark it but we shouldn't if we didn't actually write + // records to it. + checker.set_attribute_on_each([1, 2], true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); - // trying 3 instead + // having 3 will satisfy our quorum needs checker.set_attribute(3, true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); - // We need 8 nodes (anywhere) for f-majority, current [1,2,3] are marked. + // We need 8 authoritative non-empty nodes (anywhere) for f-majority, current [1,2,3] are marked. assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) ); - // total 7 nodes marked - checker.set_attribute_on_each([4, 5, 7, 8], true); + // total 8 nodes marked but 11 is empty/disabled, not enough. 
+ checker.set_attribute_on_each([4, 5, 7, 8, 11], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) @@ -1257,7 +1319,7 @@ mod tests { // .az2 [N4, N5(DL), N6] // region2 // .az1 [N7, N8, N9] - // - [N10] N11 - in Provisioning + // - [N10, N11(P)] - P = Provisioning let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); // region1.az1 nodes_config.upsert_node(generate_logserver_node( @@ -1310,7 +1372,7 @@ mod tests { NodeSetChecker::new(&nodeset, &nodes_config, &replication); // marked every possible node - checker.set_attribute_on_each([4, 6, 7, 8, 9, 10], true); + checker.set_attribute_on_each([4, 6, 7, 8, 9, 10, 11], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::BestEffort) @@ -1417,10 +1479,10 @@ mod tests { #[test] fn nodeset_checker_fd_empty_domains() -> Result<()> { - // total = 11 nodes across 3 regions. + // total = 12 nodes across 4 regions (effectively 3 regions, region4 is disabled) // replication = {region: 2, zone: 3} // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1487,7 +1549,7 @@ mod tests { // # Write-quorum checks // Can we write to N2, N5, N11? No. // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1503,7 +1565,7 @@ mod tests { // but adding any node from region2 should fix it checker.set_attribute_on_each([10], true); // region1 - // .az1 [N1(x), N2(D), N3(D), N4(D)] + // .az1 [N1(x), N2(x), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10(x)] @@ -1526,7 +1588,7 @@ mod tests { // In other words, have f-majority if we have one full zone, or any 2 non-empty regions. // // region1 - // .az1 [N1, N2(D), N3(D), N4(D)] + // .az1 [N1, N2(P), N3(D), N4(D)] // .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled // region2 // .az1 [N8, N9, N10] @@ -1535,6 +1597,14 @@ mod tests { // region4 // .az1 [N12(D)] + checker.set_attribute_on_each([1, 2], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + // revert that. + checker.set_attribute_on_each([1, 2], false); + // N10 on region2.az1 is not sufficient since other nodes on the same zone are not marked checker.set_attribute_on_each([10], true); assert_that!( @@ -1549,26 +1619,27 @@ mod tests { eq(FMajorityResult::Success) ); - // Another more stringent example is region1.az1. Only N1 should be sufficient for + // Another more stringent example is region3.az1. Only N11 should be sufficient for // f-majority in this setup. checker.fill_with_default(); - checker.set_attribute_on_each([1], true); + checker.set_attribute_on_each([11], true); // Flexible quorums FTW. 
assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::Success) ); - // same for N11 + + // but not for N9 checker.fill_with_default(); - checker.set_attribute_on_each([11], true); + checker.set_attribute_on_each([9], true); assert_that!( checker.check_fmajority(|attr| *attr), - eq(FMajorityResult::Success) + eq(FMajorityResult::None) ); - // but not for N9 + // nor N12 because it's empty (not member of the nodeset) checker.fill_with_default(); - checker.set_attribute_on_each([9], true); + checker.set_attribute_on_each([12], true); assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::None) diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 47560441c..b727c3000 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -215,7 +215,7 @@ impl NodesConfiguration { pub fn get_log_server_storage_state(&self, node_id: &PlainNodeId) -> StorageState { let maybe = self.nodes.get(node_id); let Some(maybe) = maybe else { - return StorageState::Disabled; + return StorageState::Provisioning; }; match maybe { MaybeNode::Tombstone => StorageState::Disabled, @@ -325,10 +325,21 @@ pub enum StorageState { /// /// The node can never transition back to `Provisioning` once it has transitioned out of it. /// - /// should read from: no - /// can write to: no + /// [authoritative] + /// or all intents and purposes, this is equivalent to a `ReadOnly` state, except that it's + /// excluded from nodeset generation. The difference + /// between this and `ReadOnly` is that if a node is in Provisioning state, we are confident + /// that it has not been added to nodesets and we can safely remove it from the cluster without + /// checking the log-chain or trim points of loglets. Note that if this node happens to be in a + /// nodeset (although control plan shouldn't add it) spread selectors will not write any data + /// to it until they observe a state transition to ReadWrite. This behaviour matches `ReadOnly` + /// state. + /// + /// can read from: yes (likely to not have data, but it might if it transitioned into RW) + /// can write to: yes - but excluded from new nodesets. If you see it in a nodeset, try writing to it. #[default] Provisioning, + /// [authoritative empty] /// Node's storage is not expected to be accessed in reads nor write. The node is not /// considered as part of the replicated log cluster. Node can be safely decommissioned. /// @@ -337,6 +348,7 @@ pub enum StorageState { /// should read from: no /// can write to: no Disabled, + /// [authoritative] /// Node is not picked in new write sets, but it may still accept writes on existing nodeset /// and it's included in critical metadata updates (seal, release, etc.) /// should read from: yes @@ -344,14 +356,17 @@ pub enum StorageState { /// **should write to: no** /// **excluded from new nodesets** ReadOnly, + /// [authoritative] /// Can be picked up in new write sets and accepts writes in existing write sets. /// /// should read from: yes /// can write to: yes ReadWrite, + /// **[non-authoritative]** /// Node detected that some/all of its local storage has been deleted and it cannot be used /// as authoritative source for quorum-dependent queries. Some data might have permanently been - /// lost. + /// lost. It behaves like ReadOnly in spread selectors, but participates unauthoritatively in + /// f-majority checks. This node can transition back to ReadWrite if it has been repaired. 
/// /// should read from: yes (non-quorum reads) /// can write to: no @@ -367,19 +382,20 @@ impl StorageState { } } - pub fn should_read_from(&self) -> bool { + // Nodes that may have data or might become readable in the future + pub fn can_read_from(&self) -> bool { use StorageState::*; match self { - ReadOnly | ReadWrite | DataLoss => true, - Provisioning | Disabled => false, + Provisioning | ReadOnly | ReadWrite | DataLoss => true, + Disabled => false, } } pub fn is_authoritative(&self) -> bool { use StorageState::*; match self { - Provisioning | Disabled | DataLoss => false, - ReadOnly | ReadWrite => true, + DataLoss => false, + Disabled | Provisioning | ReadOnly | ReadWrite => true, } } diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index daceda694..017c115ea 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -128,8 +128,8 @@ where loglet_id: LogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), sequencer, replication, - // node 1 is the metadata, 2..=count+1 are logservers - nodeset: (2..=log_server_count + 1).collect(), + // all nodes are log-servers + nodeset: (1..=log_server_count).collect(), }; let loglet_params = loglet_params.serialize()?;
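
Illustrative sketch (not part of the patch): a minimal, self-contained Rust
example of the state mapping described in the summary above. The names below
(`StorageState`, `Lookup`, `effective_state`) are simplified stand-ins and not
the crate's actual `NodesConfiguration`/`NodeSetChecker` API; they only mirror
the intended semantics: deleted (tombstoned) node-ids behave as `Disabled`
(authoritative-empty), while unknown node-ids behave as `Provisioning`
(fully authoritative).

```rust
// Hedged sketch with simplified stand-in types; not the crate's actual API.

/// Stand-in for the log-server storage states referenced by this patch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // not every state is exercised below
enum StorageState {
    Provisioning, // authoritative, but excluded from *new* nodesets
    Disabled,     // authoritative empty: never counted in quorum checks
    ReadOnly,
    ReadWrite,
    DataLoss, // non-authoritative
}

impl StorageState {
    fn is_authoritative(self) -> bool {
        !matches!(self, StorageState::DataLoss)
    }
    fn empty(self) -> bool {
        matches!(self, StorageState::Disabled)
    }
}

/// Stand-in for the result of looking a node-id up in the nodes configuration.
enum Lookup {
    Known(StorageState),
    /// A tombstone exists: the node was deleted.
    Deleted,
    /// The node-id is not present in this (possibly older) configuration version.
    Unknown,
}

/// Effective state used when seeding the f-majority checker, per this patch:
/// deleted => Disabled (auth empty), unknown => Provisioning (fully authoritative).
fn effective_state(lookup: Lookup) -> StorageState {
    match lookup {
        Lookup::Known(state) => state,
        Lookup::Deleted => StorageState::Disabled,
        Lookup::Unknown => StorageState::Provisioning,
    }
}

fn main() {
    // A known read-write node stays authoritative and participates as usual.
    assert!(effective_state(Lookup::Known(StorageState::ReadWrite)).is_authoritative());

    // An unknown node must still be counted as a required f-majority participant...
    let unknown = effective_state(Lookup::Unknown);
    assert!(unknown.is_authoritative() && !unknown.empty());

    // ...while a deleted (tombstoned) node is authoritative-empty and is skipped.
    let deleted = effective_state(Lookup::Deleted);
    assert!(deleted.empty());

    println!("unknown => {unknown:?}, deleted => {deleted:?}");
}
```

Treating unknown node-ids as authoritative keeps f-majority checks safe when
the nodeset is newer than the locally cached nodes configuration, which is the
situation the new `Err(NodesConfigError::UnknownNodeId(_))` arm guards against.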