Fix sequencer drain in challenging situations
AhmedSoliman committed Feb 9, 2025
1 parent 7dc8864 commit 2a770cb
Showing 2 changed files with 24 additions and 8 deletions.
crates/bifrost/src/providers/replicated_loglet/sequencer.rs (8 additions, 2 deletions)
@@ -17,7 +17,7 @@ use std::sync::{
 
 use crossbeam_utils::CachePadded;
 use tokio::sync::Semaphore;
-use tokio_util::task::TaskTracker;
+use tokio_util::{sync::CancellationToken, task::TaskTracker};
 use tracing::{debug, instrument, trace};
 
 use restate_core::{
@@ -111,6 +111,8 @@ pub struct Sequencer<T> {
     record_permits: Arc<Semaphore>,
     in_flight_appends: TaskTracker,
     record_cache: RecordCache,
+    /// this is the parent token for all appenders.
+    cancellation_token: CancellationToken,
 }
 
 impl<T> Sequencer<T> {
@@ -175,6 +177,7 @@ impl<T: TransportConnect> Sequencer<T> {
             record_cache,
             max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config),
             in_flight_appends: TaskTracker::default(),
+            cancellation_token: CancellationToken::default(),
         }
     }
 
@@ -191,6 +194,7 @@ impl<T: TransportConnect> Sequencer<T> {
         self.record_permits.close();
         // required to allow in_flight.wait() to finish.
         self.in_flight_appends.close();
+        self.cancellation_token.cancel();
 
         // we are assuming here that seal has been already executed on majority of nodes. This is
         // important since in_flight.close() doesn't prevent new tasks from being spawned.
@@ -288,7 +292,9 @@ impl<T: TransportConnect> Sequencer<T> {
             commit_resolver,
         );
 
-        let fut = self.in_flight_appends.track_future(appender.run());
+        let fut = self
+            .in_flight_appends
+            .track_future(appender.run(self.cancellation_token.child_token()));
         // Why not managed tasks? Because managed tasks are not designed to manage a potentially
         // very large number of tasks, and they also require a lock acquisition on start that might
         // be a contention point.
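
To see the whole pattern in one place: each appender future is wrapped by `TaskTracker::track_future` and handed a child of the sequencer's parent `CancellationToken`, so drain can close the tracker, cancel the parent, and wait. A minimal, self-contained sketch under those assumptions (`appender_run` and the task count are illustrative, not from this commit):

```rust
use std::time::Duration;

use tokio_util::{sync::CancellationToken, task::TaskTracker};

// Stand-in for SequencerAppender::run: does some "work" unless its child
// token fires first.
async fn appender_run(id: u32, token: CancellationToken) {
    match token
        .run_until_cancelled(tokio::time::sleep(Duration::from_secs(60)))
        .await
    {
        Some(()) => println!("appender {id} completed"),
        None => println!("appender {id} cancelled"),
    }
}

#[tokio::main]
async fn main() {
    let cancellation_token = CancellationToken::new();
    let in_flight_appends = TaskTracker::new();

    for id in 0..3 {
        // track_future wraps the future so that wait() observes its
        // completion; each appender gets a child of the parent token.
        let fut = in_flight_appends
            .track_future(appender_run(id, cancellation_token.child_token()));
        tokio::spawn(fut);
    }

    // Drain: close() is required for wait() to resolve, cancel() fans out
    // to every child token, then wait() blocks until all tracked futures
    // have finished.
    in_flight_appends.close();
    cancellation_token.cancel();
    in_flight_appends.wait().await;
    println!("drained");
}
```

One `cancel()` on the parent reaches every in-flight appender through its child token, while `close()` is what allows `wait()` to resolve once the tracked futures complete.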
Second changed file (16 additions, 6 deletions)
@@ -12,10 +12,10 @@ use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration};
 
 use tokio::time::Instant;
 use tokio::{sync::OwnedSemaphorePermit, task::JoinSet};
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, instrument, trace, warn};
 
 use restate_core::{
-    cancellation_token,
     network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect},
     ShutdownError, TaskCenterFutureExt,
 };
@@ -123,15 +123,13 @@ impl<T: TransportConnect> SequencerAppender<T> {
             otel.name="replicated_loglet::sequencer::appender: run"
         )
     )]
-    pub async fn run(mut self) {
+    pub async fn run(mut self, cancellation_token: CancellationToken) {
         let start = Instant::now();
         // initial wave has 0 replicated and 0 gray listed node
         let mut state = State::Wave {
             graylist: NodeSet::default(),
         };
 
-        let cancellation = cancellation_token();
-
         let retry_policy = self
             .configuration
             .live_load()
@@ -149,7 +147,17 @@ impl<T: TransportConnect> SequencerAppender<T> {
                 State::Done | State::Cancelled | State::Sealed => break state,
                 State::Wave { graylist } => {
                     self.current_wave += 1;
-                    let Some(next_state) = cancellation
+                    // # Why is this cancellation safe?
+                    // Because we don't await any futures inside the join_next() loop, we are
+                    // confident that we have cancelled before resolving the commit token.
+                    // We want to make sure we don't cancel _after_ updating the global offset,
+                    // *then* reporting Cancelled. This is because we don't want appenders after
+                    // our offset to make progress, therefore (potentially) dropping records in
+                    // the writer prefix. Even if a store was fully replicated and we cancelled
+                    // before updating the tail, that's an acceptable and safe result because we
+                    // didn't acknowledge the append to the writer, so from their perspective it
+                    // has failed; and because the global tail was not moved, all appends after
+                    // this one cannot move the global tail either.
+                    let Some(next_state) = cancellation_token
                         .run_until_cancelled(self.send_wave(graylist))
                         .await
                     else {
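
The comment's argument rests on `run_until_cancelled` semantics: when the token fires first it returns `None` and drops the wrapped future at its current await point, so a cancelled wave can never reach the code that would resolve the commit token. A small sketch of that behavior (hypothetical names, assuming tokio-util; not the real appender):

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Stand-in for send_wave: if this future is dropped at the await point,
// the return expression below never executes.
async fn send_wave_sketch() -> u64 {
    tokio::time::sleep(Duration::from_secs(60)).await;
    42 // stand-in for the wave's replicated offset
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    token.cancel(); // simulate the sequencer's drain firing the parent token

    match token.run_until_cancelled(send_wave_sketch()).await {
        // Some(offset) means the wave finished before cancellation.
        Some(offset) => println!("wave done at offset {offset}"),
        // None means the wave future was dropped; no offset was published.
        None => println!("cancelled; global offset untouched"),
    }
}
```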
@@ -173,7 +181,7 @@
                     );
                 }
 
-                if cancellation
+                if cancellation_token
                     .run_until_cancelled(tokio::time::sleep(delay))
                     .await
                     .is_none()
@@ -279,6 +287,8 @@ impl<T: TransportConnect> SequencerAppender<T> {
             });
         }
 
+        // NOTE: It's very important to keep this loop cancellation safe. If the appender future
+        // was cancelled, we don't want to move the global commit offset.
         while let Some(store_result) = store_tasks.join_next().await {
             // unlikely to happen, but it's there for completeness
             if self.sequencer_shared_state.known_global_tail.is_sealed() {
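
The NOTE relies on two facts: tokio documents `JoinSet::join_next` as cancel safe, and the loop body contains no other await points. Dropping the enclosing future via `run_until_cancelled` therefore loses no completed store results and cannot interleave with the offset update that follows the loop. A hypothetical sketch of that shape (names are illustrative):

```rust
use std::time::Duration;

use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    token.cancel(); // simulate drain happening before the waves settle

    let mut store_tasks: JoinSet<u64> = JoinSet::new();
    store_tasks.spawn(async {
        tokio::time::sleep(Duration::from_secs(60)).await;
        7
    });

    // Mirrors the appender's loop: join_next() is the only await, so the
    // future can only be dropped at a point where no partial offset state
    // has been observed yet.
    let acks = token
        .run_until_cancelled(async move {
            let mut acks = 0;
            while let Some(res) = store_tasks.join_next().await {
                let _offset = res.expect("store task panicked");
                acks += 1;
            }
            acks
        })
        .await;

    // None here means the loop was cancelled before completing; the code
    // that would advance the global offset is simply never reached.
    assert!(acks.is_none());
    println!("store loop cancelled; no offset advanced");
}
```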