Skip to content

Commit

Permalink
feat: add import status (#494)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leeeon233 authored Oct 5, 2024
1 parent e1bf4a8 commit 405cbe0
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 109 deletions.
4 changes: 2 additions & 2 deletions crates/fuzz/src/crdt_fuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use arbitrary::Arbitrary;
use fxhash::FxHashSet;
use loro::{ContainerType, Frontiers, LoroError, LoroResult};
use loro::{ContainerType, Frontiers, ImportStatus, LoroError, LoroResult};
use tabled::TableIteratorExt;
use tracing::{info, info_span, trace};

Expand Down Expand Up @@ -271,7 +271,7 @@ impl CRDTFuzzer {
}
}

fn handle_import_result(e: LoroResult<()>) {
fn handle_import_result(e: LoroResult<ImportStatus>) {
match e {
Ok(_) => {}
Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion) => {
Expand Down
3 changes: 2 additions & 1 deletion crates/loro-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ impl TryFrom<&str> for TreeID {

#[cfg(feature = "wasm")]
pub mod wasm {
use crate::{LoroError, TreeID};
use crate::{IdSpanVector, LoroError, TreeID};
use js_sys::Map;
use wasm_bindgen::JsValue;
impl From<TreeID> for JsValue {
fn from(value: TreeID) -> Self {
Expand Down
28 changes: 28 additions & 0 deletions crates/loro-common/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ impl CounterSpan {
}
}

/// Grow this span so that it also covers `[new_start, new_end)`.
///
/// Union semantics: each bound only ever moves outward — `start` can only
/// decrease and `end` can only increase. The previous implementation
/// overwrote both bounds unconditionally, so when callers accumulate spans
/// per peer (`entry(peer).or_insert_with(..).extend_include(start, end)`)
/// only the *last* change's span survived instead of the union of all of
/// them.
///
/// NOTE(review): assumes a forward span (`start <= end`); behavior for
/// reversed spans is unchanged from before but unspecified — confirm if
/// reversed spans can reach this path.
pub fn extend_include(&mut self, new_start: Counter, new_end: Counter) {
    self.set_start(self.start.min(new_start));
    self.set_end(self.end.max(new_end));
}

/// if we can merge element on the left, this method return the last atom of it
fn prev_pos(&self) -> i32 {
if self.start < self.end {
Expand Down Expand Up @@ -509,6 +514,29 @@ impl From<ID> for IdSpan {
}
}

#[cfg(feature = "wasm")]
mod wasm {
    use js_sys::Object;
    use wasm_bindgen::JsValue;

    use super::CounterSpan;

    impl From<CounterSpan> for JsValue {
        /// Convert the span into a plain JS object of the shape
        /// `{ start: number, end: number }`.
        fn from(value: CounterSpan) -> Self {
            let obj = Object::new();
            // Mirror both bounds onto the JS object; the two properties are
            // set identically, so drive them from a small table instead of
            // duplicating the Reflect::set call chain.
            for (key, counter) in [("start", value.start), ("end", value.end)] {
                js_sys::Reflect::set(&obj, &JsValue::from_str(key), &JsValue::from(counter))
                    .unwrap();
            }
            obj.into()
        }
    }
}

#[cfg(test)]
mod test_id_span {
use super::*;
Expand Down
12 changes: 6 additions & 6 deletions crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
};

use loro::{
cursor::CannotFindRelativePosition, DocAnalysis, FrontiersNotIncluded, IdSpan, JsonPathError,
JsonSchema, Lamport, LoroDoc as InnerLoroDoc, LoroEncodeError, LoroError, LoroResult, PeerID,
Timestamp, ID,
cursor::CannotFindRelativePosition, DocAnalysis, FrontiersNotIncluded, IdSpan, ImportStatus,
JsonPathError, JsonSchema, Lamport, LoroDoc as InnerLoroDoc, LoroEncodeError, LoroError,
LoroResult, PeerID, Timestamp, ID,
};

use crate::{
Expand Down Expand Up @@ -253,7 +253,7 @@ impl LoroDoc {

/// Import updates/snapshot exported by [`LoroDoc::export_snapshot`] or [`LoroDoc::export_from`].
#[inline]
pub fn import(&self, bytes: &[u8]) -> Result<(), LoroError> {
pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
self.doc.import_with(bytes, "")
}

Expand All @@ -262,11 +262,11 @@ impl LoroDoc {
/// It marks the import with a custom `origin` string. It can be used to track the import source
/// in the generated events.
#[inline]
pub fn import_with(&self, bytes: &[u8], origin: &str) -> Result<(), LoroError> {
pub fn import_with(&self, bytes: &[u8], origin: &str) -> Result<ImportStatus, LoroError> {
self.doc.import_with(bytes, origin)
}

pub fn import_json_updates(&self, json: &str) -> Result<(), LoroError> {
pub fn import_json_updates(&self, json: &str) -> Result<ImportStatus, LoroError> {
self.doc.import_json_updates(json)
}

Expand Down
54 changes: 48 additions & 6 deletions crates/loro-internal/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ pub(crate) mod value_register;
pub(crate) use outdated_encode_reordered::{
decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
};
use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
pub(crate) use value::OwnedValue;

use crate::op::OpWithId;
use crate::version::Frontiers;
use crate::version::{Frontiers, VersionRange, VersionVectorDiff};
use crate::LoroDoc;
use crate::{oplog::OpLog, LoroError, VersionVector};
use loro_common::{IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID};
use loro_common::{
CounterSpan, HasCounter, HasCounterSpan, IdLpSpan, IdSpan, IdSpanVector, LoroEncodeError,
LoroResult, PeerID, ID,
};
use num_traits::{FromPrimitive, ToPrimitive};
use rle::{HasLength, Sliceable};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -166,6 +170,12 @@ impl TryFrom<[u8; 2]> for EncodeMode {
}
}

/// The result of an import operation.
///
/// Returned by the `import` / `import_with` / `import_json_updates` family of
/// APIs so callers can see which id spans were applied and which are still
/// waiting on missing dependencies.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
// Id spans successfully applied to the oplog by this import; computed as
// the difference between the version vectors before and after the import.
pub success: IdSpanVector,
// Id spans of changes that could not be applied yet (e.g. their
// dependencies have not arrived); `None` when nothing is pending.
pub pending: Option<IdSpanVector>,
}

/// The encoder used to encode the container states.
///
/// Each container state can be represented by a sequence of operations.
Expand Down Expand Up @@ -243,16 +253,44 @@ pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode)
pub(crate) fn decode_oplog(
oplog: &mut OpLog,
parsed: ParsedHeaderAndBody,
) -> Result<(), LoroError> {
) -> Result<ImportStatus, LoroError> {
let before_vv = oplog.vv().clone();
let ParsedHeaderAndBody { mode, body, .. } = parsed;
match mode {
let changes = match mode {
EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
outdated_encode_reordered::decode_updates(oplog, body)
}
EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
EncodeMode::Auto => unreachable!(),
}?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);

let mut pending = IdSpanVector::default();
pending_changes.iter().for_each(|c| {
let peer = c.id.peer;
let start = c.ctr_start();
let end = c.ctr_end();
pending
.entry(peer)
.or_insert_with(|| CounterSpan::new(start, end))
.extend_include(start, end);
});
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
let after_vv = oplog.vv();
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(ImportStatus {
success: before_vv.diff(after_vv).right,
pending: (!pending.is_empty()).then_some(pending),
})
}

pub(crate) struct ParsedHeaderAndBody<'a> {
Expand Down Expand Up @@ -427,12 +465,16 @@ pub(crate) fn decode_snapshot(
doc: &LoroDoc,
mode: EncodeMode,
body: &[u8],
) -> Result<(), LoroError> {
) -> Result<ImportStatus, LoroError> {
match mode {
EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body),
EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()),
_ => unreachable!(),
}
};
Ok(ImportStatus {
success: doc.oplog_vv().diff(&Default::default()).left,
pending: None,
})
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
34 changes: 7 additions & 27 deletions crates/loro-internal/src/encoding/fast_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
//!
use std::io::{Read, Write};

use crate::{encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
use crate::{
change::Change, encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector,
};
use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroError, LoroResult};
use tracing::trace;
Expand Down Expand Up @@ -208,7 +210,7 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
}
}

pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<Vec<Change>, LoroError> {
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let oplog_bytes = &bytes[4..4 + oplog_len as usize];
let mut changes = ChangeStore::decode_snapshot_for_updates(
Expand All @@ -217,18 +219,7 @@ pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroEr
oplog.vv(),
)?;
changes.sort_unstable_by_key(|x| x.lamport);
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(())
Ok(changes)
}

pub(crate) fn encode_updates<W: std::io::Write>(doc: &LoroDoc, vv: &VersionVector, w: &mut W) {
Expand All @@ -244,7 +235,7 @@ pub(crate) fn encode_updates_in_range<W: std::io::Write>(
oplog.export_blocks_in_range(spans, w);
}

pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroError> {
pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<Vec<Change>, LoroError> {
let mut reader: &[u8] = body.as_ref();
let mut index = 0;
let self_vv = oplog.vv();
Expand All @@ -263,16 +254,5 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroE
}

changes.sort_unstable_by_key(|x| x.lamport);
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(())
Ok(changes)
}
35 changes: 25 additions & 10 deletions crates/loro-internal/src/encoding/json_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::sync::Arc;

use either::Either;
use loro_common::{
ContainerID, ContainerType, HasCounterSpan, IdLp, LoroError, LoroResult, LoroValue, PeerID,
TreeID, ID,
ContainerID, ContainerType, CounterSpan, HasCounter, HasCounterSpan, IdLp, IdSpanVector,
LoroError, LoroResult, LoroValue, PeerID, TreeID, ID,
};
use rle::{HasLength, RleVec, Sliceable};

Expand All @@ -22,8 +22,9 @@ use crate::{
OpLog, VersionVector,
};

use super::outdated_encode_reordered::{
import_changes_to_oplog, ImportChangesResult, ValueRegister,
use super::{
outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult, ValueRegister},
ImportStatus,
};
use json::{JsonOpContent, JsonSchema};

Expand Down Expand Up @@ -66,20 +67,34 @@ pub(crate) fn export_json<'a, 'c: 'a>(
}
}

pub(crate) fn import_json(oplog: &mut OpLog, json: JsonSchema) -> LoroResult<()> {
pub(crate) fn import_json(oplog: &mut OpLog, json: JsonSchema) -> LoroResult<ImportStatus> {
let before_vv = oplog.vv().clone();
let changes = decode_changes(json, &oplog.arena)?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
let mut pending = IdSpanVector::default();
pending_changes.iter().for_each(|c| {
let peer = c.id.peer;
let start = c.ctr_start();
let end = c.ctr_end();
pending
.entry(peer)
.or_insert_with(|| CounterSpan::new(start, end))
.extend_include(start, end);
});
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if changes_that_deps_on_trimmed_history.is_empty() {
Ok(())
} else {
Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion)
}
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
};
let after_vv = oplog.vv();
Ok(ImportStatus {
success: before_vv.diff(after_vv).right,
pending: (!pending.is_empty()).then_some(pending),
})
}

fn init_encode<'s, 'a: 's>(
Expand Down
22 changes: 8 additions & 14 deletions crates/loro-internal/src/encoding/outdated_encode_reordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{

use self::encode::{encode_changes, encode_ops, init_encode, TempOp};

use super::ImportStatus;
use super::{
arena::*,
parse_header_and_body,
Expand Down Expand Up @@ -136,7 +137,7 @@ pub(crate) fn encode_updates(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
}

#[instrument(skip_all)]
pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<()> {
pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<Vec<Change>> {
let iter = serde_columnar::iter_from_bytes::<EncodedDoc>(bytes)?;
let mut arenas = decode_arena(&iter.arenas)?;
let ops_map = extract_ops(
Expand All @@ -163,19 +164,8 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<()>
deps,
ops_map,
)?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}

Ok(())
Ok(changes)
}

pub fn decode_import_blob_meta(bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
Expand Down Expand Up @@ -734,7 +724,11 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> {
doc.update_oplog_and_apply_delta_to_state_if_needed(
|oplog| {
oplog.try_apply_pending(latest_ids);
Ok(())
// ImportStatus is unnecessary
Ok(ImportStatus {
success: Default::default(),
pending: None,
})
},
"".into(),
)?;
Expand Down
Loading

0 comments on commit 405cbe0

Please sign in to comment.