Skip to content

Commit

Permalink
[Bifrost] ReplicatedLoglet read stream
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Oct 4, 2024
1 parent 59e75ad commit 640d766
Show file tree
Hide file tree
Showing 21 changed files with 944 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
metrics = { workspace = true }
moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync", "logging"] }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
Expand All @@ -41,6 +41,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
Expand Down
117 changes: 65 additions & 52 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::StreamExt;
use tracing::info;

use restate_core::{task_center, TaskHandle, TaskKind};
use restate_test_util::let_assert;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState};
Expand Down Expand Up @@ -117,37 +118,39 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
// read from the future returns None
assert!(loglet.read_opt(Lsn::new(end)).await?.is_none());

let handle1: JoinHandle<googletest::Result<()>> = tokio::spawn({
let loglet = loglet.clone();
async move {
// read future record 4
let record = loglet.read(Lsn::new(4)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(4)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record4".to_owned())
);
Ok(())
}
});
let handle1: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 4
let record = loglet.read(Lsn::new(4)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(4)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record4".to_owned())
);
Ok(())
}
})?;

// Waiting for 10
let handle2: JoinHandle<googletest::Result<()>> = tokio::spawn({
let loglet = loglet.clone();
async move {
// read future record 10
let record = loglet.read(Lsn::new(10)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(10)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record10".to_owned())
);

Ok(())
}
});
let handle2: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 10
let record = loglet.read(Lsn::new(10)).await?;
assert_that!(record.sequence_number(), eq(Lsn::new(10)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("record10".to_owned())
);

Ok(())
}
})?;

// Giving a chance to other tasks to work.
tokio::task::yield_now().await;
Expand Down Expand Up @@ -437,41 +440,51 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
assert_eq!(Lsn::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}

// +1 for the main task waiting on all concurrent appenders
let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1));

