From 344b5968d4592d4a1e9307b79084aae8ab871973 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 24 Sep 2024 12:27:30 +0800 Subject: [PATCH] pref: optimize diff calc cache use --- crates/loro-internal/src/diff_calc.rs | 75 ++++++++++----------------- crates/loro-internal/src/oplog.rs | 6 +-- crates/loro-internal/src/version.rs | 65 +++++++++++++++++++++++ 3 files changed, 96 insertions(+), 50 deletions(-) diff --git a/crates/loro-internal/src/diff_calc.rs b/crates/loro-internal/src/diff_calc.rs index 77328a7a..892f85a1 100644 --- a/crates/loro-internal/src/diff_calc.rs +++ b/crates/loro-internal/src/diff_calc.rs @@ -13,7 +13,8 @@ use itertools::Itertools; use enum_dispatch::enum_dispatch; use fxhash::{FxHashMap, FxHashSet}; use loro_common::{ - CompactIdLp, ContainerID, Counter, HasCounterSpan, IdFull, IdLp, IdSpan, LoroValue, PeerID, ID, + CompactIdLp, ContainerID, Counter, HasCounter, HasCounterSpan, HasIdSpan, IdFull, IdLp, IdSpan, + LoroValue, PeerID, ID, }; use loro_delta::DeltaRope; use smallvec::SmallVec; @@ -36,7 +37,7 @@ use crate::{ event::{DiffVariant, InternalDiff}, op::{InnerContent, RichOp, SliceRange, SliceWithId}, span::{HasId, HasLamport}, - version::Frontiers, + version::{Frontiers, VersionRange}, InternalString, VersionVector, }; @@ -64,10 +65,7 @@ enum DiffCalculatorRetainMode { /// The diff calculator can only be used once. Once { used: bool }, /// The diff calculator will be persisted and can be reused after the diff calc is done. - Persist { - has_all: bool, - last_vv: VersionVector, - }, + Persist { recorded_ops_range: VersionRange }, } /// This mode defines how the diff is calculated and how it should be applied on the state. @@ -116,8 +114,7 @@ impl DiffCalculator { calculators: Default::default(), retain_mode: if persist { DiffCalculatorRetainMode::Persist { - has_all: false, - last_vv: Default::default(), + recorded_ops_range: Default::default(), } } else { DiffCalculatorRetainMode::Once { used: false } @@ -158,61 +155,41 @@ impl DiffCalculator { let _e = s.enter(); let mut use_persisted_shortcut = false; + let mut merged = before.clone(); + merged.merge(after); + let (lca, mut diff_mode, iter) = + oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers); match &mut self.retain_mode { DiffCalculatorRetainMode::Once { used } => { if *used { panic!("DiffCalculator with retain_mode Once can only be used once"); } } - DiffCalculatorRetainMode::Persist { has_all, last_vv } => { - if *has_all { - let include_before = last_vv.includes_vv(before); - let include_after = last_vv.includes_vv(after); - if !include_after || !include_before { - *has_all = false; - *last_vv = Default::default(); - } - } - - if *has_all { + DiffCalculatorRetainMode::Persist { recorded_ops_range } => { + if recorded_ops_range.contains_ops_between(&lca, &merged) + && recorded_ops_range.contains_ops_between(before, after) + { use_persisted_shortcut = true; + } else { + diff_mode = DiffMode::Checkout; + recorded_ops_range.clear(); } } } let affected_set = if !use_persisted_shortcut { - // if we don't have all the ops, we need to calculate the diff by tracing back - let mut merged = before.clone(); - merged.merge(after); - - let (lca, mut diff_mode, iter) = - oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers); - - if let DiffCalculatorRetainMode::Persist { has_all, last_vv } = &mut self.retain_mode { - if before.is_empty() { - *has_all = true; - *last_vv = Default::default(); - } - diff_mode = DiffMode::Checkout; - } - tracing::debug!("LCA: {:?} mode={:?}", &lca, diff_mode); let mut started_set = FxHashSet::default(); for (change, (start_counter, end_counter), vv) in iter { - if let DiffCalculatorRetainMode::Persist { has_all, last_vv } = + if let DiffCalculatorRetainMode::Persist { recorded_ops_range } = &mut self.retain_mode { - if *has_all { - if change.id.counter > 0 { - debug_assert!( - last_vv.includes_id(change.id.inc(-1)), - "{:?} {}", - &last_vv, - change.id - ); - } - - last_vv.extend_to_include_end_id(ID::new(change.id.peer, end_counter)); + if container_filter.is_none() { + recorded_ops_range.extends_to_include_id_span(IdSpan::new( + change.peer(), + start_counter, + end_counter, + )); } } @@ -286,8 +263,12 @@ impl DiffCalculator { // Find a set of affected containers idx, if it's relatively cheap if before.distance_between(after) < self.calculators.len() || cfg!(debug_assertions) { let mut set = FxHashSet::default(); - oplog.for_each_change_within(before, after, |change| { + oplog.for_each_change_within(before, after, |change, (start, end)| { for op in change.ops.iter() { + if op.ctr_end() <= start || op.ctr_start() >= end { + continue; + } + let idx = op.container; if let Some(filter) = container_filter { if !filter(idx) { diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index be7bccdc..6a8e170f 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -28,7 +28,7 @@ use crate::state::GcStore; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; use change_store::BlockOpRef; -use loro_common::{IdLp, IdSpan}; +use loro_common::{HasCounter, IdLp, IdSpan}; use rle::{HasLength, RleVec, Sliceable}; use smallvec::SmallVec; @@ -433,12 +433,12 @@ impl OpLog { &self, a: &VersionVector, b: &VersionVector, - mut f: impl FnMut(&Change), + mut f: impl FnMut(&Change, (Counter, Counter)), ) { let spans = b.iter_between(a); for span in spans { for c in self.change_store.iter_changes(span) { - f(&c); + f(&c, (span.ctr_start(), span.ctr_end())); } } } diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 89cc8256..14a83eef 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -27,6 +27,71 @@ use crate::{ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VersionVector(FxHashMap); +#[repr(transparent)] +#[derive(Debug, Clone, Default)] +pub struct VersionRange(pub(crate) FxHashMap); + +impl VersionRange { + pub fn new() -> Self { + Self(Default::default()) + } + + pub fn clear(&mut self) { + self.0.clear() + } + + pub fn get(&self, peer: &PeerID) -> Option<&(Counter, Counter)> { + self.0.get(peer) + } + + pub fn insert(&mut self, peer: PeerID, start: Counter, end: Counter) { + self.0.insert(peer, (start, end)); + } + + pub fn contains_ops_between(&self, vv_a: &VersionVector, vv_b: &VersionVector) -> bool { + for span in vv_a.sub_iter(vv_b) { + if !self.contains_id_span(IdSpan::new( + span.peer, + span.counter.start.saturating_sub(1), + span.counter.end, + )) { + return false; + } + } + + for span in vv_b.sub_iter(vv_a) { + if !self.contains_id_span(IdSpan::new( + span.peer, + span.counter.start.saturating_sub(1), + span.counter.end, + )) { + return false; + } + } + + true + } + + pub fn contains_id_span(&self, mut span: IdSpan) -> bool { + span.normalize_(); + if let Some((start, end)) = self.get(&span.peer) { + start <= &span.counter.start && end >= &span.counter.end + } else { + false + } + } + + pub fn extends_to_include_id_span(&mut self, mut span: IdSpan) { + span.normalize_(); + if let Some((start, end)) = self.0.get_mut(&span.peer) { + *start = (*start).min(span.counter.start); + *end = (*end).max(span.counter.end); + } else { + self.insert(span.peer, span.counter.start, span.counter.end); + } + } +} + /// Immutable version vector /// /// It has O(1) clone time and O(logN) insert/delete/lookup time.