Skip to content

Commit

Permalink
fix: fix all warnings and refine the impl of subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
zxch3n committed Oct 3, 2024
1 parent 6b1d787 commit 9e81547
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 53 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ jobs:
with:
version: "0.2.92"
- uses: Swatinem/rust-cache@v2
- name: Check
run: cargo clippy --all-features
- name: Build
run: cargo build --verbose
- name: Run rust tests
run: deno task test
- name: Run wasm tests
run: deno task test-wasm
run: deno task test-all
1 change: 1 addition & 0 deletions crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl LoroDoc {
let s = self.doc.subscribe_local_update(Box::new(move |update| {
// TODO: should it be cloned?
callback.on_local_update(update.to_vec());
true
}));
Arc::new(Subscription(Arc::new(Mutex::new(s))))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/loro-internal/src/allocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![allow(dead_code)]
#![allow(unused)]

mod bfs;
pub(crate) use bfs::calc_critical_version_bfs as calc_critical_version;

Expand Down
7 changes: 3 additions & 4 deletions crates/loro-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use state::{TreeNode, TreeNodeWithChildren, TreeParentId};
use subscription::{LocalUpdateCallback, Observer, PeerIdUpdateCallback};
use txn::Transaction;
pub use undo::UndoManager;
use utils::subscription::SubscriberSet;
use utils::subscription::SubscriberSetWithQueue;
pub use utils::subscription::Subscription;
pub mod allocation;
pub mod awareness;
Expand Down Expand Up @@ -120,7 +120,6 @@ pub struct LoroDoc {
txn: Arc<Mutex<Option<Transaction>>>,
auto_commit: AtomicBool,
detached: AtomicBool,

local_update_subs: SubscriberSet<(), LocalUpdateCallback>,
peer_id_change_subs: SubscriberSet<(), PeerIdUpdateCallback>,
local_update_subs: SubscriberSetWithQueue<(), LocalUpdateCallback, Vec<u8>>,
peer_id_change_subs: SubscriberSetWithQueue<(), PeerIdUpdateCallback, ID>,
}
24 changes: 8 additions & 16 deletions crates/loro-internal/src/loro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
subscription::{LocalUpdateCallback, Observer, Subscriber},
txn::Transaction,
undo::DiffBatch,
utils::subscription::{SubscriberSet, Subscription},
utils::subscription::{SubscriberSetWithQueue, Subscription},
version::{shrink_frontiers, Frontiers, ImVersionVector},
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler,
VersionVector,
Expand Down Expand Up @@ -90,8 +90,8 @@ impl LoroDoc {
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
txn: global_txn,
arena,
local_update_subs: SubscriberSet::new(),
peer_id_change_subs: SubscriberSet::new(),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
}
}

Expand Down Expand Up @@ -120,9 +120,8 @@ impl LoroDoc {
txn,
auto_commit: AtomicBool::new(false),
detached: AtomicBool::new(self.is_detached()),

local_update_subs: SubscriberSet::new(),
peer_id_change_subs: SubscriberSet::new(),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
};

if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
Expand Down Expand Up @@ -276,11 +275,7 @@ impl LoroDoc {

let new_txn = self.txn().unwrap();
self.txn.try_lock().unwrap().replace(new_txn);

self.peer_id_change_subs.retain(&(), &mut |callback| {
callback(peer, next_id.counter);
true
});
self.peer_id_change_subs.emit(&(), next_id);
return Ok(());
}

Expand All @@ -297,10 +292,7 @@ impl LoroDoc {
.peer
.store(peer, std::sync::atomic::Ordering::Relaxed);
drop(doc_state);
self.peer_id_change_subs.retain(&(), &mut |callback| {
callback(peer, next_id.counter);
true
});
self.peer_id_change_subs.emit(&(), next_id);
Ok(())
}

Expand Down Expand Up @@ -978,7 +970,7 @@ impl LoroDoc {
}

pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
let (sub, activate) = self.local_update_subs.insert((), callback);
let (sub, activate) = self.local_update_subs.inner().insert((), callback);
activate();
sub
}
Expand Down
8 changes: 4 additions & 4 deletions crates/loro-internal/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ use crate::{
Subscription,
};
use fxhash::FxHashMap;
use loro_common::{ContainerID, Counter, PeerID};
use loro_common::{ContainerID, ID};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};

/// The callback of the local update.
pub type LocalUpdateCallback = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
pub type LocalUpdateCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
/// The callback of the peer id change. The second argument is the next counter for the peer.
pub type PeerIdUpdateCallback = Box<dyn Fn(PeerID, Counter) + Send + Sync + 'static>;
pub type PeerIdUpdateCallback = Box<dyn Fn(&ID) -> bool + Send + Sync + 'static>;
pub type Subscriber = Arc<dyn (for<'a> Fn(DiffEvent<'a>)) + Send + Sync>;

