Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bifrost trim gap handling support by fast-forwarding to the latest partition snapshot #2456

Merged
merged 15 commits into from
Jan 3, 2025

Conversation

pcholakov
Copy link
Contributor

@pcholakov pcholakov commented Dec 23, 2024

Closes: #2247

Copy link

github-actions bot commented Dec 23, 2024

Test Results

  7 files  ±0    7 suites  ±0   4m 26s ⏱️ -13s
 47 tests ±0   46 ✅ ±0  1 💤 ±0  0 ❌ ±0 
182 runs  ±0  179 ✅ ±0  3 💤 ±0  0 ❌ ±0 

Results for commit 0111eec. ± Comparison against base commit 1ac1f70.

♻️ This comment has been updated with latest results.

@pcholakov pcholakov force-pushed the feat/trim-gap-handling branch from 8a82f9e to 140e7bf Compare December 23, 2024 14:29
@pcholakov pcholakov marked this pull request as ready for review December 23, 2024 14:58
unimplemented!("Handling trim gap is currently not supported")
};
anyhow::Ok((lsn, envelope?))
if entry.is_data_record() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, LogEntry.record is not public, and neither are bifrost::{MaybeRecord, TrimGap} - would we prefer to make those public and use pattern-matching directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but it doesn't sound like you need that. See my comments below.

let snapshot = match snapshot_repository {
Some(repository) => {
debug!("Looking for partition snapshot from which to bootstrap partition store");
// todo(pavel): pass target LSN to repository
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can optimize this by not downloading a snapshot that's older than the target LSN; I'll tackle this as a separate follow-up PR.

crates/worker/src/partition/mod.rs Outdated Show resolved Hide resolved
);
}

// We expect the processor startup attempt will fail, avoid spinning too fast.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems reasonable to me. I chose to rather delay and try start again, just in case something has changed in the log - but at this point we're unlikely to get this processor going again by following the log. What's a good way to post a metric that we're spinning?

