Skip to content

Commit

Permalink
Remove unnecessary map_err calls in PartitionProcessor::run_inner
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Jan 3, 2025
1 parent 1baff85 commit 0d94b45
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,7 @@ where

async fn run_inner(&mut self) -> Result<(), ProcessorError> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store
.get_applied_lsn()
.await
.map_err(ProcessorError::from)?;
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
Expand All @@ -332,8 +329,7 @@ where
let current_tail = self
.bifrost
.find_tail(LogId::from(self.partition_id))
.await
.map_err(ProcessorError::from)?;
.await?;

debug!(
last_applied_lsn = %last_applied_lsn,
Expand Down Expand Up @@ -380,16 +376,15 @@ where
key_query.clone(),
last_applied_lsn.next(),
Lsn::MAX,
)
.map_err(ProcessorError::from)?
)?
.map(|entry| match entry {
Ok(entry) => {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
if entry.is_data_record() {
entry
.try_decode_arc::<Envelope>()
.map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?)))
.map(|envelope| Ok((lsn, envelope?)))
.expect("data record is present")
} else {
Err(ProcessorError::TrimGapEncountered {
Expand Down Expand Up @@ -464,14 +459,14 @@ where
lsn,
envelope,
&mut transaction,
&mut action_collector).await.map_err(ProcessorError::from)?;
&mut action_collector).await?;

apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await.map_err(ProcessorError::from)?;
transaction.commit().await?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
Expand All @@ -489,7 +484,7 @@ where
self.status.last_observed_leader_node = announce_leader.node_id;
}

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;
let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;

Span::current().record("is_leader", is_leader);

Expand All @@ -502,17 +497,17 @@ where
}

// Commit our changes and notify actuators about actions if we are the leader
transaction.commit().await.map_err(ProcessorError::from)?;
transaction.commit().await?;
let actions_start = Instant::now();
self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?;
self.leadership_state.handle_actions(action_collector.drain(..)).await?;
record_actions_latency.record(actions_start.elapsed());
},
result = self.leadership_state.run() => {
let action_effects = result.map_err(ProcessorError::from)?;
let action_effects = result?;
// We process the action_effects not directly in the run future because it
// requires the run future to be cancellation safe. In the future this could be
// implemented.
self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?;
self.leadership_state.handle_action_effects(action_effects).await?;
}
}
// Allow other tasks on this thread to run, but only if we have exhausted the coop
Expand Down

0 comments on commit 0d94b45

Please sign in to comment.