From 140e7bf30ed7ffc10474983794308ae5efe3140f Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 16:28:50 +0200 Subject: [PATCH] Self-review --- crates/worker/src/partition/mod.rs | 6 ++---- crates/worker/src/partition_processor_manager.rs | 9 +++------ 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index bb5f51d95..2006d653c 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -268,8 +268,7 @@ where Codec: RawEntryCodec + Default + Debug, InvokerSender: restate_invoker_api::InvokerHandle> + Clone, { - #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty - ))] + #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] pub async fn run(mut self) -> anyhow::Result { info!("Starting the partition processor."); @@ -281,7 +280,7 @@ where ProcessorStopReason::LogTrimGap { to_lsn } => info!( trim_gap_to_lsn = ?to_lsn, - "Shutting partition processor down because we encountered a trim gap in the log." + "Shutting partition processor down because it encountered a trim gap in the log." ), _ => warn!("Shutting partition processor down because it stopped unexpectedly.") } @@ -314,7 +313,6 @@ where res } - // Runs as long as log records are available, or returns async fn run_inner(&mut self) -> anyhow::Result { let mut partition_store = self.partition_store.clone(); let last_applied_lsn = partition_store.get_applied_lsn().await?; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index db29a9b59..d76f66375 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -348,8 +348,7 @@ impl PartitionProcessorManager { } } - #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner) - ))] + #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))] fn on_asynchronous_event(&mut self, event: AsynchronousEvent) { let AsynchronousEvent { partition_id, @@ -433,20 +432,18 @@ impl PartitionProcessorManager { Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { if self.snapshot_repository.is_some() { info!( - %partition_id, trim_gap_to_lsn = ?to_lsn, - "Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward", + "Partition processor stopped due to a log trim gap, will attempt to fast-forward on restart", ); self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { warn!( - %partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", ); } } _ => { - warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}") + warn!("Partition processor exited unexpectedly: {result:?}") } } }