Skip to content

Commit

Permalink
perf: optimize diff calc cache use
Browse files Browse the repository at this point in the history
  • Loading branch information
zxch3n committed Sep 24, 2024
1 parent bef39ce commit 344b596
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 50 deletions.
75 changes: 28 additions & 47 deletions crates/loro-internal/src/diff_calc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use itertools::Itertools;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
use loro_common::{
CompactIdLp, ContainerID, Counter, HasCounterSpan, IdFull, IdLp, IdSpan, LoroValue, PeerID, ID,
CompactIdLp, ContainerID, Counter, HasCounter, HasCounterSpan, HasIdSpan, IdFull, IdLp, IdSpan,
LoroValue, PeerID, ID,
};
use loro_delta::DeltaRope;
use smallvec::SmallVec;
Expand All @@ -36,7 +37,7 @@ use crate::{
event::{DiffVariant, InternalDiff},
op::{InnerContent, RichOp, SliceRange, SliceWithId},
span::{HasId, HasLamport},
version::Frontiers,
version::{Frontiers, VersionRange},
InternalString, VersionVector,
};

Expand Down Expand Up @@ -64,10 +65,7 @@ enum DiffCalculatorRetainMode {
/// The diff calculator can only be used once.
Once { used: bool },
/// The diff calculator will be persisted and can be reused after the diff calc is done.
Persist {
has_all: bool,
last_vv: VersionVector,
},
Persist { recorded_ops_range: VersionRange },
}

