diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b197d468..04154c63 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -40,9 +40,9 @@ jobs: with: version: "0.2.92" - uses: Swatinem/rust-cache@v2 + - name: Check + run: cargo clippy --all-features - name: Build run: cargo build --verbose - name: Run rust tests - run: deno task test - - name: Run wasm tests - run: deno task test-wasm + run: deno task test-all diff --git a/crates/loro-ffi/src/doc.rs b/crates/loro-ffi/src/doc.rs index 4ce6face..1a34d8be 100644 --- a/crates/loro-ffi/src/doc.rs +++ b/crates/loro-ffi/src/doc.rs @@ -409,6 +409,7 @@ impl LoroDoc { let s = self.doc.subscribe_local_update(Box::new(move |update| { // TODO: should it be cloned? callback.on_local_update(update.to_vec()); + true })); Arc::new(Subscription(Arc::new(Mutex::new(s)))) } diff --git a/crates/loro-internal/src/allocation/mod.rs b/crates/loro-internal/src/allocation/mod.rs index 8ebb3ac5..be34ae5e 100644 --- a/crates/loro-internal/src/allocation/mod.rs +++ b/crates/loro-internal/src/allocation/mod.rs @@ -1,3 +1,6 @@ +#![allow(dead_code)] +#![allow(unused)] + mod bfs; pub(crate) use bfs::calc_critical_version_bfs as calc_critical_version; diff --git a/crates/loro-internal/src/lib.rs b/crates/loro-internal/src/lib.rs index 77b8264b..6cdcecbc 100644 --- a/crates/loro-internal/src/lib.rs +++ b/crates/loro-internal/src/lib.rs @@ -33,7 +33,7 @@ pub use state::{TreeNode, TreeNodeWithChildren, TreeParentId}; use subscription::{LocalUpdateCallback, Observer, PeerIdUpdateCallback}; use txn::Transaction; pub use undo::UndoManager; -use utils::subscription::SubscriberSet; +use utils::subscription::SubscriberSetWithQueue; pub use utils::subscription::Subscription; pub mod allocation; pub mod awareness; @@ -120,7 +120,6 @@ pub struct LoroDoc { txn: Arc>>, auto_commit: AtomicBool, detached: AtomicBool, - - local_update_subs: SubscriberSet<(), LocalUpdateCallback>, - peer_id_change_subs: SubscriberSet<(), PeerIdUpdateCallback>, + local_update_subs: SubscriberSetWithQueue<(), LocalUpdateCallback, Vec>, + peer_id_change_subs: SubscriberSetWithQueue<(), PeerIdUpdateCallback, ID>, } diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 7f4edafe..b76f2e08 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -46,7 +46,7 @@ use crate::{ subscription::{LocalUpdateCallback, Observer, Subscriber}, txn::Transaction, undo::DiffBatch, - utils::subscription::{SubscriberSet, Subscription}, + utils::subscription::{SubscriberSetWithQueue, Subscription}, version::{shrink_frontiers, Frontiers, ImVersionVector}, ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector, @@ -90,8 +90,8 @@ impl LoroDoc { diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))), txn: global_txn, arena, - local_update_subs: SubscriberSet::new(), - peer_id_change_subs: SubscriberSet::new(), + local_update_subs: SubscriberSetWithQueue::new(), + peer_id_change_subs: SubscriberSetWithQueue::new(), } } @@ -120,9 +120,8 @@ impl LoroDoc { txn, auto_commit: AtomicBool::new(false), detached: AtomicBool::new(self.is_detached()), - - local_update_subs: SubscriberSet::new(), - peer_id_change_subs: SubscriberSet::new(), + local_update_subs: SubscriberSetWithQueue::new(), + peer_id_change_subs: SubscriberSetWithQueue::new(), }; if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) { @@ -279,11 +278,7 @@ impl LoroDoc { let new_txn = self.txn().unwrap(); self.txn.try_lock().unwrap().replace(new_txn); - - self.peer_id_change_subs.retain(&(), &mut |callback| { - callback(peer, next_id.counter); - true - }); + self.peer_id_change_subs.emit(&(), next_id); return Ok(()); } @@ -300,10 +295,7 @@ impl LoroDoc { .peer .store(peer, std::sync::atomic::Ordering::Relaxed); drop(doc_state); - self.peer_id_change_subs.retain(&(), &mut |callback| { - callback(peer, next_id.counter); - true - }); + self.peer_id_change_subs.emit(&(), next_id); Ok(()) } @@ -981,7 +973,7 @@ impl LoroDoc { } pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription { - let (sub, activate) = self.local_update_subs.insert((), callback); + let (sub, activate) = self.local_update_subs.inner().insert((), callback); activate(); sub } @@ -1507,24 +1499,36 @@ impl LoroDoc { 0 } } +} +#[derive(Debug, thiserror::Error)] +pub enum ChangeTravelError { + #[error("Target id not found {0:?}")] + TargetIdNotFound(ID), + #[error("History on the target version is trimmed")] + TargetVersionTrimmed, +} + +impl LoroDoc { pub fn travel_change_ancestors( &self, - id: ID, + ids: &[ID], f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>, - ) { + ) -> Result<(), ChangeTravelError> { struct PendingNode(ChangeMeta); impl PartialEq for PendingNode { fn eq(&self, other: &Self) -> bool { self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer } } + impl Eq for PendingNode {} impl PartialOrd for PendingNode { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } + impl Ord for PendingNode { fn cmp(&self, other: &Self) -> Ordering { self.0 @@ -1534,15 +1538,23 @@ impl LoroDoc { } } - if !self.oplog().try_lock().unwrap().vv().includes_id(id) { - return; + for id in ids { + let op_log = &self.oplog().try_lock().unwrap(); + if !op_log.vv().includes_id(*id) { + return Err(ChangeTravelError::TargetIdNotFound(*id)); + } + if op_log.dag.trimmed_vv().includes_id(*id) { + return Err(ChangeTravelError::TargetVersionTrimmed); + } } let mut visited = FxHashSet::default(); let mut pending: BinaryHeap = BinaryHeap::new(); - pending.push(PendingNode(ChangeMeta::from_change( - &self.oplog().try_lock().unwrap().get_change_at(id).unwrap(), - ))); + for id in ids { + pending.push(PendingNode(ChangeMeta::from_change( + &self.oplog().try_lock().unwrap().get_change_at(*id).unwrap(), + ))); + } while let Some(PendingNode(node)) = pending.pop() { let deps = node.deps.clone(); if f(node).is_break() { @@ -1561,6 +1573,8 @@ impl LoroDoc { pending.push(PendingNode(ChangeMeta::from_change(&dep_node))); } } + + Ok(()) } } diff --git a/crates/loro-internal/src/subscription.rs b/crates/loro-internal/src/subscription.rs index 7bdf00fc..bf238a18 100644 --- a/crates/loro-internal/src/subscription.rs +++ b/crates/loro-internal/src/subscription.rs @@ -7,7 +7,7 @@ use crate::{ Subscription, }; use fxhash::FxHashMap; -use loro_common::{ContainerID, Counter, PeerID}; +use loro_common::{ContainerID, ID}; use smallvec::SmallVec; use std::{ collections::VecDeque, @@ -15,15 +15,15 @@ use std::{ }; /// The callback of the local update. -pub type LocalUpdateCallback = Box; +pub type LocalUpdateCallback = Box) -> bool + Send + Sync + 'static>; /// The callback of the peer id change. The second argument is the next counter for the peer. -pub type PeerIdUpdateCallback = Box; +pub type PeerIdUpdateCallback = Box bool + Send + Sync + 'static>; pub type Subscriber = Arc Fn(DiffEvent<'a>)) + Send + Sync>; impl LoroDoc { /// Subscribe to the changes of the peer id. pub fn subscribe_peer_id_change(&self, callback: PeerIdUpdateCallback) -> Subscription { - let (s, enable) = self.peer_id_change_subs.insert((), callback); + let (s, enable) = self.peer_id_change_subs.inner().insert((), callback); enable(); s } diff --git a/crates/loro-internal/src/txn.rs b/crates/loro-internal/src/txn.rs index aaecc67a..7de8909d 100644 --- a/crates/loro-internal/src/txn.rs +++ b/crates/loro-internal/src/txn.rs @@ -11,7 +11,6 @@ use loro_common::{ContainerType, IdLp, IdSpan, LoroResult}; use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder}; use rle::{HasLength, Mergable, RleVec}; use smallvec::{smallvec, SmallVec}; -use tracing::trace; use crate::{ change::{Change, Lamport, Timestamp}, @@ -70,24 +69,25 @@ impl crate::LoroDoc { ); let obs = self.observer.clone(); - let local_update_subs = self.local_update_subs.clone(); + let local_update_subs_weak = self.local_update_subs.downgrade(); txn.set_on_commit(Box::new(move |state, oplog, id_span| { - trace!("on_commit!"); let mut state = state.try_lock().unwrap(); let events = state.take_events(); drop(state); for event in events { - trace!("on_commit! {:#?}", &event); obs.emit(event); } - if !local_update_subs.is_empty() { - let bytes = - { export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) }; - local_update_subs.retain(&(), &mut |callback| { - callback(&bytes); - true - }); + if id_span.atom_len() == 0 { + return; + } + + if let Some(local_update_subs) = local_update_subs_weak.upgrade() { + if !local_update_subs.inner().is_empty() { + let bytes = + { export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) }; + local_update_subs.emit(&(), bytes); + } } })); diff --git a/crates/loro-internal/src/undo.rs b/crates/loro-internal/src/undo.rs index 69f9cf18..03e639f5 100644 --- a/crates/loro-internal/src/undo.rs +++ b/crates/loro-internal/src/undo.rs @@ -505,12 +505,13 @@ impl UndoManager { } })); - let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, counter| { + let sub = doc.subscribe_peer_id_change(Box::new(move |id| { let mut inner = inner_clone2.try_lock().unwrap(); inner.undo_stack.clear(); inner.redo_stack.clear(); - inner.next_counter = Some(counter); - peer_clone2.store(peer_id, std::sync::atomic::Ordering::Relaxed); + inner.next_counter = Some(id.counter); + peer_clone2.store(id.peer, std::sync::atomic::Ordering::Relaxed); + true })); UndoManager { diff --git a/crates/loro-internal/src/utils/subscription.rs b/crates/loro-internal/src/utils/subscription.rs index cbabdab3..fff4dbf7 100644 --- a/crates/loro-internal/src/utils/subscription.rs +++ b/crates/loro-internal/src/utils/subscription.rs @@ -226,15 +226,12 @@ Apache License END OF TERMS AND CONDITIONS */ -use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use std::error::Error; -use std::ptr::eq; +use smallvec::SmallVec; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; +use std::sync::{Mutex, Weak}; use std::{fmt::Debug, mem, sync::Arc}; -use smallvec::SmallVec; - #[derive(Debug)] pub enum SubscriptionError { CannotEmitEventDueToRecursiveCall, @@ -474,11 +471,25 @@ pub(crate) struct SubscriberSetWithQueue { queue: Arc>>>, } +pub(crate) struct WeakSubscriberSetWithQueue { + subscriber_set: Weak>>, + queue: Weak>>>, +} + +impl WeakSubscriberSetWithQueue { + pub fn upgrade(self) -> Option> { + Some(SubscriberSetWithQueue { + subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?), + queue: self.queue.upgrade()?, + }) + } +} + impl SubscriberSetWithQueue where EmitterKey: 'static + Ord + Clone + Debug + Send + Sync, - Callback: 'static + Send + Sync + FnMut(&Payload) -> bool, - Payload: Send + Sync, + Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool, + Payload: Send + Sync + Debug, { pub fn new() -> Self { Self { @@ -486,6 +497,14 @@ where queue: Arc::new(Mutex::new(Default::default())), } } + + pub fn downgrade(&self) -> WeakSubscriberSetWithQueue { + WeakSubscriberSetWithQueue { + subscriber_set: Arc::downgrade(&self.subscriber_set.0), + queue: Arc::downgrade(&self.queue), + } + } + pub fn inner(&self) -> &SubscriberSet { &self.subscriber_set } diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 65b1790a..18d76faa 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -2,7 +2,7 @@ #![allow(non_snake_case)] #![allow(clippy::empty_docs)] #![allow(clippy::doc_lazy_continuation)] -#![warn(missing_docs)] +// #![warn(missing_docs)] use convert::{js_to_version_vector, resolved_diff_to_js}; use js_sys::{Array, Object, Promise, Reflect, Uint8Array}; @@ -1241,6 +1241,7 @@ impl LoroDoc { if let Err(e) = observer.call1(&arr.into()) { console_error!("Error: {:?}", e); } + true }))); let closure = Closure::wrap(Box::new(move || { diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 5ab3da3b..82b7d672 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -8,6 +8,7 @@ use loro_internal::cursor::PosQueryResult; use loro_internal::cursor::Side; use loro_internal::handler::HandlerTrait; use loro_internal::handler::ValueOrHandler; +use loro_internal::loro::ChangeTravelError; use loro_internal::undo::{OnPop, OnPush}; pub use loro_internal::version::ImVersionVector; use loro_internal::DocState; @@ -798,14 +799,14 @@ impl LoroDoc { /// /// # Arguments /// - /// * `id` - The ID of the Change to start the traversal from. + /// * `ids` - The IDs of the Change to start the traversal from. /// * `f` - A mutable function that is called for each ancestor. It can return `ControlFlow::Break(())` to stop the traversal. pub fn travel_change_ancestors( &self, - id: ID, + ids: &[ID], f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>, - ) { - self.doc.travel_change_ancestors(id, f) + ) -> Result<(), ChangeTravelError> { + self.doc.travel_change_ancestors(ids, f) } } diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 351fcec3..0144d448 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -1238,8 +1238,9 @@ fn test_loro_export_local_updates() { let updates = Arc::new(Mutex::new(Vec::new())); let updates_clone = updates.clone(); - let subscription = doc.subscribe_local_update(Box::new(move |bytes: &[u8]| { + let subscription = doc.subscribe_local_update(Box::new(move |bytes: &Vec| { updates_clone.try_lock().unwrap().push(bytes.to_vec()); + true })); // Make some changes @@ -1728,8 +1729,9 @@ fn change_peer_id() { let doc = LoroDoc::new(); let received_peer_id = Arc::new(AtomicU64::new(0)); let received_peer_id_clone = received_peer_id.clone(); - let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, _counter| { - received_peer_id_clone.store(peer_id, Ordering::SeqCst); + let sub = doc.subscribe_peer_id_change(Box::new(move |id| { + received_peer_id_clone.store(id.peer, Ordering::SeqCst); + true })); doc.set_peer_id(1).unwrap(); @@ -1787,10 +1789,11 @@ fn travel_change_ancestors() { let f = doc.state_frontiers(); assert_eq!(f.len(), 1); let mut changes = vec![]; - doc.travel_change_ancestors(f[0], &mut |meta| { + doc.travel_change_ancestors(&[f[0]], &mut |meta| { changes.push(meta.clone()); ControlFlow::Continue(()) - }); + }) + .unwrap(); let dbg_str = format!("{:#?}", changes); assert_eq!( @@ -1861,10 +1864,11 @@ fn travel_change_ancestors() { ); let mut changes = vec![]; - doc.travel_change_ancestors(ID::new(2, 4), &mut |meta| { + doc.travel_change_ancestors(&[ID::new(2, 4)], &mut |meta| { changes.push(meta.clone()); ControlFlow::Continue(()) - }); + }) + .unwrap(); let dbg_str = format!("{:#?}", changes); assert_eq!( dbg_str, @@ -1895,6 +1899,30 @@ fn travel_change_ancestors() { ); } +#[test] +fn no_dead_loop_when_subscribe_local_updates_to_each_other() { + let doc1 = Arc::new(LoroDoc::new()); + let doc2 = Arc::new(LoroDoc::new()); + + let doc1_clone = doc1.clone(); + let doc2_clone = doc2.clone(); + let _sub1 = doc1.subscribe_local_update(Box::new(move |updates| { + doc2_clone.import(updates).unwrap(); + true + })); + let _sub2 = doc2.subscribe_local_update(Box::new(move |updates| { + doc1_clone.import(updates).unwrap(); + true + })); + + doc1.get_text("text").insert(0, "Hello").unwrap(); + doc1.commit(); + doc2.get_text("text").insert(0, "World").unwrap(); + doc2.commit(); + + assert_eq!(doc1.get_deep_value(), doc2.get_deep_value()); +} + /// https://github.com/loro-dev/loro/issues/490 #[test] fn issue_490() -> anyhow::Result<()> { diff --git a/package.json b/package.json index 734c0e15..4ed1d5f1 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "check-all": "cargo hack check --each-feature", "build": "cargo build", "test": "cargo nextest run --features=test_utils,jsonpath --no-fail-fast && cargo test --doc", - "test-all": "nr test && nr test-wasm", + "test-all": "pnpm test && pnpm test-wasm", "test-wasm": "cd crates/loro-wasm && deno task dev && cd ../../loro-js && pnpm i && pnpm run test", "coverage": "mkdir -p coverage && cargo llvm-cov nextest --features test_utils,jsonpath --lcov > coverage/lcov-nextest.info && cargo llvm-cov report", "release-wasm": "cd crates/loro-wasm && deno task release && cd ../../loro-js && pnpm i && pnpm build && pnpm run test",