Skip to content

Commit

Permalink
Self-review
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 23, 2024
1 parent 11754c6 commit 140e7bf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 10 deletions.
6 changes: 2 additions & 4 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ where
Codec: RawEntryCodec + Default + Debug,
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty
))]
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<ProcessorStopReason> {
info!("Starting the partition processor.");

Expand All @@ -281,7 +280,7 @@ where
ProcessorStopReason::LogTrimGap { to_lsn } =>
info!(
trim_gap_to_lsn = ?to_lsn,
"Shutting partition processor down because we encountered a trim gap in the log."
"Shutting partition processor down because it encountered a trim gap in the log."
),
_ => warn!("Shutting partition processor down because it stopped unexpectedly.")
}
Expand Down Expand Up @@ -314,7 +313,6 @@ where
res
}

// Runs as long as log records are available, or returns
async fn run_inner(&mut self) -> anyhow::Result<ProcessorStopReason> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
Expand Down
9 changes: 3 additions & 6 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,7 @@ impl PartitionProcessorManager {
}
}

#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
))]
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
let AsynchronousEvent {
partition_id,
Expand Down Expand Up @@ -433,20 +432,18 @@ impl PartitionProcessorManager {
Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => {
if self.snapshot_repository.is_some() {
info!(
%partition_id,
trim_gap_to_lsn = ?to_lsn,
"Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward",
"Partition processor stopped due to a log trim gap, will attempt to fast-forward on restart",
);
self.fast_forward_on_startup.insert(partition_id, to_lsn);
} else {
warn!(
%partition_id,
"Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}",
);
}
}
_ => {
warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}")
warn!("Partition processor exited unexpectedly: {result:?}")
}
}
}
Expand Down

0 comments on commit 140e7bf

Please sign in to comment.