Skip to content

Commit

Permalink
Trim improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Feb 9, 2025
1 parent ecf966a commit 928f7e7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
10 changes: 9 additions & 1 deletion crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,15 @@ impl LogletWrapper {

/// Returns the trim point of the wrapped loglet, expressed as an `Lsn`.
///
/// The offset reported by `self.loglet.get_trim_point()` is translated into an `Lsn`
/// via `self.base_lsn.offset_by(..)`. If the underlying call yields `None`, `None` is
/// returned. Errors from the underlying call are propagated with `?`.
pub async fn get_trim_point(&self) -> Result<Option<Lsn>, OperationError> {
let offset = self.loglet.get_trim_point().await?;
// NOTE(review): in this diff view the next line appears to be the pre-change body
// (the file header reports 9 additions & 1 deletion); the multi-line expression that
// follows it looks like its replacement — confirm against the repository.
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
Ok(offset
.map(|o| self.base_lsn.offset_by(o))
.map(|actual_trim_point| {
// If this loglet is sealed, the reported trim-point must fall within its boundaries
// When `tail_lsn` is `Some`, the result is capped at `tail.prev()`; otherwise the
// translated trim point is returned as-is.
match self.tail_lsn {
Some(tail) => actual_trim_point.min(tail.prev()),
None => actual_trim_point,
}
}))
}

// trim_point is inclusive.
Expand Down
2 changes: 0 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,6 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
/// trim_point is inclusive (will be trimmed)
async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError> {
trace!("trim() called");
let trim_point = trim_point.min(self.known_global_tail.latest_offset().prev_unchecked());

TrimTask::new(
&self.my_params,
self.logservers_rpc.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams};

use crate::loglet::util::TailOffsetWatch;
use crate::loglet::OperationError;
use crate::providers::replicated_loglet::replication::NodeSetChecker;
use crate::providers::replicated_loglet::replication::{FMajorityResult, NodeSetChecker};
use crate::providers::replicated_loglet::rpc_routers::LogServersRpc;
use crate::providers::replicated_loglet::tasks::util::{Disposition, RunOnSingleNode};

Expand All @@ -31,8 +31,8 @@ struct GetTrimPointError;
/// Find the trim point for a loglet
///
/// The trim point doesn't require a quorum-check, the maximum observed trim-point on
/// log-servers can be used, but we wait for write-quorum (or) f-majority whichever happens first
/// before we respond to increase the chances of getting a reliable trim point.
/// log-servers can be used, but we wait for f-majority before we respond to increase
/// the chances of getting a reliable trim point.
///
/// We don't provide a guarantee that `get_trim_point` returns an always-increasing offset,
/// and it should not be used in gap-detection in read streams. In read-streams, the
Expand Down Expand Up @@ -78,6 +78,14 @@ impl<'a> GetTrimPointTask<'a> {
.nodeset
.to_effective(&networking.metadata().nodes_config_ref());

if effective_nodeset.is_empty() {
// all nodes are disabled, we can't determine the trim point but we know that this
// loglet is impossible to read. Control plane transitioning our nodeset into disabled
// must have trimmed all records. In all cases, it's safe to return the maximum
// possible trim point.
return Ok(Some(LogletOffset::MAX.prev()));
}

trace!(
loglet_id = %self.my_params.loglet_id,
known_global_tail = %self.known_global_tail,
Expand Down Expand Up @@ -137,11 +145,10 @@ impl<'a> GetTrimPointTask<'a> {
continue;
};

// either f-majority or write-quorum is enough
// wait for f-majority, best effort is acceptable since it includes all authoritative
// nodes in the nodeset.
nodeset_checker.set_attribute(node_id, Some(res));
if nodeset_checker.check_write_quorum(predicate)
|| nodeset_checker.check_fmajority(predicate).passed()
{
if nodeset_checker.check_fmajority(predicate) >= FMajorityResult::BestEffort {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<'a> TrimTask<'a> {
// Not enough nodes have successful responses
warn!(
loglet_id = %self.my_params.loglet_id,
trim_point = %trim_point,
trim_point = %trim_point,
known_global_tail = %self.known_global_tail,
effective_nodeset = %effective_nodeset,
"Could not trim the loglet, since we could not confirm the new trim point with write-quorum nodes. Nodes that have confirmed are {}",
Expand Down

0 comments on commit 928f7e7

Please sign in to comment.