Comment on lines 284 to 285
Ok(stopped) => {
match stopped {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip: You can remove one level of nesting:

Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => ....
Ok(_) => warn...
Err(err) => warn...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better, thank you! <3

unimplemented!("Handling trim gap is currently not supported")
};
anyhow::Ok((lsn, envelope?))
if entry.is_data_record() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but it doesn't sound like you need that. See my comments below.

Comment on lines 390 to 394
anyhow::Ok(Record::TrimGap(
entry
.trim_gap_to_sequence_number()
.expect("trim gap has to-LSN"),
))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to stop the read stream at the first gap and return Err instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this to mean the map_ok function translates a trim-gap into an Err(TrimGap {..}) instead? Maybe! The way we use anyhow::Result pervasively makes this a deeper change than I wanted to tackle right away; but it also makes more sense to treat trim gaps as just another record in the stream, with errors reserved for actual failure conditions.

Zooming out a bit, modeling the Partition Processor overall outcome as Result<Canceled | StoppedAtTrimGap, ProcessingError> seems accurate: the Ok / left path is an expected if rare reason to halt; the Err / right path is an exceptional failure condition.

If you have a few minutes, I'd love to hear more about how you'd solve this? I'm certain I am also missing some subtlety around properly consuming the log stream!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed offline and agreed that it's best to represent this case as an error case.

Comment on lines +523 to +532
pub fn start_runtime<F, R>(
self: &Arc<Self>,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it seems more than you want to have control over the error type rather than make the runtime behave like an async task with a return value.

In that case, your PartitionProcessorStopReason becomes the error type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe! We use anyhow::Error quite a bit in the PP now, so it would be difficult to disentangle the errors I care about, from other failure conditions. That aside, I still like modeling this as an outcome of either a known stop reason, or some other failure condition. I am treating PartitionProcessorStopReason as a normal return since both canceling the PP, or encountering a trim gap, are expected over a long enough timeline.

.await
&& fast_forward_lsn.is_none()
{
return Ok(partition_store_manager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tip: remove return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the return statement, I need to pull the rest of the method body into an else arm - and I specifically wanted to keep it this way. I find it easier to read without the extra nesting. Open to change it back if you feel about using the if expression as the returned value of course :-)

Comment on lines +274 to +279
tokio::time::sleep(Duration::from_millis(
10_000 + rand::random::<u64>() % 10_000,
))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would RetryPolicy and its internal jitter logic work for you here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely! I didn't want to plumb a retry count through just yet but maybe even without it, we can leverage the retry policy already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered why I didn't want to tackle this just yet - right now the way to get consecutive retry decisions is to get an iterator. I want to introduce an alternative API to RetryPolicy which will make this more suitable for use cases like this one, but let me rather do that as a follow up PR!

warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to remove local snapshot directory, continuing with startup: {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's try and avoid using Debug values in log messages higher than debug!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very insightful rule of thumb to keep in mind, thank you!

@pcholakov
Copy link
Contributor Author

One bit of offline feedback from @AhmedSoliman was to think edge cases around partition leadership, when an active leader encounters a trim gap.

Pushed an updated version which addresses the trim gap stop-reason as an explicit error, rather than an "ok" variant. This is because we really want to minimize the chances of misinterpreting a trim-gap record and accidentally consuming past it.

One deviation from the previous behavior with this latest revision is that read_commands (and by extension PartitionProcessor::run) immediately returns an error when a trim-gap is encountered, rather than trying to apply all the valid envelopes seen prior to it. This is acceptable: generally we would have stopped anyway; the only way to continue applying more records is to find a snapshot to fast-forward beyond the gap. Applying records from before the gap is technically not wrong but is wasteful.

Also addressed most of the remaining smaller comments, with the major exception of not using retry policy just yet - I plan to, but let's do it as a follow-up as I want to make some changes to make it easier to use here.

@pcholakov pcholakov requested review from AhmedSoliman and removed request for tillrohrmann December 24, 2024 12:35
@pcholakov pcholakov force-pushed the feat/trim-gap-handling branch 2 times, most recently from b76f3f9 to 87a876d Compare December 24, 2024 12:39
}
for record in command_buffer.drain(..) {
match record {
Record::Envelope(lsn, envelope) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Envelope handling logic is unchanged, this block is just indented due to the match expression needed to handle gaps.

@pcholakov pcholakov force-pushed the feat/trim-gap-handling branch from 0475afc to 2539aa3 Compare December 27, 2024 13:47
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for creating this PR @pcholakov. The changes look really good to me. Well done 👏. The one question I had was how bulletproof is the logic in open_partition_store when to import a snapshot w/o dropping the partition cf based on the passed in fast_forward_lsn parameter. It seems possible that we don't fail with a trim gap error (e.g. PP being stopped) and then restart later with a partition store that has an initialized cf and an existing snapshot.

crates/worker/src/partition/mod.rs Outdated Show resolved Hide resolved
crates/worker/src/partition/mod.rs Outdated Show resolved Hide resolved
crates/worker/src/partition/mod.rs Outdated Show resolved Hide resolved
@@ -426,14 +464,14 @@ where
lsn,
envelope,
&mut transaction,
&mut action_collector).await?;
&mut action_collector).await.map_err(ProcessorError::from)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? calls ProcessorError::from automatically. So map_err can be removed. I stop flagging this here but there are a few other occurrences further down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for flagging this! I distinctly recall compilation failures without these but I never investigated why - I think the return type might have been different at the time. All cleaned up now!

crates/worker/src/partition_processor_manager.rs Outdated Show resolved Hide resolved
Comment on lines 231 to 238
Ok(import_snapshot(
partition_id,
key_range,
snapshot,
partition_store_manager,
options,
)
.await?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it ok to import a snapshot w/o dropping the partition cf? Is it really safe that if fast_forward_lsn == None, then we guarantee that the corresponding partition cf does not exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is safe, though it's very unclear in the local context why this is :-) At the very top of open_partition_store, we immediately go down a different path if we already have a store (i.e. a column family exists for the partition) and there's no fast-forward target LSN:

if partition_store_manager
    .has_partition_store(partition_id)
    .await
    && fast_forward_lsn.is_none()
{
    return Ok(partition_store_manager
        .open_partition_store(
            partition_id,
            key_range,
            OpenMode::OpenExisting,
            &options.storage.rocksdb,
        )
        .await?);
}

If we didn't go down that path, and fast_forward_lsn is unset, that means that the CF for partition_id does NOT exist.

I'll try refactor the code to make this more obvious, or failing that, at least add a comment to highlight this. Suggestions for how to make this better are welcome!

PS. import_snapshot is paranoid and will refuse to import if a CF already exists, so a potential bug here will cause a PP startup failure but not corrupt a data store.

@pcholakov
Copy link
Contributor Author

Thanks a lot for the feedback, @tillrohrmann! 🙏 Cleaned up per your suggestions.

The one question I had was how bulletproof is the logic in open_partition_store when to import a snapshot w/o dropping the partition cf based on the passed in fast_forward_lsn parameter. It seems possible that we don't fail with a trim gap error (e.g. PP being stopped) and then restart later with a partition store that has an initialized cf and an existing snapshot.

I believe this case covered handled by the early-return here:

if partition_store_manager
.has_partition_store(partition_id)
.await
&& fast_forward_lsn.is_none()
{
// We have an initialized partition store, and no fast-forward target - go on and open it.
return Ok(partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::OpenExisting,
&options.storage.rocksdb,
)
.await?);
}
- it's the common path taken when we are starting up with an existing partition store and the previous stop-reason is trim-gap encountered. We only continue on the special cases below if (re-)bootstrap is required.

If the PP previously stopped for some reason other than a trim-gap, then it's just a clean startup with an existing store. In the case of either a fresh start, or a restart after some other error, we don't have a fast-forward LSN target and will take the early return.

I tried to refactor the code to make it a bit more obvious and readable - plus added some comments. Let me know if this helps, and if you have a better idea! :-)

@pcholakov pcholakov requested review from tillrohrmann and removed request for AhmedSoliman January 3, 2025 13:04
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification of how the partition store is initialized from a snapshot. It makes sense to me now :-) Really nice work. LGTM. +1 for merging.

}
(Some(snapshot), None) => {
// We only reach this point if there is no initialized store for the partition (early
// return at start of method), we can import without first dropping the column family.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment in the parenthesis seems to be no longer correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Will update before I merge 👍

@pcholakov pcholakov merged commit ef1f76a into main Jan 3, 2025
12 checks passed
@pcholakov pcholakov deleted the feat/trim-gap-handling branch January 3, 2025 14:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable partition processor trim-gap handling via snapshot-based fast-forwarding
3 participants