Add Bifrost trim gap handling support by fast-forwarding to the latest partition snapshot (#2456)

This change enables the PartitionProcessorManager to handle PartitionProcessors
that encounter trim gaps in the log by fast-forwarding the partition state to
the most recent snapshot, if one is available.
pcholakov authored Jan 3, 2025
1 parent 1ac1f70 commit ef1f76a
Showing 5 changed files with 320 additions and 132 deletions.
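
At a high level, the new flow is: when a PartitionProcessor's log reader hits a trim gap, the processor stops with a dedicated error carrying the LSN the log was trimmed up to; the PartitionProcessorManager stashes that LSN (if a snapshot repository is configured) so the restart path can fast-forward from the latest snapshot. A minimal, self-contained sketch of the manager side, using simplified stand-in types rather than the actual restate ones:

    use std::collections::HashMap;

    type PartitionId = u16;
    type Lsn = u64;

    #[derive(Debug)]
    enum ProcessorError {
        // The log reader hit a trim gap: records up to `gap_to_lsn` were trimmed away.
        TrimGapEncountered { gap_to_lsn: Lsn },
        Other(String),
    }

    struct Manager {
        snapshot_repository: Option<()>, // Some(_) when a snapshot repository is configured
        fast_forward_on_startup: HashMap<PartitionId, Lsn>,
    }

    impl Manager {
        fn on_processor_stopped(&mut self, partition_id: PartitionId, result: Result<(), ProcessorError>) {
            match result {
                Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) if self.snapshot_repository.is_some() => {
                    // Stash the target LSN; the restart path fetches the latest
                    // snapshot and resumes reading past the gap.
                    self.fast_forward_on_startup.insert(partition_id, gap_to_lsn);
                }
                Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) => {
                    eprintln!("trim gap up to lsn {gap_to_lsn}, but no snapshot repository is configured");
                }
                Err(err) => eprintln!("processor exited unexpectedly: {err:?}"),
                Ok(()) => println!("processor stopped cleanly"),
            }
        }
    }

    fn main() {
        let mut mgr = Manager {
            snapshot_repository: Some(()),
            fast_forward_on_startup: HashMap::new(),
        };
        mgr.on_processor_stopped(1, Err(ProcessorError::TrimGapEncountered { gap_to_lsn: 100 }));
        assert_eq!(mgr.fast_forward_on_startup.get(&1), Some(&100));
    }
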
7 changes: 4 additions & 3 deletions crates/core/src/task_center.rs
@@ -520,15 +520,16 @@ impl TaskCenterInner {

/// Starts the `root_future` on a new runtime. The runtime is stopped once the root future
/// completes.
pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
self: &Arc<Self>,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + 'static,
{
if self.shutdown_requested.load(Ordering::Relaxed) {
return Err(ShutdownError.into());
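
This hunk generalizes start_runtime over the root future's output type R instead of hard-coding anyhow::Result<()>, so callers receive a typed RuntimeTaskHandle<R>; the partition processor uses this to return Result<(), ProcessorError>. A self-contained toy illustrating the same pattern (illustrative names, assuming the tokio crate with the rt feature; this is not the restate implementation):

    use std::future::Future;
    use std::thread::JoinHandle;

    // Run `root_future` to completion on a dedicated thread with its own
    // single-threaded Tokio runtime, returning the future's typed output.
    fn start_runtime<F, R>(
        runtime_name: &'static str,
        root_future: impl FnOnce() -> F + Send + 'static,
    ) -> JoinHandle<R>
    where
        F: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        std::thread::Builder::new()
            .name(runtime_name.to_owned())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .build()
                    .expect("failed to build runtime");
                // The runtime is dropped (stopped) once the root future completes.
                rt.block_on(root_future())
            })
            .expect("failed to spawn runtime thread")
    }

    fn main() {
        // R is inferred from the future's output; it is no longer pinned to anyhow::Result<()>.
        let handle = start_runtime("demo-runtime", || async { Ok::<u64, String>(42) });
        assert_eq!(handle.join().unwrap(), Ok(42));
    }

Handle::start_runtime in the next file gains the same generic signature, with an additional R: Debug bound.
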
8 changes: 5 additions & 3 deletions crates/core/src/task_center/handle.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Debug;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -77,15 +78,16 @@ impl Handle {
self.inner.block_on(future)
}

pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
&self,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + Debug + 'static,
{
self.inner
.start_runtime(root_task_kind, runtime_name, partition_id, root_future)
91 changes: 60 additions & 31 deletions crates/worker/src/partition/mod.rs
@@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Span};

use restate_bifrost::Bifrost;
use restate_core::network::{HasConnection, Incoming, Outgoing};
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
@@ -60,6 +60,7 @@ use restate_types::net::partition_processor::{
InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
};
use restate_types::storage::StorageDecodeError;
use restate_types::time::MillisSinceEpoch;
use restate_wal_protocol::control::AnnounceLeader;
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
@@ -252,20 +253,44 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
partition_store: PartitionStore,
}

#[derive(Debug, derive_more::Display, thiserror::Error)]
pub enum ProcessorError {
TrimGapEncountered { gap_to_lsn: Lsn },
Storage(#[from] StorageError),
Decode(#[from] StorageDecodeError),
Bifrost(#[from] restate_bifrost::Error),
StateMachine(#[from] state_machine::Error),
ActionEffect(#[from] leadership::Error),
ShutdownError(#[from] ShutdownError),
LogReadStreamTerminated,
Other(#[from] anyhow::Error),
}

type LsnEnvelope = (Lsn, Arc<Envelope>);

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
where
Codec: RawEntryCodec + Default + Debug,
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<()> {
#[instrument(
level = "error", skip_all,
fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty)
)]
pub async fn run(mut self) -> Result<(), ProcessorError> {
info!("Starting the partition processor.");

let res = tokio::select! {
res = self.run_inner() => {
match res.as_ref() {
// run_inner never returns normally
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) =>
info!(
trim_gap_to_lsn = ?gap_to_lsn,
"Shutting partition processor down because it encountered a trim gap in the log."
),
Err(err) => warn!("Shutting partition processor down because of error: {err}"),
}
res
},
@@ -293,7 +318,7 @@ where
res
}

async fn run_inner(&mut self) -> anyhow::Result<()> {
async fn run_inner(&mut self) -> Result<(), ProcessorError> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
@@ -344,32 +369,40 @@

// Start reading after the last applied lsn
let key_query = KeyFilter::Within(self.partition_key_range.clone());
let mut log_reader = self
let mut record_stream = self
.bifrost
.create_reader(
LogId::from(self.partition_id),
key_query.clone(),
last_applied_lsn.next(),
Lsn::MAX,
)?
.map_ok(|entry| {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
let Some(envelope) = entry.try_decode_arc::<Envelope>() else {
// trim-gap
unimplemented!("Handling trim gap is currently not supported")
};
anyhow::Ok((lsn, envelope?))
.map(|entry| match entry {
Ok(entry) => {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
if entry.is_data_record() {
entry
.try_decode_arc::<Envelope>()
.map(|envelope| Ok((lsn, envelope?)))
.expect("data record is present")
} else {
Err(ProcessorError::TrimGapEncountered {
gap_to_lsn: entry
.trim_gap_to_sequence_number()
.expect("trim gap has to-LSN"),
})
}
}
Err(err) => Err(ProcessorError::from(err)),
})
.try_take_while(|entry| {
.try_take_while(|(_, envelope)| {
// a catch-all safety net if all lower layers didn't filter this record out. This
// could happen for old records that didn't store `Keys` in the log store.
//
// At some point, we should remove this and trust that stored records have Keys
// stored correctly.
std::future::ready(Ok(entry
.as_ref()
.is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query))))
std::future::ready(Ok(envelope.matches_key_query(&key_query)))
});

// avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
@@ -406,7 +439,7 @@ where
old.updated_at = MillisSinceEpoch::now();
});
}
operation = Self::read_commands(&mut log_reader, self.max_command_batch_size, &mut command_buffer) => {
operation = Self::read_commands(&mut record_stream, self.max_command_batch_size, &mut command_buffer) => {
// check that reading has succeeded
operation?;

@@ -813,36 +846,32 @@ where
}

/// Tries to read as many records from the `log_reader` as are immediately available and stops
/// reading at `max_batching_size`.
/// reading at `max_batching_size`. Trim gaps will result in an immediate error.
async fn read_commands<S>(
log_reader: &mut S,
max_batching_size: usize,
record_buffer: &mut Vec<(Lsn, Arc<Envelope>)>,
) -> anyhow::Result<()>
record_buffer: &mut Vec<LsnEnvelope>,
) -> Result<(), ProcessorError>
where
S: Stream<Item = Result<anyhow::Result<(Lsn, Arc<Envelope>)>, restate_bifrost::Error>>
+ Unpin,
S: Stream<Item = Result<LsnEnvelope, ProcessorError>> + Unpin,
{
// beyond this point we must not await; otherwise we are no longer cancellation safe
let first_record = log_reader.next().await;

let Some(first_record) = first_record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorError::LogReadStreamTerminated);
};

record_buffer.clear();
record_buffer.push(first_record??);
record_buffer.push(first_record?);

while record_buffer.len() < max_batching_size {
// read more message from the stream but only if they are immediately available
if let Some(record) = log_reader.next().now_or_never() {
let Some(record) = record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorError::LogReadStreamTerminated);
};

record_buffer.push(record??);
record_buffer.push(record?);
} else {
// no more immediately available records found
break;
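
As the updated doc comment on read_commands notes, a trim gap now surfaces as an immediate error: the record stream maps each raw log entry either to an (Lsn, Arc<Envelope>) pair or to ProcessorError::TrimGapEncountered, and the ? in read_commands propagates it, shutting the processor down so the manager can react. A toy model of that mapping (simplified types, not the bifrost API):

    type Lsn = u64;

    #[derive(Debug)]
    enum ProcessorError {
        TrimGapEncountered { gap_to_lsn: Lsn },
    }

    enum LogEntry {
        Data { lsn: Lsn, envelope: String },
        TrimGap { to_lsn: Lsn },
    }

    fn map_entry(entry: LogEntry) -> Result<(Lsn, String), ProcessorError> {
        match entry {
            LogEntry::Data { lsn, envelope } => Ok((lsn, envelope)),
            // A trim gap carries the LSN the log has been trimmed up to; the
            // processor cannot make progress past it by reading alone.
            LogEntry::TrimGap { to_lsn } => Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn }),
        }
    }

    fn main() {
        assert!(map_entry(LogEntry::Data { lsn: 7, envelope: "cmd".into() }).is_ok());
        assert!(matches!(
            map_entry(LogEntry::TrimGap { to_lsn: 42 }),
            Err(ProcessorError::TrimGapEncountered { gap_to_lsn: 42 })
        ));
    }
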
51 changes: 41 additions & 10 deletions crates/worker/src/partition_processor_manager.rs
@@ -73,6 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
use crate::partition::ProcessorError;
use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
use crate::partition_processor_manager::processor_state::{
@@ -104,6 +105,7 @@ pub struct PartitionProcessorManager {
pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
snapshot_export_tasks: FuturesUnordered<TaskHandle<SnapshotResultInternal>>,
snapshot_repository: Option<SnapshotRepository>,
fast_forward_on_startup: HashMap<PartitionId, Lsn>,
}

struct PendingSnapshotTask {
@@ -201,6 +203,7 @@ impl PartitionProcessorManager {
snapshot_export_tasks: FuturesUnordered::default(),
pending_snapshots: HashMap::default(),
snapshot_repository,
fast_forward_on_startup: HashMap::default(),
}
}

@@ -345,8 +348,11 @@ impl PartitionProcessorManager {
}
}

#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
))]
#[instrument(
level = "debug",
skip_all,
fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner))
)]
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
let AsynchronousEvent {
partition_id,
@@ -425,7 +431,29 @@
ProcessorState::Started { processor, .. } => {
self.invokers_status_reader
.remove(processor.as_ref().expect("must be some").key_range());
warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}");

match result {
Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn }) => {
if self.snapshot_repository.is_some() {
info!(
trim_gap_to_lsn = ?to_lsn,
"Partition processor stopped due to a log trim gap, will attempt to fast-forward on restart",
);
self.fast_forward_on_startup.insert(partition_id, to_lsn);
} else {
warn!(
trim_gap_to_lsn = ?to_lsn,
"Partition processor stopped due to a log trim gap, and no snapshot repository is configured",
);
}
}
Err(err) => {
warn!("Partition processor exited unexpectedly: {err}")
}
Ok(_) => {
info!("Partition processor stopped.")
}
}
}
ProcessorState::Stopping {
processor,
@@ -487,7 +515,7 @@ impl PartitionProcessorManager {
fn await_runtime_task_result(
&mut self,
partition_id: PartitionId,
runtime_task_handle: RuntimeTaskHandle<anyhow::Result<()>>,
runtime_task_handle: RuntimeTaskHandle<Result<(), ProcessorError>>,
) {
self.asynchronous_operations.spawn(
async move {
@@ -716,10 +744,7 @@ impl PartitionProcessorManager {
self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender));
}

fn on_create_snapshot_task_completed(
&mut self,
result: Result<PartitionSnapshotMetadata, SnapshotError>,
) {
fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) {
let (partition_id, response) = match result {
Ok(metadata) => {
self.archived_lsns
@@ -876,6 +901,7 @@
self.bifrost.clone(),
self.partition_store_manager.clone(),
self.snapshot_repository.clone(),
self.fast_forward_on_startup.remove(&partition_id),
)
}

@@ -921,8 +947,13 @@ struct AsynchronousEvent {

#[derive(strum::IntoStaticStr)]
enum EventKind {
Started(anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)>),
Stopped(anyhow::Result<()>),
Started(
anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<Result<(), ProcessorError>>,
)>,
),
Stopped(Result<(), ProcessorError>),
NewLeaderEpoch {
leader_epoch_token: LeaderEpochToken,
result: anyhow::Result<LeaderEpoch>,
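
Note that the spawn path consumes the stashed LSN with remove rather than get, so a fast-forward request is applied at most once; a later clean restart proceeds from local state as usual. A self-contained sketch of that behavior (hypothetical names, simplified types):

    use std::collections::HashMap;

    type PartitionId = u16;
    type Lsn = u64;

    struct Manager {
        fast_forward_on_startup: HashMap<PartitionId, Lsn>,
    }

    impl Manager {
        fn start_partition_processor(&mut self, partition_id: PartitionId) {
            // `remove` takes the pending target out of the map, so it cannot
            // be replayed on subsequent restarts.
            match self.fast_forward_on_startup.remove(&partition_id) {
                // The startup path should import the latest snapshot at or
                // beyond this LSN before resuming the log reader past the gap.
                Some(lsn) => println!("partition {partition_id}: fast-forward to >= {lsn}"),
                None => println!("partition {partition_id}: normal startup from local state"),
            }
        }
    }

    fn main() {
        let mut mgr = Manager { fast_forward_on_startup: HashMap::from([(1, 42)]) };
        mgr.start_partition_processor(1); // consumes the pending fast-forward
        mgr.start_partition_processor(1); // second start is a normal startup
    }
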