impl LoroDoc {
/// Subscribe to the changes of the peer id.
pub fn subscribe_peer_id_change(&self, callback: PeerIdUpdateCallback) -> Subscription {
let (s, enable) = self.peer_id_change_subs.insert((), callback);
let (s, enable) = self.peer_id_change_subs.inner().insert((), callback);
enable();
s
}
Expand Down
22 changes: 11 additions & 11 deletions crates/loro-internal/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
use rle::{HasLength, Mergable, RleVec};
use smallvec::{smallvec, SmallVec};
use tracing::trace;

use crate::{
change::{Change, Lamport, Timestamp},
Expand Down Expand Up @@ -70,24 +69,25 @@ impl crate::LoroDoc {
);

let obs = self.observer.clone();
let local_update_subs = self.local_update_subs.clone();
let local_update_subs_weak = self.local_update_subs.downgrade();
txn.set_on_commit(Box::new(move |state, oplog, id_span| {
trace!("on_commit!");
let mut state = state.try_lock().unwrap();
let events = state.take_events();
drop(state);
for event in events {
trace!("on_commit! {:#?}", &event);
obs.emit(event);
}

if !local_update_subs.is_empty() {
let bytes =
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
local_update_subs.retain(&(), &mut |callback| {
callback(&bytes);
true
});
if id_span.atom_len() == 0 {
return;
}

if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
if !local_update_subs.inner().is_empty() {
let bytes =
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
local_update_subs.emit(&(), bytes);
}
}
}));

Expand Down
7 changes: 4 additions & 3 deletions crates/loro-internal/src/undo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,13 @@ impl UndoManager {
}
}));

let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, counter| {
let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
let mut inner = inner_clone2.try_lock().unwrap();
inner.undo_stack.clear();
inner.redo_stack.clear();
inner.next_counter = Some(counter);
peer_clone2.store(peer_id, std::sync::atomic::Ordering::Relaxed);
inner.next_counter = Some(id.counter);
peer_clone2.store(id.peer, std::sync::atomic::Ordering::Relaxed);
true
}));

UndoManager {
Expand Down
35 changes: 27 additions & 8 deletions crates/loro-internal/src/utils/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,12 @@ Apache License
END OF TERMS AND CONDITIONS
*/
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::error::Error;
use std::ptr::eq;
use smallvec::SmallVec;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::sync::{Mutex, Weak};
use std::{fmt::Debug, mem, sync::Arc};

use smallvec::SmallVec;

#[derive(Debug)]
pub enum SubscriptionError {
CannotEmitEventDueToRecursiveCall,
Expand Down Expand Up @@ -474,18 +471,40 @@ pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}

pub(crate) struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}

impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
Some(SubscriberSetWithQueue {
subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
queue: self.queue.upgrade()?,
})
}
}

impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
where
EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
Callback: 'static + Send + Sync + FnMut(&Payload) -> bool,
Payload: Send + Sync,
Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
Payload: Send + Sync + Debug,
{
pub fn new() -> Self {
Self {
subscriber_set: SubscriberSet::new(),
queue: Arc::new(Mutex::new(Default::default())),
}
}

pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
WeakSubscriberSetWithQueue {
subscriber_set: Arc::downgrade(&self.subscriber_set.0),
queue: Arc::downgrade(&self.queue),
}
}

pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
&self.subscriber_set
}
Expand Down
3 changes: 2 additions & 1 deletion crates/loro-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(non_snake_case)]
#![allow(clippy::empty_docs)]
#![allow(clippy::doc_lazy_continuation)]
#![warn(missing_docs)]
// #![warn(missing_docs)]

use convert::{js_to_version_vector, resolved_diff_to_js};
use js_sys::{Array, Object, Promise, Reflect, Uint8Array};
Expand Down Expand Up @@ -1241,6 +1241,7 @@ impl LoroDoc {
if let Err(e) = observer.call1(&arr.into()) {
console_error!("Error: {:?}", e);
}
true
})));

let closure = Closure::wrap(Box::new(move || {
Expand Down
32 changes: 29 additions & 3 deletions crates/loro/tests/loro_rust_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,9 @@ fn test_loro_export_local_updates() {
let updates = Arc::new(Mutex::new(Vec::new()));

let updates_clone = updates.clone();
let subscription = doc.subscribe_local_update(Box::new(move |bytes: &[u8]| {
let subscription = doc.subscribe_local_update(Box::new(move |bytes: &Vec<u8>| {
updates_clone.try_lock().unwrap().push(bytes.to_vec());
true
}));

// Make some changes
Expand Down Expand Up @@ -1728,8 +1729,9 @@ fn change_peer_id() {
let doc = LoroDoc::new();
let received_peer_id = Arc::new(AtomicU64::new(0));
let received_peer_id_clone = received_peer_id.clone();
let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, _counter| {
received_peer_id_clone.store(peer_id, Ordering::SeqCst);
let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
received_peer_id_clone.store(id.peer, Ordering::SeqCst);
true
}));

doc.set_peer_id(1).unwrap();
Expand Down Expand Up @@ -1896,3 +1898,27 @@ fn travel_change_ancestors() {
]"#
);
}

#[test]
fn no_dead_loop_when_subscribe_local_updates_to_each_other() {
let doc1 = Arc::new(LoroDoc::new());
let doc2 = Arc::new(LoroDoc::new());

let doc1_clone = doc1.clone();
let doc2_clone = doc2.clone();
let _sub1 = doc1.subscribe_local_update(Box::new(move |updates| {
doc2_clone.import(updates).unwrap();
true
}));
let _sub2 = doc2.subscribe_local_update(Box::new(move |updates| {
doc1_clone.import(updates).unwrap();
true
}));

doc1.get_text("text").insert(0, "Hello").unwrap();
doc1.commit();
doc2.get_text("text").insert(0, "World").unwrap();
doc2.commit();

assert_eq!(doc1.get_deep_value(), doc2.get_deep_value());
}

0 comments on commit 9e81547

Please sign in to comment.