From de93d34a9c07c5e67d20b9c5c72b4fa67b2ddb1f Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 2 Oct 2024 14:24:57 +0800 Subject: [PATCH] fix: return err if snapshot container has unknown container (#488) --- crates/loro-common/src/error.rs | 3 + crates/loro-internal/src/encoding.rs | 78 ++++++++-------- crates/loro-internal/src/encoding/arena.rs | 2 +- ...ated_fast_snapshot.rs => fast_snapshot.rs} | 56 +----------- .../loro-internal/src/encoding/json_schema.rs | 4 +- ...rdered.rs => outdated_encode_reordered.rs} | 0 .../src/encoding/trimmed_snapshot.rs | 90 +++++++++++++++++-- crates/loro-internal/src/encoding/value.rs | 2 +- crates/loro-internal/src/loro.rs | 9 +- 9 files changed, 138 insertions(+), 106 deletions(-) rename crates/loro-internal/src/encoding/{outdated_fast_snapshot.rs => fast_snapshot.rs} (84%) rename crates/loro-internal/src/encoding/{encode_reordered.rs => outdated_encode_reordered.rs} (100%) diff --git a/crates/loro-common/src/error.rs b/crates/loro-common/src/error.rs index ebee8d07a..48d476384 100644 --- a/crates/loro-common/src/error.rs +++ b/crates/loro-common/src/error.rs @@ -110,12 +110,15 @@ pub enum LoroTreeError { TreeNodeDeletedOrNotExist(TreeID), } +#[non_exhaustive] #[derive(Error, Debug, PartialEq)] pub enum LoroEncodeError { #[error("The frontiers are not found in this doc: {0}")] FrontiersNotFound(String), #[error("Trimmed snapshot incompatible with old snapshot format. Use new snapshot format or avoid trimmed snapshots for storage.")] TrimmedSnapshotIncompatibleWithOldFormat, + #[error("Cannot export trimmed snapshot with unknown container type. Please upgrade the Loro version.")] + UnknownContainer, } #[cfg(feature = "wasm")] diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index 8510eb6ce..ddd877836 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -1,15 +1,14 @@ pub(crate) mod arena; -mod encode_reordered; +mod fast_snapshot; +pub(crate) mod json_schema; +mod outdated_encode_reordered; +mod trimmed_snapshot; pub(crate) mod value; pub(crate) mod value_register; -use std::borrow::Cow; - -pub(crate) use encode_reordered::{ +pub(crate) use outdated_encode_reordered::{ decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId, }; -pub(crate) mod json_schema; -mod outdated_fast_snapshot; -mod trimmed_snapshot; +pub(crate) use value::OwnedValue; use crate::op::OpWithId; use crate::version::Frontiers; @@ -19,7 +18,7 @@ use loro_common::{IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID}; use num_traits::{FromPrimitive, ToPrimitive}; use rle::{HasLength, Sliceable}; use serde::{Deserialize, Serialize}; -pub(crate) use value::OwnedValue; +use std::borrow::Cow; #[non_exhaustive] #[derive(Debug, Clone)] @@ -234,7 +233,7 @@ pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode) }; let body = match &mode { - EncodeMode::OutdatedRle => encode_reordered::encode_updates(oplog, vv), + EncodeMode::OutdatedRle => outdated_encode_reordered::encode_updates(oplog, vv), _ => unreachable!(), }; @@ -248,12 +247,10 @@ pub(crate) fn decode_oplog( let ParsedHeaderAndBody { mode, body, .. } = parsed; match mode { EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => { - encode_reordered::decode_updates(oplog, body) - } - EncodeMode::FastSnapshot => outdated_fast_snapshot::decode_oplog(oplog, body), - EncodeMode::FastUpdates => { - outdated_fast_snapshot::decode_updates(oplog, body.to_vec().into()) + outdated_encode_reordered::decode_updates(oplog, body) } + EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body), + EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()), EncodeMode::Auto => unreachable!(), } } @@ -331,7 +328,7 @@ fn encode_header_and_body(mode: EncodeMode, body: Vec) -> Vec { } pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec { - let body = encode_reordered::encode_snapshot( + let body = outdated_encode_reordered::encode_snapshot( &doc.oplog().try_lock().unwrap(), &mut doc.app_state().try_lock().unwrap(), &Default::default(), @@ -342,30 +339,36 @@ pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec { pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec { encode_with(EncodeMode::FastSnapshot, &mut |ans| { - outdated_fast_snapshot::encode_snapshot(doc, ans); + fast_snapshot::encode_snapshot(doc, ans); + Ok(()) }) + .unwrap() } -pub(crate) fn export_fast_snapshot_at( +pub(crate) fn export_snapshot_at( doc: &LoroDoc, frontiers: &Frontiers, ) -> Result, LoroEncodeError> { check_target_version_reachable(doc, frontiers)?; - Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| { - outdated_fast_snapshot::encode_snapshot_at(doc, frontiers, ans).unwrap(); - })) + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + trimmed_snapshot::encode_snapshot_at(doc, frontiers, ans) + }) } pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec { encode_with(EncodeMode::FastUpdates, &mut |ans| { - outdated_fast_snapshot::encode_updates(doc, vv, ans); + fast_snapshot::encode_updates(doc, vv, ans); + Ok(()) }) + .unwrap() } pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec { encode_with(EncodeMode::FastUpdates, &mut |ans| { - outdated_fast_snapshot::encode_updates_in_range(oplog, spans, ans); + fast_snapshot::encode_updates_in_range(oplog, spans, ans); + Ok(()) }) + .unwrap() } pub(crate) fn export_trimmed_snapshot( @@ -373,9 +376,10 @@ pub(crate) fn export_trimmed_snapshot( f: &Frontiers, ) -> Result, LoroEncodeError> { check_target_version_reachable(doc, f)?; - Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| { - trimmed_snapshot::export_trimmed_snapshot(doc, f, ans).unwrap(); - })) + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + trimmed_snapshot::export_trimmed_snapshot(doc, f, ans)?; + Ok(()) + }) } fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> { @@ -392,12 +396,16 @@ pub(crate) fn export_state_only_snapshot( f: &Frontiers, ) -> Result, LoroEncodeError> { check_target_version_reachable(doc, f)?; - Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| { - trimmed_snapshot::export_state_only_snapshot(doc, f, ans).unwrap(); - })) + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + trimmed_snapshot::export_state_only_snapshot(doc, f, ans)?; + Ok(()) + }) } -fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec)) -> Vec { +fn encode_with( + mode: EncodeMode, + f: &mut dyn FnMut(&mut Vec) -> Result<(), LoroEncodeError>, +) -> Result, LoroEncodeError> { // HEADER let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); ans.extend(MAGIC_BYTES); @@ -406,13 +414,13 @@ fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec)) -> Vec { ans.extend(mode.to_bytes()); // BODY - f(&mut ans); + f(&mut ans)?; // CHECKSUM in HEADER let checksum_body = &ans[20..]; let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); ans[16..20].copy_from_slice(&checksum.to_le_bytes()); - ans + Ok(ans) } pub(crate) fn decode_snapshot( @@ -421,10 +429,8 @@ pub(crate) fn decode_snapshot( body: &[u8], ) -> Result<(), LoroError> { match mode { - EncodeMode::OutdatedSnapshot => encode_reordered::decode_snapshot(doc, body), - EncodeMode::FastSnapshot => { - outdated_fast_snapshot::decode_snapshot(doc, body.to_vec().into()) - } + EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body), + EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()), _ => unreachable!(), } } @@ -453,7 +459,7 @@ pub struct ImportBlobMetadata { impl LoroDoc { /// Decodes the metadata for an imported blob from the provided bytes. pub fn decode_import_blob_meta(blob: &[u8]) -> LoroResult { - encode_reordered::decode_import_blob_meta(blob) + outdated_encode_reordered::decode_import_blob_meta(blob) } } diff --git a/crates/loro-internal/src/encoding/arena.rs b/crates/loro-internal/src/encoding/arena.rs index 6597256e3..ef02084b1 100644 --- a/crates/loro-internal/src/encoding/arena.rs +++ b/crates/loro-internal/src/encoding/arena.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use serde_columnar::{columnar, ColumnarError}; use super::{ - encode_reordered::{PeerIdx, MAX_DECODED_SIZE}, + outdated_encode_reordered::{PeerIdx, MAX_DECODED_SIZE}, value::{Value, ValueDecodedArenasTrait, ValueEncodeRegister}, value_register::ValueRegister, }; diff --git a/crates/loro-internal/src/encoding/outdated_fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs similarity index 84% rename from crates/loro-internal/src/encoding/outdated_fast_snapshot.rs rename to crates/loro-internal/src/encoding/fast_snapshot.rs index 3fc513a27..c3bf630da 100644 --- a/crates/loro-internal/src/encoding/outdated_fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -15,15 +15,12 @@ //! use std::io::{Read, Write}; -use crate::{ - encoding::trimmed_snapshot, oplog::ChangeStore, version::Frontiers, LoroDoc, OpLog, - VersionVector, -}; +use crate::{encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector}; use bytes::{Buf, Bytes}; -use loro_common::{IdSpan, LoroEncodeError, LoroError, LoroResult}; +use loro_common::{IdSpan, LoroError, LoroResult}; use tracing::trace; -use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult}; +use super::outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult}; pub(crate) const EMPTY_MARK: &[u8] = b"E"; pub(super) struct Snapshot { @@ -211,53 +208,6 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { } } -pub(crate) fn encode_snapshot_at( - doc: &LoroDoc, - frontiers: &Frontiers, - w: &mut W, -) -> Result<(), LoroEncodeError> { - let version_before_start = doc.oplog_frontiers(); - doc.checkout_without_emitting(frontiers).unwrap(); - { - let mut state = doc.app_state().try_lock().unwrap(); - let oplog = doc.oplog().try_lock().unwrap(); - let is_gc = state.store.trimmed_store().is_some(); - if is_gc { - unimplemented!() - } - - assert!(!state.is_in_txn()); - let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else { - return Err(LoroEncodeError::FrontiersNotFound(format!( - "frontiers: {:?} when export in SnapshotAt mode", - frontiers - ))); - }; - - if oplog.is_trimmed() { - assert_eq!( - oplog.trimmed_frontiers(), - state.store.trimmed_frontiers().unwrap() - ); - } - - state.ensure_all_alive_containers(); - let state_bytes = state.store.encode(); - _encode_snapshot( - Snapshot { - oplog_bytes, - state_bytes: Some(state_bytes), - trimmed_bytes: Bytes::new(), - }, - w, - ); - } - doc.checkout_without_emitting(&version_before_start) - .unwrap(); - doc.drop_pending_events(); - Ok(()) -} - pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> { let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()); let oplog_bytes = &bytes[4..4 + oplog_len as usize]; diff --git a/crates/loro-internal/src/encoding/json_schema.rs b/crates/loro-internal/src/encoding/json_schema.rs index 0fa345c08..ea721b254 100644 --- a/crates/loro-internal/src/encoding/json_schema.rs +++ b/crates/loro-internal/src/encoding/json_schema.rs @@ -22,7 +22,9 @@ use crate::{ OpLog, VersionVector, }; -use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult, ValueRegister}; +use super::outdated_encode_reordered::{ + import_changes_to_oplog, ImportChangesResult, ValueRegister, +}; use json::{JsonOpContent, JsonSchema}; const SCHEMA_VERSION: u8 = 1; diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/outdated_encode_reordered.rs similarity index 100% rename from crates/loro-internal/src/encoding/encode_reordered.rs rename to crates/loro-internal/src/encoding/outdated_encode_reordered.rs diff --git a/crates/loro-internal/src/encoding/trimmed_snapshot.rs b/crates/loro-internal/src/encoding/trimmed_snapshot.rs index 9ff266ce2..480787942 100644 --- a/crates/loro-internal/src/encoding/trimmed_snapshot.rs +++ b/crates/loro-internal/src/encoding/trimmed_snapshot.rs @@ -1,13 +1,14 @@ +use bytes::Bytes; use rle::HasLength; use std::collections::BTreeSet; -use loro_common::{ContainerID, LoroResult, ID}; +use loro_common::{ContainerID, ContainerType, LoroEncodeError, ID}; use tracing::{debug, trace}; use crate::{ container::list::list_op::InnerListOp, dag::{Dag, DagUtils}, - encoding::outdated_fast_snapshot::{Snapshot, _encode_snapshot}, + encoding::fast_snapshot::{Snapshot, _encode_snapshot}, state::container_store::FRONTIERS_KEY, version::Frontiers, LoroDoc, @@ -23,7 +24,7 @@ pub(crate) fn export_trimmed_snapshot( doc: &LoroDoc, start_from: &Frontiers, w: &mut W, -) -> LoroResult { +) -> Result { let oplog = doc.oplog().try_lock().unwrap(); let start_from = calc_trimmed_doc_start(&oplog, start_from); let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap(); @@ -62,9 +63,12 @@ pub(crate) fn export_trimmed_snapshot( let latest_vv = oplog.vv(); let ops_num: usize = latest_vv.sub_iter(&start_vv).map(|x| x.atom_len()).sum(); drop(oplog); - doc.checkout_without_emitting(&start_from)?; + doc.checkout_without_emitting(&start_from).unwrap(); let mut state = doc.app_state().try_lock().unwrap(); let alive_containers = state.ensure_all_alive_containers(); + if has_unknown_container(alive_containers.iter()) { + return Err(LoroEncodeError::UnknownContainer); + } let mut alive_c_bytes: BTreeSet> = alive_containers.iter().map(|x| x.to_bytes()).collect(); state.store.flush(); @@ -119,11 +123,15 @@ pub(crate) fn export_trimmed_snapshot( Ok(start_from) } +fn has_unknown_container<'a>(mut cids: impl Iterator) -> bool { + cids.any(|cid| matches!(cid.container_type(), ContainerType::Unknown(_))) +} + pub(crate) fn export_state_only_snapshot( doc: &LoroDoc, start_from: &Frontiers, w: &mut W, -) -> LoroResult { +) -> Result { let oplog = doc.oplog().try_lock().unwrap(); let start_from = calc_trimmed_doc_start(&oplog, start_from); let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap(); @@ -147,14 +155,13 @@ pub(crate) fn export_state_only_snapshot( let state_frontiers = doc.state_frontiers(); let is_attached = !doc.is_detached(); drop(oplog); - doc.checkout_without_emitting(&start_from)?; + doc.checkout_without_emitting(&start_from).unwrap(); let mut state = doc.app_state().try_lock().unwrap(); let alive_containers = state.ensure_all_alive_containers(); - let alive_c_bytes: BTreeSet> = alive_containers.iter().map(|x| x.to_bytes()).collect(); + let alive_c_bytes = cids_to_bytes(alive_containers); state.store.flush(); let trimmed_state_kv = state.store.get_kv().clone(); drop(state); - let state_bytes = None; trimmed_state_kv.retain_keys(&alive_c_bytes); trimmed_state_kv.insert(FRONTIERS_KEY, start_from.encode().into()); let trimmed_state_bytes = trimmed_state_kv.export(); @@ -162,7 +169,7 @@ pub(crate) fn export_state_only_snapshot( // println!("oplog_bytes.len = {:?}", oplog_bytes.len()); let snapshot = Snapshot { oplog_bytes, - state_bytes, + state_bytes: None, trimmed_bytes: trimmed_state_bytes, }; _encode_snapshot(snapshot, w); @@ -179,6 +186,16 @@ pub(crate) fn export_state_only_snapshot( Ok(start_from) } +fn cids_to_bytes( + alive_containers: std::collections::HashSet< + ContainerID, + std::hash::BuildHasherDefault, + >, +) -> BTreeSet> { + let alive_c_bytes: BTreeSet> = alive_containers.iter().map(|x| x.to_bytes()).collect(); + alive_c_bytes +} + /// Calculates optimal starting version for the trimmed doc /// /// It should be the LCA of the user given version and the latest version. @@ -210,3 +227,58 @@ fn calc_trimmed_doc_start(oplog: &crate::OpLog, frontiers: &Frontiers) -> Fronti start } + +pub(crate) fn encode_snapshot_at( + doc: &LoroDoc, + frontiers: &Frontiers, + w: &mut W, +) -> Result<(), LoroEncodeError> { + let version_before_start = doc.oplog_frontiers(); + doc.checkout_without_emitting(frontiers).unwrap(); + { + let mut state = doc.app_state().try_lock().unwrap(); + let oplog = doc.oplog().try_lock().unwrap(); + let is_trimmed = state.store.trimmed_store().is_some(); + if is_trimmed { + unimplemented!() + } + + assert!(!state.is_in_txn()); + let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else { + return Err(LoroEncodeError::FrontiersNotFound(format!( + "frontiers: {:?} when export in SnapshotAt mode", + frontiers + ))); + }; + + if oplog.is_trimmed() { + assert_eq!( + oplog.trimmed_frontiers(), + state.store.trimmed_frontiers().unwrap() + ); + } + + let alive_containers = state.ensure_all_alive_containers(); + if has_unknown_container(alive_containers.iter()) { + return Err(LoroEncodeError::UnknownContainer); + } + + let alive_c_bytes = cids_to_bytes(alive_containers); + state.store.flush(); + let state_kv = state.store.get_kv().clone(); + state_kv.retain_keys(&alive_c_bytes); + let bytes = state_kv.export(); + _encode_snapshot( + Snapshot { + oplog_bytes, + state_bytes: Some(bytes), + trimmed_bytes: Bytes::new(), + }, + w, + ); + } + doc.checkout_without_emitting(&version_before_start) + .unwrap(); + doc.drop_pending_events(); + Ok(()) +} diff --git a/crates/loro-internal/src/encoding/value.rs b/crates/loro-internal/src/encoding/value.rs index a2c69f5b3..b5b0ac725 100644 --- a/crates/loro-internal/src/encoding/value.rs +++ b/crates/loro-internal/src/encoding/value.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use crate::{ change::Lamport, container::tree::tree_op::TreeOp, - encoding::encode_reordered::MAX_COLLECTION_SIZE, + encoding::outdated_encode_reordered::MAX_COLLECTION_SIZE, }; use super::{ diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 7f348b101..47eac69e9 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -33,10 +33,9 @@ use crate::{ dag::DagUtils, diff_calc::DiffCalculator, encoding::{ - decode_snapshot, export_fast_snapshot, export_fast_snapshot_at, export_fast_updates, - export_fast_updates_in_range, export_snapshot, export_state_only_snapshot, - export_trimmed_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, - ParsedHeaderAndBody, + decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range, + export_snapshot, export_snapshot_at, export_state_only_snapshot, export_trimmed_snapshot, + json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody, }, event::{str_to_path, EventTriggerKind, Index, InternalDocDiff}, handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, @@ -1484,7 +1483,7 @@ impl LoroDoc { Some(f) => export_state_only_snapshot(self, &f)?, None => export_state_only_snapshot(self, &self.oplog_frontiers())?, }, - ExportMode::SnapshotAt { version } => export_fast_snapshot_at(self, &version)?, + ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?, }; self.renew_txn_if_auto_commit();