Fixes for trim interval and persisted lsn watcher #2683

Merged 7 commits on Feb 10, 2025
45 changes: 32 additions & 13 deletions crates/admin/src/cluster_controller/service.rs
@@ -797,7 +797,9 @@ mod tests {
use restate_core::test_env::NoOpMessageHandler;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::{NodeState, PartitionProcessorStatus};
use restate_types::config::{AdminOptions, BifrostOptions, Configuration, NetworkingOptions};
use restate_types::config::{
AdminOptionsBuilder, BifrostOptions, Configuration, NetworkingOptions,
};
use restate_types::health::HealthStatus;
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
@@ -896,9 +898,11 @@ mod tests {
async fn auto_log_trim() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();
let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
@@ -979,9 +983,12 @@ mod tests {
..Default::default()
};

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();
let interval_duration = Duration::from_secs(10);
let config = Configuration {
networking,
admin: admin_options,
@@ -1051,9 +1058,11 @@ mod tests {
async fn do_not_trim_unless_all_nodes_report_persisted_lsn() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();
let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;

@@ -1115,19 +1124,23 @@ mod tests {
async fn do_not_trim_by_persisted_lsn_if_snapshot_repository_configured() -> anyhow::Result<()>
{
const LOG_ID: LogId = LogId::new(0);
let interval_duration = Duration::from_secs(10);

let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};

let interval_duration = Duration::from_secs(10);
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();
let mut config: Configuration = Configuration {
networking,
admin: admin_options,
..Default::default()
};
config.admin.log_trim_interval = Some(interval_duration.into());
config.bifrost.default_provider = ProviderKind::InMemory;
config.worker.snapshots.destination = Some("a-repository-somewhere".to_string());

@@ -1201,9 +1214,12 @@ mod tests {
async fn auto_trim_by_archived_lsn_when_dead_nodes_present() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();

let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;
let config = Configuration {
@@ -1278,9 +1294,12 @@ mod tests {
async fn do_not_trim_by_archived_lsn_if_slow_nodes_present() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let admin_options = AdminOptionsBuilder::default()
.log_trim_interval(interval_duration.into())
.build()
.unwrap();

let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;
let networking = NetworkingOptions {
24 changes: 12 additions & 12 deletions crates/admin/src/cluster_controller/service/state.rs
@@ -11,11 +11,14 @@
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Add, Deref};
use std::sync::Arc;
use std::time::Duration;

use futures::future::OptionFuture;
use itertools::Itertools;
use rand::Rng;
use tokio::sync::watch;
use tokio::time;
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{debug, info, instrument, trace, warn};

use restate_bifrost::Bifrost;
use restate_core::network::TransportConnect;
use restate_core::{my_node_id, Metadata};
@@ -26,11 +29,8 @@ use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
use restate_types::retries::with_jitter;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use tokio::sync::watch;
use tokio::time;
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{debug, info, instrument, trace, warn};

use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::logs_controller::{
@@ -230,7 +230,7 @@ where
_ = self.find_logs_tail_interval.tick() => {
self.logs_controller.find_logs_tail();
}
_ = OptionFuture::from(self.log_trim_check_interval.as_mut().map(|interval| interval.tick())) => {
Some(_) = OptionFuture::from(self.log_trim_check_interval.as_mut().map(|interval| interval.tick())) => {
[Contributor comment] Thanks for fixing this 🙏 I wasn't fully aware of the OptionFuture API back when I wrote it.
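
For context on the select! branch above: `OptionFuture::from(None)` resolves immediately with `None`, so a plain `_ =` binding would complete and run the branch on every loop iteration even when no trim interval is configured, whereas the `Some(_)` pattern simply disables the branch. A minimal standalone sketch of that behavior (illustrative names, not the crate's own types):

```rust
use std::time::Duration;

use futures::future::OptionFuture;
use tokio::time::{interval, Interval};

#[tokio::main]
async fn main() {
    // Simulate "trim disabled": no trim interval is configured.
    let mut trim_interval: Option<Interval> = None;
    let mut other_work = interval(Duration::from_millis(100));

    for _ in 0..3 {
        tokio::select! {
            // `Some(_)` fails to match the immediate `None`, so this branch is
            // disabled instead of producing a spurious trim check.
            Some(_) = OptionFuture::from(trim_interval.as_mut().map(|i| i.tick())) => {
                println!("trim check");
            }
            _ = other_work.tick() => {
                println!("other work");
            }
        }
    }
}
```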
return Ok(LeaderEvent::TrimLogs);
}
result = self.logs_controller.run_async_operations() => {
@@ -354,13 +354,13 @@ fn create_log_trim_check_interval(options: &AdminOptions) -> Option<Interval> {
.log_trim_threshold
.inspect(|_| info!("The log trim threshold setting is deprecated and will be ignored"));

options.log_trim_interval.map(|interval| {
// delay the initial trim check, and add a small amount of jitter to avoid synchronization
options.log_trim_interval().map(|interval| {
// delay the initial trim check, and introduce a small amount of jitter (+/-10%) to avoid synchronization
// among partition leaders in case of coordinated cluster restarts
let jitter = rand::rng().random_range(Duration::ZERO..interval.mul_f32(0.1));
let start_at = time::Instant::now().add(interval.into()).add(jitter);
let effective_interval = with_jitter(interval, 0.1);
[Contributor comment] Thank you!!
let start_at = time::Instant::now().add(effective_interval);

let mut interval = time::interval_at(start_at, interval.into());
let mut interval = time::interval_at(start_at, effective_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval
})
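
The comment in the hunk above describes the intent; the actual helper lives in `restate_types::retries`, so the body below is only an assumed stand-in for `with_jitter`, scaling a duration by a random factor within the given fraction (here ±10%):

```rust
use std::time::Duration;

use rand::Rng;

// Assumed behavior of with_jitter(base, 0.1): scale the duration by a random
// factor in [0.9, 1.1] so restarted nodes don't schedule trim checks in lockstep.
fn with_jitter(base: Duration, fraction: f64) -> Duration {
    let factor = rand::rng().random_range(1.0 - fraction..=1.0 + fraction);
    base.mul_f64(factor)
}

fn main() {
    let trim_interval = Duration::from_secs(10);
    // Prints something between roughly 9s and 11s.
    println!("{:?}", with_jitter(trim_interval, 0.1));
}
```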
4 changes: 2 additions & 2 deletions crates/bifrost/src/appender.rs
@@ -110,12 +110,12 @@ impl Appender {
};
match loglet.append_batch(batch.clone()).await {
Ok(lsn) => return Ok(lsn),
Err(AppendError::Sealed) => {
Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => {
debug!(
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
"Batch append failed but will be retried ({err}). Waiting for reconfiguration to complete"
);
let new_loglet = Self::on_sealed_loglet(
self.log_id,
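
The new match arm above retries on both error variants while still logging which one occurred, using an `@` binding inside an or-pattern. A small self-contained sketch of that pattern (stand-in types, not the crate's):

```rust
#[derive(Debug)]
enum AppendError {
    Sealed,
    ReconfigurationNeeded(String),
    Other(String),
}

fn should_retry(result: Result<u64, AppendError>) -> bool {
    match result {
        Ok(_) => false,
        // Both variants are handled by one branch; `err` still carries the
        // concrete error for the log message.
        Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => {
            println!("Batch append failed but will be retried ({err:?})");
            true
        }
        Err(err) => {
            println!("not retrying: {err:?}");
            false
        }
    }
}

fn main() {
    assert!(should_retry(Err(AppendError::Sealed)));
    assert!(should_retry(Err(AppendError::ReconfigurationNeeded("new segment".into()))));
    assert!(!should_retry(Err(AppendError::Other("disk error".into()))));
}
```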
18 changes: 13 additions & 5 deletions crates/bifrost/src/loglet.rs
@@ -16,19 +16,19 @@ pub mod util;

// exports
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};
use restate_types::logs::metadata::ProviderKind;
use tokio::sync::oneshot;

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{FutureExt, Stream};
use tokio::sync::oneshot;

use restate_core::ShutdownError;
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState};

use crate::LogEntry;
@@ -190,6 +190,12 @@ impl LogletCommit {
Self { rx }
}

pub fn reconfiguration_needed(reason: impl Into<Cow<'static, str>>) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Err(AppendError::ReconfigurationNeeded(reason.into())));
Self { rx }
}

pub fn resolved(offset: LogletOffset) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Ok(offset));
@@ -211,7 +217,9 @@
) -> Poll<Self::Output> {
match ready!(self.rx.poll_unpin(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(AppendError::Shutdown(ShutdownError))),
Err(_) => Poll::Ready(Err(AppendError::ReconfigurationNeeded(
"loglet gave up on this batch".into(),
))),
}
}
}
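
Both additions above follow the same pattern: a commit handle backed by a oneshot channel that can be resolved up front, and a poll path that turns a dropped sender into `ReconfigurationNeeded` rather than a shutdown error. A rough standalone sketch under those assumptions (simplified types, not the crate's own):

```rust
use tokio::sync::oneshot;

#[derive(Debug)]
enum AppendError {
    ReconfigurationNeeded(String),
}

struct Commit {
    rx: oneshot::Receiver<Result<u64, AppendError>>,
}

impl Commit {
    // Create a commit that is already resolved with the reconfiguration error.
    fn reconfiguration_needed(reason: impl Into<String>) -> Self {
        let (tx, rx) = oneshot::channel();
        let _ = tx.send(Err(AppendError::ReconfigurationNeeded(reason.into())));
        Self { rx }
    }

    async fn wait(self) -> Result<u64, AppendError> {
        // A dropped sender means the loglet gave up on this batch; surface that
        // as a reconfiguration request instead of pretending the node shut down.
        self.rx.await.unwrap_or_else(|_| {
            Err(AppendError::ReconfigurationNeeded(
                "loglet gave up on this batch".into(),
            ))
        })
    }
}

#[tokio::main]
async fn main() {
    let commit = Commit::reconfiguration_needed("sequencer lost its loglet");
    println!("{:?}", commit.wait().await);
}
```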
5 changes: 4 additions & 1 deletion crates/bifrost/src/loglet/error.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

@@ -16,8 +17,10 @@ use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};

#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
#[error("Loglet is sealed")]
#[error("Loglet has been sealed")]
Sealed,
#[error("Loglet needs reconfiguration; {0}")]
ReconfigurationNeeded(Cow<'static, str>),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
4 changes: 4 additions & 0 deletions crates/bifrost/src/loglet/loglet_tests.rs
@@ -510,6 +510,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
println!("append failed({i}) => SEALED");
break;
}
Err(AppendError::ReconfigurationNeeded(reason)) => {
println!("append failed({i}) => ReconfigurationNeeded({reason})");
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
10 changes: 9 additions & 1 deletion crates/bifrost/src/loglet_wrapper.rs
@@ -188,7 +188,15 @@ impl LogletWrapper {

pub async fn get_trim_point(&self) -> Result<Option<Lsn>, OperationError> {
let offset = self.loglet.get_trim_point().await?;
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
Ok(offset
.map(|o| self.base_lsn.offset_by(o))
.map(|actual_trim_point| {
// If this loglet is sealed, the reported trim-point must fall within its boundaries
match self.tail_lsn {
Some(tail) => actual_trim_point.min(tail.prev()),
None => actual_trim_point,
}
}))
}
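
The clamp in `get_trim_point` above keeps the reported trim point inside a sealed loglet's boundaries. A toy illustration with plain integers standing in for the crate's `Lsn` type:

```rust
// For a sealed loglet, the reported trim point never goes past the record just
// before the seal, i.e. tail - 1; open loglets pass the value through unchanged.
fn clamp_trim_point(trim_point: u64, tail_lsn: Option<u64>) -> u64 {
    match tail_lsn {
        Some(tail) => trim_point.min(tail.saturating_sub(1)),
        None => trim_point,
    }
}

fn main() {
    // Loglet sealed at tail 100: a reported trim point of 250 is clamped to 99.
    assert_eq!(clamp_trim_point(250, Some(100)), 99);
    // Open (unsealed) loglet: the trim point is reported as-is.
    assert_eq!(clamp_trim_point(42, None), 42);
}
```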

// trim_point is inclusive.
6 changes: 2 additions & 4 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -219,7 +219,7 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
}
CheckSealOutcome::FullySealed => {
// already fully sealed, just make sure the sequencer is drained.
handle.drain().await?;
handle.drain().await;
// note that we can only do that if we are the sequencer because
// our known_global_tail is authoritative. We have no doubt about
// whether the tail needs to be repaired or not.
@@ -370,8 +370,6 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
/// trim_point is inclusive (will be trimmed)
async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError> {
trace!("trim() called");
let trim_point = trim_point.min(self.known_global_tail.latest_offset().prev_unchecked());

TrimTask::new(
&self.my_params,
self.logservers_rpc.clone(),
@@ -420,7 +418,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
.await?;
// If we are the sequencer, we need to wait until the sequencer is drained.
if let SequencerAccess::Local { handle } = &self.sequencer {
handle.drain().await?;
handle.drain().await;
self.known_global_tail.notify_seal();
};
// Primarily useful for remote sequencer to enforce seal check on the next find_tail() call
8 changes: 8 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
@@ -395,6 +395,14 @@ impl WaitForCommitTask {
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::ReconfigurationNeeded(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
sealed: Some(self.global_tail.is_sealed()), // this must be true
status: SequencerStatus::Gone,
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::Shutdown(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
@@ -545,7 +545,7 @@ impl ReadStreamTask {
loglet_id = %self.my_params.loglet_id,
from_offset = %self.read_pointer,
%to_offset,
?e,
%e,
"Could not request record batch from node {}", server
);
Ok(ServerReadResult::Skip)