
Fix treatment of not-found nodes and update provisioning semantics
This fixes the mishandling of deleted and unknown nodes in the config during f-majority checks. Integration tests were misconfigured: the nodeset was [N2..N4] while N4 didn't actually exist in the config. In this case we should not accept an f-majority seal if only one node is sealed (replication=2).
Although this bug wouldn't impact us immediately, it's best to fix this condition, and I took it as an opportunity to update the semantics of the provisioning state to match the latest design direction. Documentation has also been updated to reflect the correct semantics.

Summary:
- Nodes observed in the node-set but not in the nodes-config are treated as "provisioning" rather than "disabled"
- Nodes that are "deleted" in config (tombstone exists) are treated as "disabled"
- Nodes in provisioning are fully authoritative, but are automatically excluded from new nodesets (already filtered out by the candidacy filter in the nodeset selector)
- If provisioning nodes are added to the nodeset, they are treated as fully authoritative and are required to participate in f-majority (see the sketch below).
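Roughly, the new lookup semantics map a nodeset member to a storage state as sketched below. This mirrors the `NodeSetChecker` change in `checker.rs` in this commit; `effective_storage_state` is a hypothetical helper written only for illustration, not part of the commit.

```rust
use restate_types::nodes_config::{NodesConfigError, NodesConfiguration, StorageState};
use restate_types::PlainNodeId;

// Resolve how a nodeset member is treated by quorum checks.
fn effective_storage_state(
    nodes_config: &NodesConfiguration,
    node_id: PlainNodeId,
) -> StorageState {
    match nodes_config.find_node_by_id(node_id) {
        // known node: use its configured log-server storage state
        Ok(config) => config.log_server_config.storage_state,
        // tombstoned (deleted) node: implicitly disabled, i.e. authoritatively empty
        Err(NodesConfigError::Deleted(_)) => StorageState::Disabled,
        // unknown node: the nodeset may be newer than the configuration we are looking at,
        // so treat it as provisioning (fully authoritative, required for f-majority)
        Err(NodesConfigError::UnknownNodeId(_)) => StorageState::Provisioning,
        // other lookup errors are not expected on this code path
        Err(_) => StorageState::Disabled,
    }
}
```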

AhmedSoliman committed Feb 9, 2025
1 parent da6fbd8 commit ecf966a
Showing 4 changed files with 146 additions and 59 deletions.
@@ -545,7 +545,7 @@ impl ReadStreamTask
loglet_id = %self.my_params.loglet_id,
from_offset = %self.read_pointer,
%to_offset,
?e,
%e,
"Could not request record batch from node {}", server
);
Ok(ServerReadResult::Skip)
165 changes: 118 additions & 47 deletions crates/bifrost/src/providers/replicated_loglet/replication/checker.rs
@@ -15,11 +15,12 @@ use std::hash::{Hash, Hasher};

use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use restate_types::replication::DecoratedNodeSet;
use tracing::warn;
use tracing::trace;

use restate_types::locality::{LocationScope, NodeLocation};
use restate_types::nodes_config::NodesConfigError;
use restate_types::nodes_config::{NodesConfiguration, StorageState};
use restate_types::replication::DecoratedNodeSet;
use restate_types::replication::{NodeSet, ReplicationProperty};
use restate_types::Merge;
use restate_types::PlainNodeId;
@@ -110,11 +111,12 @@ impl FMajorityResult {
/// legal write quorum. Nodes are assigned a [`StorageState`] that defines their authoritative-ness.
/// An authoritative node is one that has not lost data and is not empty. Empty in this context
/// means that it's guaranteed that it has never received data and was never a participant in
/// previous writes. A node is considered non-authoritative if it's in [`StorageState::DataLoss`]
/// which means it has lost data and it cannot be considered a reliable participant in some quorum
/// checks. That said, dataloss doesn't include "corruption". It only means that if a node responds
/// negatively to a read request that we cannot confidently assume that the data was never written.
/// Conversely, if the node responds with a record/data, we can use this data safely.
/// previous writes and will never receive new writes. A node is considered non-authoritative if
/// it's in [`StorageState::DataLoss`] which means it has lost data and it cannot be considered a
/// reliable participant in some quorum checks. That said, dataloss doesn't include "corruption".
/// It only means that if a node responds negatively to a read request that we cannot confidently
/// assume that the data was never written. Conversely, if the node responds with a record/data,
/// we can use this data safely.
pub struct NodeSetChecker<Attr> {
// This is a btree-map to leverage its great cache-locality on small sets like this one.
// this could also be Box<[(PlainNodeId, LocationScopeState<Attr>)]> but btreemap is nicer to
@@ -123,8 +125,8 @@ pub struct NodeSetChecker<Attr> {
node_to_attr: HashMap<PlainNodeId, Attr>,
/// Mapping between node-id and its log-server storage state. Note that we keep all nodes even
/// unreadable ones here because they might become readable after a nodes configuration
/// refresh. That said. Node-ids that have been deleted in nodes-configuration will not appear
/// here.
/// refresh. That said, node-ids that have been deleted in nodes-configuration or that have
/// storage state `Disabled` will not appear here.
node_to_storage_state: HashMap<PlainNodeId, StorageState>,
}
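For orientation, here is a minimal usage sketch of the checker under the new semantics. It only uses the constructor, `set_attribute_on_each`, and the quorum checks exercised by the tests further down in this diff; `generate_logserver_node` is the test helper used there, and `example_usage` is a hypothetical wrapper, not part of the commit.

```rust
use restate_types::nodes_config::{NodesConfiguration, StorageState};
use restate_types::replication::{NodeSet, ReplicationProperty};
use restate_types::Version;

// NodeSetChecker and FMajorityResult are defined in this module (checker.rs);
// generate_logserver_node is the test helper used in the tests below.
fn example_usage() {
    // N1 and N2 are ReadWrite log-servers; N3 is deliberately left out of the
    // configuration, so the checker treats it as Provisioning (fully authoritative).
    let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
    for i in 1..=2 {
        nodes_config.upsert_node(generate_logserver_node(i, StorageState::ReadWrite, ""));
    }

    let nodeset: NodeSet = (1..=3).collect();
    let replication = ReplicationProperty::new(2.try_into().unwrap());
    let mut checker: NodeSetChecker<bool> =
        NodeSetChecker::new(&nodeset, &nodes_config, &replication);

    // Mark the nodes we heard back from. With 3 authoritative members and
    // replication = 2, both write-quorum and f-majority need 2 marked nodes.
    checker.set_attribute_on_each([1, 2], true);
    assert!(checker.check_write_quorum(|attr| *attr));
    assert!(matches!(
        checker.check_fmajority(|attr| *attr),
        FMajorityResult::Success
    ));
}
```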

@@ -160,12 +162,24 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
};

for node_id in nodeset.iter() {
if let Ok(config) = nodes_config.find_node_by_id(*node_id) {
checker.add_node(
match nodes_config.find_node_by_id(*node_id) {
Ok(config) => checker.add_node(
*node_id,
config.log_server_config.storage_state,
&config.location,
);
),
Err(NodesConfigError::Deleted(_)) => {
// deleted nodes are implicitly considered disabled (auth empty)
}
Err(NodesConfigError::UnknownNodeId(_)) => {
// Unknown nodes must also be considered as Provisioning because we might be
// operating on a nodeset that's out of sync with the nodes configuration
// (nodeset is newer).
checker.add_node(*node_id, StorageState::Provisioning, &NodeLocation::new());
}
Err(
NodesConfigError::InvalidUri(_) | NodesConfigError::GenerationMismatch { .. },
) => unreachable!("impossible nodes-configuration errors"),
}
}

@@ -259,7 +273,7 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
.get(&node_id)
.copied()
.unwrap_or(StorageState::Disabled);
if !storage_state.should_read_from() {
if storage_state.empty() {
return;
}
let old_attribute = self.node_to_attr.insert(node_id, attribute.clone());
@@ -286,7 +300,7 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
.get(&node_id)
.copied()
.unwrap_or(StorageState::Disabled);
if !storage_state.should_read_from() {
if storage_state.empty() {
return;
}
let existing = self.node_to_attr.get(&node_id);
@@ -321,7 +335,7 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
self.node_to_attr.values().all(predicate)
}

// Does any node matches the predicate?
/// Does any node match the predicate?
pub fn filter(
&self,
predicate: impl Fn(&Attr) -> bool,
@@ -367,7 +381,8 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
// we have an f-majority if at least one scope has f-majority
let mut f_majority = false;
let mut sufficient_auth_domains = false;
// `for (scope, scope_state) in self.scopes.iter()` if you need the scope for debugging.
// if you need the scope for debugging:
// for (scope, scope_state) in self.scopes.iter() {
for scope_state in self.scopes.values() {
let candidate_domains = u32::try_from(scope_state.failure_domains.len())
.expect("total domains must fit into u32");
@@ -411,11 +426,11 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
storage_state: StorageState,
location: &NodeLocation,
) {
self.node_to_storage_state.insert(node_id, storage_state);
// we only consider nodes that might have data, everything else is ignored.
if !storage_state.should_read_from() {
if storage_state.empty() {
return;
}
self.node_to_storage_state.insert(node_id, storage_state);

for (scope, scope_state) in self.scopes.iter_mut() {
let domain_name = if *scope == LocationScope::Node {
@@ -426,7 +441,9 @@ impl<Attr: Eq + Hash + Clone + std::fmt::Debug> NodeSetChecker<Attr> {
location.domain_string(*scope, None)
} else {
// node doesn't have location information at this scope level
warn!(
// this is trace because it can be very noisy if a single node in the cluster is in this
// state. We don't have a good way of avoiding log-spamming yet.
trace!(
"Node {} doesn't have location information at location scope {:?} although replication is configured at this scope",
node_id, scope
);
@@ -556,7 +573,7 @@ impl Count {

/// Increments the corresponding counters according to this storage state
fn increment(&mut self, storage_state: StorageState) {
if storage_state.should_read_from() {
if storage_state.can_read_from() {
self.num_readable_nodes += 1;
}

@@ -571,7 +588,7 @@ impl Count {
return;
}

if storage_state.should_read_from() {
if storage_state.can_read_from() {
self.num_readable_nodes -= 1;
}
if storage_state.is_authoritative() {
@@ -836,6 +853,51 @@ mod tests {
Ok(())
}

#[test]
fn nodeset_checker_non_existent_nodes() -> Result<()> {
const LOCATION: &str = "";
// all_authoritative, all flat structure (no location)
let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
// add 2 nodes to config N1, N2
for i in 1..=2 {
nodes_config.upsert_node(generate_logserver_node(
i,
StorageState::ReadWrite,
LOCATION,
));
}

// NodeSet has 3 nodes N1, N2, N3. (N3 doesn't exist in config)
let nodeset: NodeSet = (1..=3).collect();
let replication = ReplicationProperty::new(2.try_into().unwrap());
let mut checker: NodeSetChecker<bool> =
NodeSetChecker::new(&nodeset, &nodes_config, &replication);

// all nodes in the nodeset are authoritative
assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(3));
// all nodes are false by default. Can't establish write quorum.
assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));

// Because N3 is not found, we should not consider it auth empty; it might show
// up in a future nodes configuration update, so we *must* consider it as Provisioning (fully
// auth). This means that we must acquire 2 nodes to achieve f-majority.
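// (f-majority at node scope = authoritative nodes - replication + 1 = 3 - 2 + 1 = 2)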
checker.set_attribute_on_each([1], true);
assert_that!(
checker.check_fmajority(|attr| !(*attr)),
eq(FMajorityResult::None)
);

checker.set_attribute_on_each([1, 2], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::Success)
);

// 2 nodes is also what we need for a write quorum
assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));
Ok(())
}

#[test]
fn nodeset_checker_mixed() -> Result<()> {
const LOCATION: &str = "";
@@ -1083,7 +1145,7 @@ mod tests {
// .az2 [N4, N5, N6]
// region2
// .az1 [N7, N8, N9]
// - [N10] N11 - in Provisioning
// - [N10(Provisioning/Not in Config), N11(D)] - D = Disabled
let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
// all authoritative
// region1.az1
@@ -1110,13 +1172,12 @@ mod tests {
"region2.az1",
));
}
// N10
nodes_config.upsert_node(generate_logserver_node(10, StorageState::ReadWrite, ""));
// N11
nodes_config.upsert_node(generate_logserver_node(11, StorageState::Provisioning, ""));
// N10 - Provisioning - Not in config
// N11 - Disabled
nodes_config.upsert_node(generate_logserver_node(11, StorageState::Disabled, ""));

// # Scenario 1
// - Nodeset with all nodes = 10 nodes
// - Nodeset with all nodes = 11 nodes (N11 is effectively discounted), effective = 10
// - Replication is {node: 3}
let nodeset: NodeSet = (1..=11).collect();
let replication = ReplicationProperty::new(3.try_into().unwrap());
@@ -1126,21 +1187,22 @@ mod tests {
assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(10));
// all nodes are false by default. Can't establish write quorum.
assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
// node 11 is provisioning, it won't fulfill our write quorum requirements.
checker.set_attribute_on_each([1, 2, 11], true);
// node 10 is provisioning; we could mark it, but we shouldn't if we didn't actually write
// records to it.
checker.set_attribute_on_each([1, 2], true);
assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
// trying 3 instead
// having 3 will satisfy our quorum needs
checker.set_attribute(3, true);
assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));

// We need 8 nodes (anywhere) for f-majority, current [1,2,3] are marked.
// We need 8 authoritative non-empty nodes (anywhere) for f-majority, current [1,2,3] are marked.
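// (8 = 10 authoritative nodes - replication (3) + 1)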
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::None)
);

// total 7 nodes marked
checker.set_attribute_on_each([4, 5, 7, 8], true);
// total 8 nodes marked but 11 is empty/disabled, not enough.
checker.set_attribute_on_each([4, 5, 7, 8, 11], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::None)
@@ -1257,7 +1319,7 @@ mod tests {
// .az2 [N4, N5(DL), N6]
// region2
// .az1 [N7, N8, N9]
// - [N10] N11 - in Provisioning
// - [N10, N11(P)] - P = Provisioning
let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
// region1.az1
nodes_config.upsert_node(generate_logserver_node(
@@ -1310,7 +1372,7 @@ mod tests {
NodeSetChecker::new(&nodeset, &nodes_config, &replication);

// marked every possible node
checker.set_attribute_on_each([4, 6, 7, 8, 9, 10], true);
checker.set_attribute_on_each([4, 6, 7, 8, 9, 10, 11], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::BestEffort)
@@ -1417,10 +1479,10 @@

#[test]
fn nodeset_checker_fd_empty_domains() -> Result<()> {
// total = 11 nodes across 3 regions.
// total = 12 nodes across 4 regions (effectively 3 regions, region4 is disabled)
// replication = {region: 2, zone: 3}
// region1
// .az1 [N1, N2(D), N3(D), N4(D)]
// .az1 [N1, N2(P), N3(D), N4(D)]
// .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
// region2
// .az1 [N8, N9, N10]
@@ -1487,7 +1549,7 @@ mod tests {
// # Write-quorum checks
// Can we write to N2, N5, N11? No.
// region1
// .az1 [N1, N2(D), N3(D), N4(D)]
// .az1 [N1, N2(P), N3(D), N4(D)]
// .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
// region2
// .az1 [N8, N9, N10]
@@ -1503,7 +1565,7 @@ mod tests {
// but adding any node from region2 should fix it
checker.set_attribute_on_each([10], true);
// region1
// .az1 [N1(x), N2(D), N3(D), N4(D)]
// .az1 [N1(x), N2(x), N3(D), N4(D)]
// .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
// region2
// .az1 [N8, N9, N10(x)]
@@ -1526,7 +1588,7 @@ mod tests {
// In other words, have f-majority if we have one full zone, or any 2 non-empty regions.
//
// region1
// .az1 [N1, N2(D), N3(D), N4(D)]
// .az1 [N1, N2(P), N3(D), N4(D)]
// .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
// region2
// .az1 [N8, N9, N10]
Expand All @@ -1535,6 +1597,14 @@ mod tests {
// region4
// .az1 [N12(D)]

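// N1 and N2 are the only non-empty nodes in region1.az1 (N3 and N4 are disabled), so marking
// both completes that zone and yields f-majority via the "one full zone" rule above.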
checker.set_attribute_on_each([1, 2], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::Success)
);
// revert that.
checker.set_attribute_on_each([1, 2], false);

// N10 on region2.az1 is not sufficient since other nodes on the same zone are not marked
checker.set_attribute_on_each([10], true);
assert_that!(
@@ -1549,26 +1619,27 @@ mod tests {
eq(FMajorityResult::Success)
);

// Another more stringent example is region1.az1. Only N1 should be sufficient for
// Another more stringent example is region3.az1. Only N11 should be sufficient for
// f-majority in this setup.
checker.fill_with_default();
checker.set_attribute_on_each([1], true);
checker.set_attribute_on_each([11], true);
// Flexible quorums FTW.
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::Success)
);
// same for N11

// but not for N9
checker.fill_with_default();
checker.set_attribute_on_each([11], true);
checker.set_attribute_on_each([9], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::Success)
eq(FMajorityResult::None)
);

// but not for N9
// nor N12 because it's empty (not a member of the nodeset)
checker.fill_with_default();
checker.set_attribute_on_each([9], true);
checker.set_attribute_on_each([12], true);
assert_that!(
checker.check_fmajority(|attr| *attr),
eq(FMajorityResult::None)
