From 0d94b4584f82c810f31bfa426a15192d61c7ccc9 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Fri, 3 Jan 2025 13:38:26 +0200
Subject: [PATCH] Remove unnecessary map_err calls in PartitionProcessor::run_inner

---
 crates/worker/src/partition/mod.rs | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 5046b1dd9..5659a7f25 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -320,10 +320,7 @@ where
 
     async fn run_inner(&mut self) -> Result<(), ProcessorError> {
         let mut partition_store = self.partition_store.clone();
-        let last_applied_lsn = partition_store
-            .get_applied_lsn()
-            .await
-            .map_err(ProcessorError::from)?;
+        let last_applied_lsn = partition_store.get_applied_lsn().await?;
         let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
 
         self.status.last_applied_log_lsn = Some(last_applied_lsn);
@@ -332,8 +329,7 @@ where
         let current_tail = self
             .bifrost
             .find_tail(LogId::from(self.partition_id))
-            .await
-            .map_err(ProcessorError::from)?;
+            .await?;
 
         debug!(
             last_applied_lsn = %last_applied_lsn,
@@ -380,8 +376,7 @@ where
                 key_query.clone(),
                 last_applied_lsn.next(),
                 Lsn::MAX,
-            )
-            .map_err(ProcessorError::from)?
+            )?
             .map(|entry| match entry {
                 Ok(entry) => {
                     trace!(?entry, "Read entry");
@@ -389,7 +384,7 @@ where
                     if entry.is_data_record() {
                         entry
                             .try_decode_arc::<Envelope>()
-                            .map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?)))
+                            .map(|envelope| Ok((lsn, envelope?)))
                             .expect("data record is present")
                     } else {
                         Err(ProcessorError::TrimGapEncountered {
@@ -464,14 +459,14 @@ where
                         lsn,
                         envelope,
                         &mut transaction,
-                        &mut action_collector).await.map_err(ProcessorError::from)?;
+                        &mut action_collector).await?;
 
                     apply_command_latency.record(command_start.elapsed());
 
                     if let Some((header, announce_leader)) = leadership_change {
                         // commit all changes so far, this is important so that the actuators see all changes
                         // when becoming leader.
-                        transaction.commit().await.map_err(ProcessorError::from)?;
+                        transaction.commit().await?;
 
                         // We can ignore all actions collected so far because as a new leader we have to instruct the
                         // actuators afresh.
@@ -489,7 +484,7 @@ where
                             self.status.last_observed_leader_node = announce_leader.node_id;
                         }
 
-                        let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
+                        let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
 
                         Span::current().record("is_leader", is_leader);
 
@@ -502,17 +497,17 @@ where
                     }
 
                     // Commit our changes and notify actuators about actions if we are the leader
-                    transaction.commit().await.map_err(ProcessorError::from)?;
+                    transaction.commit().await?;
                     let actions_start = Instant::now();
-                    self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?;
+                    self.leadership_state.handle_actions(action_collector.drain(..)).await?;
                     record_actions_latency.record(actions_start.elapsed());
                 },
                 result = self.leadership_state.run() => {
-                    let action_effects = result.map_err(ProcessorError::from)?;
+                    let action_effects = result?;
                     // We process the action_effects not directly in the run future because it
                     // requires the run future to be cancellation safe. In the future this could be
                     // implemented.
-                    self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?;
+                    self.leadership_state.handle_action_effects(action_effects).await?;
                 }
             }
             // Allow other tasks on this thread to run, but only if we have exhausted the coop