/// This mode defines how the diff is calculated and how it should be applied on the state.
Expand Down Expand Up @@ -116,8 +114,7 @@ impl DiffCalculator {
calculators: Default::default(),
retain_mode: if persist {
DiffCalculatorRetainMode::Persist {
has_all: false,
last_vv: Default::default(),
recorded_ops_range: Default::default(),
}
} else {
DiffCalculatorRetainMode::Once { used: false }
Expand Down Expand Up @@ -158,61 +155,41 @@ impl DiffCalculator {
let _e = s.enter();

let mut use_persisted_shortcut = false;
let mut merged = before.clone();
merged.merge(after);
let (lca, mut diff_mode, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);
match &mut self.retain_mode {
DiffCalculatorRetainMode::Once { used } => {
if *used {
panic!("DiffCalculator with retain_mode Once can only be used once");
}
}
DiffCalculatorRetainMode::Persist { has_all, last_vv } => {
if *has_all {
let include_before = last_vv.includes_vv(before);
let include_after = last_vv.includes_vv(after);
if !include_after || !include_before {
*has_all = false;
*last_vv = Default::default();
}
}

if *has_all {
DiffCalculatorRetainMode::Persist { recorded_ops_range } => {
if recorded_ops_range.contains_ops_between(&lca, &merged)
&& recorded_ops_range.contains_ops_between(before, after)
{
use_persisted_shortcut = true;
} else {
diff_mode = DiffMode::Checkout;
recorded_ops_range.clear();
}
}
}

let affected_set = if !use_persisted_shortcut {
// if we don't have all the ops, we need to calculate the diff by tracing back
let mut merged = before.clone();
merged.merge(after);

let (lca, mut diff_mode, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);

if let DiffCalculatorRetainMode::Persist { has_all, last_vv } = &mut self.retain_mode {
if before.is_empty() {
*has_all = true;
*last_vv = Default::default();
}
diff_mode = DiffMode::Checkout;
}

tracing::debug!("LCA: {:?} mode={:?}", &lca, diff_mode);
let mut started_set = FxHashSet::default();
for (change, (start_counter, end_counter), vv) in iter {
if let DiffCalculatorRetainMode::Persist { has_all, last_vv } =
if let DiffCalculatorRetainMode::Persist { recorded_ops_range } =
&mut self.retain_mode
{
if *has_all {
if change.id.counter > 0 {
debug_assert!(
last_vv.includes_id(change.id.inc(-1)),
"{:?} {}",
&last_vv,
change.id
);
}

last_vv.extend_to_include_end_id(ID::new(change.id.peer, end_counter));
if container_filter.is_none() {
recorded_ops_range.extends_to_include_id_span(IdSpan::new(
change.peer(),
start_counter,
end_counter,
));
}
}

Expand Down Expand Up @@ -286,8 +263,12 @@ impl DiffCalculator {
// Find a set of affected containers idx, if it's relatively cheap
if before.distance_between(after) < self.calculators.len() || cfg!(debug_assertions) {
let mut set = FxHashSet::default();
oplog.for_each_change_within(before, after, |change| {
oplog.for_each_change_within(before, after, |change, (start, end)| {
for op in change.ops.iter() {
if op.ctr_end() <= start || op.ctr_start() >= end {
continue;
}

let idx = op.container;
if let Some(filter) = container_filter {
if !filter(idx) {
Expand Down
6 changes: 3 additions & 3 deletions crates/loro-internal/src/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::state::GcStore;
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
use loro_common::{IdLp, IdSpan};
use loro_common::{HasCounter, IdLp, IdSpan};
use rle::{HasLength, RleVec, Sliceable};
use smallvec::SmallVec;

Expand Down Expand Up @@ -433,12 +433,12 @@ impl OpLog {
&self,
a: &VersionVector,
b: &VersionVector,
mut f: impl FnMut(&Change),
mut f: impl FnMut(&Change, (Counter, Counter)),
) {
let spans = b.iter_between(a);
for span in spans {
for c in self.change_store.iter_changes(span) {
f(&c);
f(&c, (span.ctr_start(), span.ctr_end()));
}
}
}
Expand Down
65 changes: 65 additions & 0 deletions crates/loro-internal/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,71 @@ use crate::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector(FxHashMap<PeerID, Counter>);

/// Per-peer record of a counter range, stored as a `(start, end)` pair.
///
/// NOTE(review): the pair presumably follows the same bound convention as
/// `IdSpan` counters — confirm against `IdSpan` before relying on whether
/// `end` is inclusive or exclusive.
#[repr(transparent)]
#[derive(Debug, Clone, Default)]
pub struct VersionRange(pub(crate) FxHashMap<PeerID, (Counter, Counter)>);

impl VersionRange {
    /// Creates a range with no peers recorded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Forgets every recorded peer range.
    pub fn clear(&mut self) {
        self.0.clear()
    }

    /// Looks up the `(start, end)` pair recorded for `peer`, if any.
    pub fn get(&self, peer: &PeerID) -> Option<&(Counter, Counter)> {
        self.0.get(peer)
    }

    /// Records `(start, end)` for `peer`, overwriting any previous entry.
    pub fn insert(&mut self, peer: PeerID, start: Counter, end: Counter) {
        self.0.insert(peer, (start, end));
    }

    /// Returns `true` when every span yielded by `vv_a.sub_iter(vv_b)` and by
    /// `vv_b.sub_iter(vv_a)` is covered by this range, after each span's start
    /// has been lowered by one via `saturating_sub(1)`.
    ///
    /// The spans of `vv_a.sub_iter(vv_b)` are checked first; the check stops at
    /// the first span that is not covered.
    pub fn contains_ops_between(&self, vv_a: &VersionVector, vv_b: &VersionVector) -> bool {
        vv_a.sub_iter(vv_b)
            .chain(vv_b.sub_iter(vv_a))
            .all(|span| {
                // NOTE(review): if `Counter` is a signed integer, `saturating_sub(1)`
                // yields -1 for a start of 0 instead of clamping at 0 — confirm
                // that this is the intended lower bound.
                let widened = IdSpan::new(
                    span.peer,
                    span.counter.start.saturating_sub(1),
                    span.counter.end,
                );
                self.contains_id_span(widened)
            })
    }

    /// Returns `true` when the recorded pair for `span.peer` encloses `span`
    /// (recorded start <= span start and recorded end >= span end).
    ///
    /// A peer with no recorded pair never contains anything.
    pub fn contains_id_span(&self, mut span: IdSpan) -> bool {
        // Presumably puts the counter bounds in ascending order — verify
        // against `IdSpan::normalize_`.
        span.normalize_();
        match self.get(&span.peer) {
            Some(&(lo, hi)) => lo <= span.counter.start && hi >= span.counter.end,
            None => false,
        }
    }

    /// Grows the pair recorded for `span.peer` so that it also encloses `span`.
    ///
    /// When the peer has no entry yet, the span's own bounds are recorded.
    pub fn extends_to_include_id_span(&mut self, mut span: IdSpan) {
        // Presumably puts the counter bounds in ascending order — verify
        // against `IdSpan::normalize_`.
        span.normalize_();
        let (lo, hi) = (span.counter.start, span.counter.end);
        self.0
            .entry(span.peer)
            .and_modify(|recorded| {
                recorded.0 = recorded.0.min(lo);
                recorded.1 = recorded.1.max(hi);
            })
            .or_insert((lo, hi));
    }
}

/// Immutable version vector
///
/// It has O(1) clone time and O(logN) insert/delete/lookup time.
Expand Down

0 comments on commit 344b596

Please sign in to comment.