let mut appenders: JoinSet<googletest::Result<_>> = JoinSet::new();
let tc = task_center();
for appender_id in 0..CONCURRENT_APPENDERS {
appenders.spawn({
let loglet = loglet.clone();
let append_barrier = append_barrier.clone();
let tc = tc.clone();
async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
match res {
Ok(offset) => {
committed.push(offset);
tc.run_in_scope("append", None, async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
Err(AppendError::Sealed) => {
break;
match res {
Ok(offset) => {
committed.push(offset);
}
Err(AppendError::Sealed) => {
println!("append failed({}) => SEALED", i);
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
Err(e) => fail!("unexpected error: {}", e)?,
}
Err(e) => fail!("unexpected error: {}", e)?,
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
Ok(committed)
Ok(committed)
})
.await
}
});
}
Expand Down
6 changes: 6 additions & 0 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ impl TailOffsetWatch {
.map_err(|_| ShutdownError)
}

/// Returns a watch receiver for tail-state updates.
///
/// The receiver is marked as changed before being handed out so the first
/// await on it yields the currently-known tail instead of blocking until
/// the next update arrives.
pub fn subscribe(&self) -> watch::Receiver<TailState<LogletOffset>> {
    let mut rx = self.sender.subscribe();
    // Make the current value visible to the first poll.
    rx.mark_changed();
    rx
}

/// The first yielded value is the latest known tail
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
let mut receiver = self.sender.subscribe();
Expand Down
16 changes: 14 additions & 2 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,25 @@ impl Loglet for MemoryLoglet {
mod tests {
use super::*;

use restate_core::TestCoreEnvBuilder;

macro_rules! run_test {
($test:ident) => {
paste::paste! {
#[tokio::test(start_paused = true)]
async fn [<memory_loglet_ $test>]() -> googletest::Result<()> {
let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
crate::loglet::loglet_tests::$test(loglet).await
let node_env = TestCoreEnvBuilder::with_incoming_only_connector()
.set_provider_kind(ProviderKind::InMemory)
.build()
.await;
node_env
.tc
.run_in_scope("test", None, async {
let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
crate::loglet::loglet_tests::$test(loglet).await
})
.await?;
Ok(())
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl RemoteLogServerManager {
return Ok(current.clone());
}

let connection = networking.node_connection(id.into()).await?;
let connection = networking.node_connection(id).await?;
let server = RemoteLogServer {
loglet_id: self.loglet_id,
node_id: id,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl RemoteLogServerManager {
return Ok(());
}

let connection = networking.node_connection(server.node_id.into()).await?;
let connection = networking.node_connection(server.node_id).await?;
inner.connection = connection.clone();
server.connection = connection.clone();

Expand Down
81 changes: 76 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::providers::replicated_loglet::tasks::{FindTailTask, SealTask};

use super::error::ReplicatedLogletError;
use super::log_server_manager::RemoteLogServerManager;
use super::read_path::{ReadStreamTask, ReplicatedLogletReadStream};
use super::record_cache::RecordCache;
use super::rpc_routers::{LogServersRpc, SequencersRpc};
use super::tasks::FindTailResult;
Expand Down Expand Up @@ -128,11 +129,30 @@ pub enum SequencerAccess<T> {
impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
async fn create_read_stream(
    self: Arc<Self>,
    filter: KeyFilter,
    from: LogletOffset,
    to: Option<LogletOffset>,
) -> Result<SendableLogletReadStream, OperationError> {
    // Snapshot the shared handles the background reader needs; they are
    // moved into ReadStreamTask rather than borrowed from `self`.
    let cache = self.record_cache.clone();
    let known_global_tail = self.known_global_tail.clone();
    let my_params = self.my_params.clone();
    let networking = self.networking.clone();
    let logservers_rpc = self.logservers_rpc.clone();

    // Start the reader task; records flow back through `rx_stream`.
    let (rx_stream, reader_task) = ReadStreamTask::start(
        my_params,
        networking,
        logservers_rpc,
        filter,
        from,
        to,
        known_global_tail,
        cache,
    )
    .await?;
    let read_stream = ReplicatedLogletReadStream::new(from, rx_stream, reader_task);

    Ok(Box::pin(read_stream))
}

fn watch_tail(&self) -> BoxStream<'static, TailState<LogletOffset>> {
Expand Down Expand Up @@ -181,7 +201,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError> {
    // todo(asoli): Implement trim
    // Trimming is not supported yet, so there is never a trim point.
    Ok(None)
}

/// Trim the log to the minimum of new_trim_point and last_committed_offset
Expand Down Expand Up @@ -324,6 +345,7 @@ mod tests {
cached_record.unwrap().keys().clone(),
matches_pattern!(Keys::Single(eq(1)))
);

Ok(())
})
.await
Expand Down Expand Up @@ -370,4 +392,53 @@ mod tests {
})
.await
}

/// Runs the shared `single_loglet_readstream` test suite against a
/// replicated loglet configured with one node and replication factor 1.
#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_loglet_readstream() -> Result<()> {
    let params = ReplicatedLogletParams {
        loglet_id: ReplicatedLogletId::new(122),
        sequencer: GenerationalNodeId::new(1, 1),
        replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
        nodeset: NodeSet::from_single(PlainNodeId::new(1)),
        write_set: None,
    };
    run_in_test_env(params, |env| {
        crate::loglet::loglet_tests::single_loglet_readstream(env.loglet)
    })
    .await
}

/// Runs the shared `append_after_seal` test suite against a replicated
/// loglet configured with one node and replication factor 1.
#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_append_after_seal() -> Result<()> {
    let params = ReplicatedLogletParams {
        loglet_id: ReplicatedLogletId::new(122),
        sequencer: GenerationalNodeId::new(1, 1),
        replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
        nodeset: NodeSet::from_single(PlainNodeId::new(1)),
        write_set: None,
    };
    run_in_test_env(params, |env| {
        crate::loglet::loglet_tests::append_after_seal(env.loglet)
    })
    .await
}

/// Runs the shared `append_after_seal_concurrent` test suite against a
/// replicated loglet configured with one node and replication factor 1.
#[test(tokio::test(start_paused = true))]
async fn replicated_loglet_single_append_after_seal_concurrent() -> Result<()> {
    let params = ReplicatedLogletParams {
        loglet_id: ReplicatedLogletId::new(122),
        sequencer: GenerationalNodeId::new(1, 1),
        replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
        nodeset: NodeSet::from_single(PlainNodeId::new(1)),
        write_set: None,
    };
    run_in_test_env(params, |env| {
        crate::loglet::loglet_tests::append_after_seal_concurrent(env.loglet)
    })
    .await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,35 @@
use metrics::{describe_counter, Unit};

/// Counter name: append requests to bifrost's replicated loglet.
pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total";
/// Counter name: records served from the RecordCache on the read path.
pub(crate) const BIFROST_REPLICATED_READ_CACHE_HIT: &str =
"restate.bifrost.replicatedloglet.read_record_cache_hit.total";
/// Counter name: records filtered out while reading from the RecordCache.
pub(crate) const BIFROST_REPLICATED_READ_CACHE_FILTERED: &str =
"restate.bifrost.replicatedloglet.read_record_cache_filtered.total";
/// Counter name: total records read.
pub(crate) const BIFROST_REPLICATED_READ_TOTAL: &str =
"restate.bifrost.replicatedloglet.read_record.total";

/// Registers unit and help-text descriptions for the replicated loglet's
/// counters with the installed metrics recorder.
pub(crate) fn describe_metrics() {
    describe_counter!(
        BIFROST_REPLICATED_READ_TOTAL,
        Unit::Count,
        "Number of records read"
    );

    describe_counter!(
        BIFROST_REPLICATED_READ_CACHE_HIT,
        Unit::Count,
        "Number of records read from RecordCache"
    );

    describe_counter!(
        BIFROST_REPLICATED_READ_CACHE_FILTERED,
        Unit::Count,
        "Number of records filtered out while reading from RecordCache"
    );

    describe_counter!(
        BIFROST_REPLICATED_APPEND,
        Unit::Count,
        "Number of append requests to bifrost's replicated loglet"
    );
}
Loading

0 comments on commit 640d766

Please sign in to comment.