diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 24b35619f2..7c74ad148f 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -24,7 +24,7 @@ use restate_test_util::let_assert; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState}; -use super::{Loglet, LogletOffset}; +use super::Loglet; use crate::loglet::AppendError; use crate::loglet_wrapper::LogletWrapper; use crate::setup_panic_handler; @@ -574,11 +574,12 @@ pub async fn append_after_seal_concurrent(loglet: Arc) -> googletest /// Validates that an empty loglet can be sealed pub async fn seal_empty(loglet: Arc) -> googletest::Result<()> { setup_panic_handler(); + let loglet = LogletWrapper::new(SegmentIndex::from(1), Lsn::OLDEST, None, loglet); assert_eq!(None, loglet.get_trim_point().await?); { let tail = loglet.find_tail().await?; - assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert_eq!(Lsn::OLDEST, tail.offset()); assert!(!tail.is_sealed()); } @@ -589,19 +590,19 @@ pub async fn seal_empty(loglet: Arc) -> googletest::Result<()> { .next() .await .expect("get the last known tail immediately"); - assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert_eq!(Lsn::OLDEST, tail.offset()); assert!(!tail.is_sealed()); } loglet.seal().await?; let tail = loglet.find_tail().await?; - assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert_eq!(Lsn::OLDEST, tail.offset()); assert!(tail.is_sealed()); { // last known tail should be immediately available through the tail watch let tail = watch.next().await.expect("see the sealed tail"); - assert_eq!(LogletOffset::OLDEST, tail.offset()); + assert_eq!(Lsn::OLDEST, tail.offset()); assert!(tail.is_sealed()); } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index b432a466aa..0b9451c2f8 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -447,4 +447,20 @@ mod tests { }) .await } + + #[test(tokio::test(start_paused = true))] + async fn replicated_loglet_single_seal_empty() -> Result<()> { + let loglet_id = ReplicatedLogletId::new(122); + let params = ReplicatedLogletParams { + loglet_id, + sequencer: GenerationalNodeId::new(1, 1), + replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), + nodeset: NodeSet::from_single(PlainNodeId::new(1)), + write_set: None, + }; + run_in_test_env(params, |env| { + crate::loglet::loglet_tests::seal_empty(env.loglet) + }) + .await + } } diff --git a/crates/log-server/src/service.rs b/crates/log-server/src/service.rs index fa8d26b7f1..9bf8e6a648 100644 --- a/crates/log-server/src/service.rs +++ b/crates/log-server/src/service.rs @@ -61,16 +61,7 @@ impl LogServerService { }) } - pub async fn start(self, metadata_writer: MetadataWriter) -> anyhow::Result<()> { - let tc = self.task_center.clone(); - tc.spawn(TaskKind::SystemService, "log-server", None, async { - self.run(metadata_writer).await - })?; - - Ok(()) - } - - async fn run(self, mut metadata_writer: MetadataWriter) -> anyhow::Result<()> { + pub async fn start(self, mut metadata_writer: MetadataWriter) -> anyhow::Result<()> { let LogServerService { updateable_config, task_center, @@ -103,8 +94,8 @@ impl LogServerService { .await?; task_center.spawn_child( - TaskKind::NetworkMessageHandler, - "log-server-req-pump", + TaskKind::SystemService, + "log-server", None, request_pump.run(log_store, storage_state), )?;