From c9d75762856814e307b51ef49accdf91fefa7ca8 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Mon, 7 Oct 2024 13:04:27 +0200 Subject: [PATCH] Add support to disable record cache Summary: Since record cache will be used in multiple places. It's possible to disable it by setting the memory budget to None. This way code can still use the cache normally but no records will be cached. All calls to Get will always return None if cache is disabled --- .../providers/replicated_loglet/provider.rs | 9 +++- .../replicated_loglet/record_cache.rs | 44 +++++++++++++------ crates/types/src/config/bifrost.rs | 11 ++++- 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index caaf42b3cb..51f45aed92 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -121,8 +121,13 @@ impl ReplicatedLogletProvider { active_loglets: Default::default(), _metadata_store_client: metadata_store_client, networking, - // todo(asoli): read memory budget from ReplicatedLogletOptions - record_cache: RecordCache::new(20_000_000), // 20MB + record_cache: RecordCache::new( + Configuration::pinned() + .bifrost + .replicated_loglet + .record_cache_memory_size + .as_usize(), + ), logserver_rpc_routers, sequencer_rpc_routers, } diff --git a/crates/bifrost/src/providers/replicated_loglet/record_cache.rs b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs index 9f5bfaa0e2..b6fcc418e7 100644 --- a/crates/bifrost/src/providers/replicated_loglet/record_cache.rs +++ b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs @@ -27,27 +27,39 @@ type RecordKey = (ReplicatedLogletId, LogletOffset); /// RemoteSequencers #[derive(Clone)] pub struct RecordCache { - inner: Cache, + inner: Option>, } impl RecordCache { + /// Creates a new instance of RecordCache. If memory budget is None + /// cache will be disabled pub fn new(memory_budget_bytes: usize) -> Self { - let inner: Cache = CacheBuilder::default() - .weigher(|_, record: &Record| { - (size_of::() + record.estimated_encode_size()) - .try_into() - .unwrap_or(u32::MAX) - }) - .max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX)) - .eviction_policy(EvictionPolicy::lru()) - .build(); + let inner = if memory_budget_bytes > 0 { + Some( + CacheBuilder::default() + .weigher(|_, record: &Record| { + (size_of::() + record.estimated_encode_size()) + .try_into() + .unwrap_or(u32::MAX) + }) + .max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX)) + .eviction_policy(EvictionPolicy::lru()) + .build(), + ) + } else { + None + }; Self { inner } } /// Writes a record to cache externally pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) { - self.inner.insert((loglet_id, offset), record); + let Some(ref inner) = self.inner else { + return; + }; + + inner.insert((loglet_id, offset), record); } /// Extend cache with records @@ -57,14 +69,20 @@ impl RecordCache { mut first_offset: LogletOffset, records: I, ) { + let Some(ref inner) = self.inner else { + return; + }; + for record in records.as_ref() { - self.inner.insert((loglet_id, first_offset), record.clone()); + inner.insert((loglet_id, first_offset), record.clone()); first_offset = first_offset.next(); } } /// Get a for given loglet id and offset. pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option { - self.inner.get(&(loglet_id, offset)) + let inner = self.inner.as_ref()?; + + inner.get(&(loglet_id, offset)) } } diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 2e54dd9229..2cf052dfef 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -15,7 +15,7 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use restate_serde_util::NonZeroByteCount; +use restate_serde_util::{ByteCount, NonZeroByteCount}; use tracing::warn; use crate::logs::metadata::ProviderKind; @@ -237,6 +237,14 @@ pub struct ReplicatedLogletOptions { /// /// Retry policy for log server RPCs pub log_server_retry_policy: RetryPolicy, + + /// # In-memory RecordCache memory limit + /// + /// Optional size of record cache in bytes. + /// If set to 0, record cache will be disabled. + /// Defaults: 20M + #[cfg_attr(feature = "schemars", schemars(with = "ByteCount"))] + pub record_cache_memory_size: ByteCount, } impl Default for ReplicatedLogletOptions { @@ -257,6 +265,7 @@ impl Default for ReplicatedLogletOptions { Some(10), Some(Duration::from_millis(2000)), ), + record_cache_memory_size: 20_000_000u64.into(), // 20MB } } }