Add support to disable record cache
Summary:
Since the record cache will be used in multiple places, it is now possible
to disable it by setting the memory budget to None. This way, code can still
use the cache API normally, but no records will be cached.

All calls to `get` will return None if the cache is disabled.
muhamadazmy committed Oct 7, 2024
1 parent 2128500 commit 0f17968
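
For illustration, here is a minimal, self-contained sketch of the pattern this commit introduces, using a plain HashMap in place of the real moka-backed cache. The ToyCache type, its u64/String entries, and its fields are invented for this example, and unlike the real cache it ignores the budget value; the point is only that wrapping the cache in an Option makes writes no-ops and reads always miss when no budget is configured.

use std::collections::HashMap;
use std::num::NonZeroUsize;

struct ToyCache {
    // None means the cache is disabled.
    inner: Option<HashMap<u64, String>>,
}

impl ToyCache {
    fn new(memory_budget_bytes: Option<NonZeroUsize>) -> Self {
        // A None budget disables the cache; any Some budget enables it
        // (this toy does not actually enforce the budget).
        Self {
            inner: memory_budget_bytes.map(|_| HashMap::new()),
        }
    }

    fn add(&mut self, key: u64, value: String) {
        // Writes are silently dropped when the cache is disabled.
        if let Some(inner) = self.inner.as_mut() {
            inner.insert(key, value);
        }
    }

    fn get(&self, key: u64) -> Option<&String> {
        // `?` short-circuits to None when the cache is disabled.
        self.inner.as_ref()?.get(&key)
    }
}

fn main() {
    let mut disabled = ToyCache::new(None);
    disabled.add(1, "record".to_string());
    assert!(disabled.get(1).is_none()); // a disabled cache never returns a hit

    let mut enabled = ToyCache::new(Some(NonZeroUsize::new(1_000_000).unwrap()));
    enabled.add(1, "record".to_string());
    assert!(enabled.get(1).is_some());
}

The real RecordCache in the diff below follows the same shape, but keeps moka's concurrent Cache inside the Option, so add can take &self.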
Showing 4 changed files with 50 additions and 18 deletions.
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -213,7 +213,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
 
 #[cfg(test)]
 mod tests {
-    use std::num::NonZeroU8;
+    use std::num::{NonZeroU8, NonZeroUsize};
 
     use super::*;
 
@@ -251,7 +251,7 @@ mod tests {
 
         let logserver_rpc = LogServersRpc::new(&mut node_env.router_builder);
         let sequencer_rpc = SequencersRpc::new(&mut node_env.router_builder);
-        let record_cache = RecordCache::new(1_000_000);
+        let record_cache = RecordCache::new(Some(NonZeroUsize::new(1_000_000).unwrap()));
 
         let log_server = LogServerService::create(
             config.clone(),
9 changes: 7 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
@@ -121,8 +121,13 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
             active_loglets: Default::default(),
             _metadata_store_client: metadata_store_client,
             networking,
-            // todo(asoli): read memory budget from ReplicatedLogletOptions
-            record_cache: RecordCache::new(20_000_000), // 20MB
+            record_cache: RecordCache::new(
+                Configuration::pinned()
+                    .bifrost
+                    .replicated_loglet
+                    .record_cache_memory_size
+                    .map(|v| v.as_non_zero_usize()),
+            ),
             logserver_rpc_routers,
             sequencer_rpc_routers,
         }
44 changes: 30 additions & 14 deletions crates/bifrost/src/providers/replicated_loglet/record_cache.rs
@@ -8,6 +8,8 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::num::NonZeroUsize;
+
 use moka::{
     policy::EvictionPolicy,
     sync::{Cache, CacheBuilder},
@@ -27,27 +29,35 @@ type RecordKey = (ReplicatedLogletId, LogletOffset);
 /// RemoteSequencers
 #[derive(Clone)]
 pub struct RecordCache {
-    inner: Cache<RecordKey, Record>,
+    inner: Option<Cache<RecordKey, Record>>,
 }
 
 impl RecordCache {
-    pub fn new(memory_budget_bytes: usize) -> Self {
-        let inner: Cache<RecordKey, Record> = CacheBuilder::default()
-            .weigher(|_, record: &Record| {
-                (size_of::<RecordKey>() + record.estimated_encode_size())
-                    .try_into()
-                    .unwrap_or(u32::MAX)
-            })
-            .max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX))
-            .eviction_policy(EvictionPolicy::lru())
-            .build();
+    /// Creates a new instance of RecordCache. If memory budget is None
+    /// cache will be disabled
+    pub fn new(memory_budget_bytes: Option<NonZeroUsize>) -> Self {
+        let inner = memory_budget_bytes.map(|memory_budget_bytes| {
+            CacheBuilder::default()
+                .weigher(|_, record: &Record| {
+                    (size_of::<RecordKey>() + record.estimated_encode_size())
+                        .try_into()
+                        .unwrap_or(u32::MAX)
+                })
+                .max_capacity(memory_budget_bytes.get().try_into().unwrap_or(u64::MAX))
+                .eviction_policy(EvictionPolicy::lru())
+                .build()
+        });
 
         Self { inner }
     }
 
     /// Writes a record to cache externally
     pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) {
-        self.inner.insert((loglet_id, offset), record);
+        let Some(ref inner) = self.inner else {
+            return;
+        };
+
+        inner.insert((loglet_id, offset), record);
     }
 
     /// Extend cache with records
@@ -57,14 +67,20 @@ impl RecordCache {
         mut first_offset: LogletOffset,
         records: I,
     ) {
+        let Some(ref inner) = self.inner else {
+            return;
+        };
+
         for record in records.as_ref() {
-            self.inner.insert((loglet_id, first_offset), record.clone());
+            inner.insert((loglet_id, first_offset), record.clone());
            first_offset = first_offset.next();
         }
     }
 
     /// Get a for given loglet id and offset.
     pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option<Record> {
-        self.inner.get(&(loglet_id, offset))
+        let inner = self.inner.as_ref()?;
+
+        inner.get(&(loglet_id, offset))
     }
 }
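
As a side note on the weigher/max_capacity machinery kept above: the memory budget is enforced by moka's weigher, which charges each entry its approximate byte size against max_capacity, with LRU eviction. Below is a standalone sketch of that mechanism, assuming a moka 0.12-style dependency and using invented u64/String entries rather than the real RecordKey/Record types.

use moka::{policy::EvictionPolicy, sync::Cache};

fn main() {
    // Hypothetical byte budget; the real provider reads it from configuration.
    let budget_bytes: u64 = 32 * 1024 * 1024;

    let cache: Cache<u64, String> = Cache::builder()
        // Each entry is weighed by its (approximate) size in bytes.
        .weigher(|_key, value: &String| -> u32 {
            value.len().try_into().unwrap_or(u32::MAX)
        })
        // max_capacity is interpreted in weigher units, i.e. bytes here.
        .max_capacity(budget_bytes)
        // Evict least-recently-used entries once the budget is exceeded.
        .eviction_policy(EvictionPolicy::lru())
        .build();

    cache.insert(1, "some record payload".to_string());
    assert_eq!(cache.get(&1).as_deref(), Some("some record payload"));
}

In the actual code the weigher charges size_of::<RecordKey>() plus the record's estimated encoded size, as shown in the diff above.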
11 changes: 11 additions & 0 deletions crates/types/src/config/bifrost.rs
@@ -237,6 +237,14 @@ pub struct ReplicatedLogletOptions {
     ///
     /// Retry policy for log server RPCs
     pub log_server_retry_policy: RetryPolicy,
+
+    /// # In-memory RecordCache memory limit
+    ///
+    /// Optional size of record cache in bytes.
+    /// If set to None, record cache will be disabled.
+    /// Defaults: 20M
+    #[cfg_attr(feature = "schemars", schemars(with = "NonZeroByteCount"))]
+    pub record_cache_memory_size: Option<NonZeroByteCount>,
 }
 
 impl Default for ReplicatedLogletOptions {
@@ -257,6 +265,9 @@ impl Default for ReplicatedLogletOptions {
                 Some(10),
                 Some(Duration::from_millis(2000)),
             ),
+            record_cache_memory_size: Some(NonZeroByteCount::new(
+                NonZeroUsize::new(20_000_000).unwrap(),
+            )), // 20MB
         }
     }
 }
