Skip to content

Commit

Permalink
[Bifrost] Test for sealing empty replicated loglet on single node
Browse files Browse the repository at this point in the history
This also modifies the log-server startup to ensure provisioning happens before other components start.

Test Plan:
Uses the existing test in loglet_tests.
  • Loading branch information
AhmedSoliman committed Oct 7, 2024
1 parent e14e18c commit b1c8da3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
11 changes: 6 additions & 5 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_test_util::let_assert;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState};

use super::{Loglet, LogletOffset};
use super::Loglet;
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::setup_panic_handler;
Expand Down Expand Up @@ -574,11 +574,12 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
/// Validates that an empty loglet can be sealed
pub async fn seal_empty(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(SegmentIndex::from(1), Lsn::OLDEST, None, loglet);

assert_eq!(None, loglet.get_trim_point().await?);
{
let tail = loglet.find_tail().await?;
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}

Expand All @@ -589,19 +590,19 @@ pub async fn seal_empty(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
.next()
.await
.expect("get the last known tail immediately");
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}

loglet.seal().await?;
let tail = loglet.find_tail().await?;
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(tail.is_sealed());

{
// last known tail should be immediately available through the tail watch
let tail = watch.next().await.expect("see the sealed tail");
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(tail.is_sealed());
}

Expand Down
16 changes: 16 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,20 @@ mod tests {
})
.await
}

#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_seal_empty() -> Result<()> {
let loglet_id = ReplicatedLogletId::new(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::seal_empty(env.loglet)
})
.await
}
}
15 changes: 3 additions & 12 deletions crates/log-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,7 @@ impl LogServerService {
})
}

pub async fn start(self, metadata_writer: MetadataWriter) -> anyhow::Result<()> {
let tc = self.task_center.clone();
tc.spawn(TaskKind::SystemService, "log-server", None, async {
self.run(metadata_writer).await
})?;

Ok(())
}

async fn run(self, mut metadata_writer: MetadataWriter) -> anyhow::Result<()> {
pub async fn start(self, mut metadata_writer: MetadataWriter) -> anyhow::Result<()> {
let LogServerService {
updateable_config,
task_center,
Expand Down Expand Up @@ -103,8 +94,8 @@ impl LogServerService {
.await?;

task_center.spawn_child(
TaskKind::NetworkMessageHandler,
"log-server-req-pump",
TaskKind::SystemService,
"log-server",
None,
request_pump.run(log_store, storage_state),
)?;
Expand Down

0 comments on commit b1c8da3

Please sign in to comment.