Skip to content

Commit

Permalink
[Bifrost] ReplicatedLoglet read stream
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Oct 7, 2024
1 parent 2128500 commit e195f1c
Show file tree
Hide file tree
Showing 22 changed files with 959 additions and 96 deletions.
30 changes: 16 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
metrics = { workspace = true }
moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync", "logging"] }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
Expand All @@ -41,6 +41,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
Expand Down
117 changes: 65 additions & 52 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::StreamExt;
use tracing::info;

use restate_core::{task_center, TaskHandle, TaskKind};
use restate_test_util::let_assert;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState};
Expand Down Expand Up @@ -117,37 +118,39 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
// read from the future returns None
assert!(loglet.read_opt(Lsn::new(end)).await?.is_none());

let handle1: JoinHandle<googletest::Result<()>> = tokio::spawn({
let loglet = loglet.clone();
async move {
// read future record 4
let record = loglet.read(Lsn::new(4)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(4)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record4".to_owned())
);
Ok(())
}
});
let handle1: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 4
let record = loglet.read(Lsn::new(4)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(4)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record4".to_owned())
);
Ok(())
}
})?;

// Waiting for 10
let handle2: JoinHandle<googletest::Result<()>> = tokio::spawn({
let loglet = loglet.clone();
async move {
// read future record 10
let record = loglet.read(Lsn::new(10)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(10)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record10".to_owned())
);

Ok(())
}
});
let handle2: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 10
let record = loglet.read(Lsn::new(10)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(10)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record10".to_owned())
);

Ok(())
}
})?;

// Giving a chance to other tasks to work.
tokio::task::yield_now().await;
Expand Down Expand Up @@ -437,41 +440,51 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}

// +1 for the main task waiting on all concurrent appenders
let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1));

let mut appenders: JoinSet<googletest::Result<_>> = JoinSet::new();
let tc = task_center();
for appender_id in 0..CONCURRENT_APPENDERS {
appenders.spawn({
let loglet = loglet.clone();
let append_barrier = append_barrier.clone();
let tc = tc.clone();
async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
match res {
Ok(offset) => {
committed.push(offset);
tc.run_in_scope("append", None, async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
Err(AppendError::Sealed) => {
break;
match res {
Ok(offset) => {
committed.push(offset);
}
Err(AppendError::Sealed) => {
println!("append failed({}) => SEALED", i);
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
Err(e) => fail!("unexpected error: {}", e)?,
}
Err(e) => fail!("unexpected error: {}", e)?,
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
Ok(committed)
Ok(committed)
})
.await
}
});
}
Expand Down
6 changes: 6 additions & 0 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ impl TailOffsetWatch {
.map_err(|_| ShutdownError)
}

pub fn subscribe(&self) -> watch::Receiver<TailState<LogletOffset>> {
let mut receiver = self.sender.subscribe();
receiver.mark_changed();
receiver
}

/// The first yielded value is the latest known tail
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
let mut receiver = self.sender.subscribe();
Expand Down
16 changes: 14 additions & 2 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,25 @@ impl Loglet for MemoryLoglet {
mod tests {
use super::*;

use restate_core::TestCoreEnvBuilder;

macro_rules! run_test {
($test:ident) => {
paste::paste! {
#[tokio::test(start_paused = true)]
async fn [<memory_loglet_ $test>]() -> googletest::Result<()> {
let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
crate::loglet::loglet_tests::$test(loglet).await
let node_env = TestCoreEnvBuilder::with_incoming_only_connector()
.set_provider_kind(ProviderKind::InMemory)
.build()
.await;
node_env
.tc
.run_in_scope("test", None, async {
let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
crate::loglet::loglet_tests::$test(loglet).await
})
.await?;
Ok(())
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl RemoteLogServerManager {
return Ok(current.clone());
}

let connection = networking.node_connection(id.into()).await?;
let connection = networking.node_connection(id).await?;
let server = RemoteLogServer {
loglet_id: self.loglet_id,
node_id: id,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl RemoteLogServerManager {
return Ok(());
}

let connection = networking.node_connection(server.node_id.into()).await?;
let connection = networking.node_connection(server.node_id).await?;
inner.connection = connection.clone();
server.connection = connection.clone();

Expand Down
Loading

0 comments on commit e195f1c

Please sign in to comment.