diff --git a/Cargo.lock b/Cargo.lock
index f664af21..11683533 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1922,7 +1922,6 @@ dependencies = [
  "binary-layout",
  "bincode",
  "criterion",
- "crossbeam-channel",
  "fast-counter",
  "hi_sparse_bitset",
  "human_bytes",
@@ -1959,7 +1958,6 @@ dependencies = [
  "bytes",
  "chrono",
  "chrono-tz",
- "crossbeam-channel",
  "decorum",
  "iana-time-zone",
  "inventory",
diff --git a/Cargo.toml b/Cargo.toml
index 0a295785..9810f833 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,7 +59,6 @@ binary-layout = "3.2.0"
 bytes = "1.5.0"
 chrono = "0.4.31"
 criterion = { version = "0.5.1", features = ["async_tokio"] }
-crossbeam-channel = "0.5.10"
 decorum = "0.3.1" # For ordering & comparing our floats
 enum-primitive-derive = "0.3.0"
 fast-counter = "1.0.0"
diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml
index efdc3166..4550b387 100644
--- a/crates/db/Cargo.toml
+++ b/crates/db/Cargo.toml
@@ -41,7 +41,6 @@ metrics.workspace = true
 metrics-macros.workspace = true
 
 # For the DB layer.
-crossbeam-channel.workspace = true
 bincode.workspace = true
 im.workspace = true
 sized-chunks.workspace = true
diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/tuplebox/coldstorage.rs
index 86f3d885..5bb70854 100644
--- a/crates/db/src/tuplebox/coldstorage.rs
+++ b/crates/db/src/tuplebox/coldstorage.rs
@@ -272,7 +272,7 @@ impl ColdStorage {
                 Some(*r),
                 0,
                 ts,
-                page.page_size,
+                page.page_size as usize,
                 |buf| page.save_into(buf),
             );
             write_batch.push((*page_id, Some(wal_entry_buffer)));
diff --git a/crates/db/src/tuplebox/tb.rs b/crates/db/src/tuplebox/tb.rs
index 43dd4692..950a7602 100644
--- a/crates/db/src/tuplebox/tb.rs
+++ b/crates/db/src/tuplebox/tb.rs
@@ -15,7 +15,6 @@
 // TODO: support sorted indices, too.
 // TODO: 'join' and transitive closure -> datalog-style variable unification
 
-use std::collections::HashSet;
 use std::path::PathBuf;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
@@ -55,9 +54,6 @@ pub struct TupleBox {
     // TODO: take a look at Adnan's thread-sharded approach described in section 3.1
     //   (https://www.vldb.org/pvldb/vol16/p1426-alhomssi.pdf) -- "Ordered Snapshot Instant Commit"
     maximum_transaction: AtomicU64,
-    /// The set of currently active transactions, which will be used to prune old unreferenced
-    /// versions of tuples.
-    active_transactions: RwLock<HashSet<u64>>,
     /// Monotonically incrementing sequence counters.
     sequences: Vec<AtomicU64>,
     /// The copy-on-write set of current canonical base relations.
@@ -109,7 +105,6 @@ impl TupleBox {
         Arc::new(Self {
             relation_info: relations.to_vec(),
             maximum_transaction: AtomicU64::new(0),
-            active_transactions: RwLock::new(HashSet::new()),
             canonical: RwLock::new(base_relations),
             sequences,
             backing_store,
@@ -285,16 +280,10 @@ impl TupleBox {
             // And update the timestamp on the canonical relation.
             canonical[idx].ts = commit_ts;
         }
 
-        // Clear out the active transaction.
-        self.active_transactions.write().await.remove(&commit_ts);
         Ok(())
     }
 
-    pub(crate) async fn abort_transaction(&self, ts: u64) {
-        self.active_transactions.write().await.remove(&ts);
-    }
-
     pub async fn sync(&self, ts: u64, world_state: WorkingSet) {
         if let Some(bs) = &self.backing_store {
             let seqs = self
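The `tb.rs` hunks above delete bookkeeping rather than behavior: `active_transactions` was inserted into and removed from on the commit/abort paths, but nothing in this diff reads it, so every transaction paid for async lock traffic that bought nothing. A toy reconstruction of the removed pattern; the type and method names are stand-ins, and tokio's `RwLock` is assumed since the surrounding code awaits its locks:

```rust
use std::collections::HashSet;
use tokio::sync::RwLock;

// Stand-in for the pre-patch TupleBox; not the crate's real type.
struct TupleBoxBefore {
    active_transactions: RwLock<HashSet<u64>>,
}

impl TupleBoxBefore {
    async fn commit(&self, commit_ts: u64) {
        // ... working set applied to the canonical relations here ...
        // The write lock below is the cost the patch removes: maintaining
        // a set with no readers, on every commit and every abort.
        self.active_transactions.write().await.remove(&commit_ts);
    }
}

#[tokio::main]
async fn main() {
    let tb = TupleBoxBefore {
        active_transactions: RwLock::new(HashSet::new()),
    };
    tb.commit(42).await;
}
```

With the set gone, `abort_transaction` loses its only job, which is why it and its one caller in `transaction.rs` (further down) disappear together.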
diff --git a/crates/db/src/tuplebox/tuples/slot_ptr.rs b/crates/db/src/tuplebox/tuples/slot_ptr.rs
index 5ddbe1cd..d943e3a1 100644
--- a/crates/db/src/tuplebox/tuples/slot_ptr.rs
+++ b/crates/db/src/tuplebox/tuples/slot_ptr.rs
@@ -26,7 +26,7 @@ use crate::tuplebox::tuples::{SlotBox, TupleId};
 pub struct SlotPtr {
     sb: Arc<SlotBox>,
     id: TupleId,
-    buflen: usize,
+    buflen: u32,
     bufaddr: *mut u8,
 
     _pin: std::marker::PhantomPinned,
@@ -46,7 +46,7 @@ impl SlotPtr {
             sb: sb.clone(),
             id: tuple_id,
             bufaddr,
-            buflen,
+            buflen: buflen as u32,
             _pin: std::marker::PhantomPinned,
         }
     }
@@ -85,7 +85,7 @@ impl SlotPtr {
     #[inline]
     fn buffer(&self) -> &[u8] {
         let buf_addr = self.as_ptr();
-        unsafe { std::slice::from_raw_parts(buf_addr, self.buflen) }
+        unsafe { std::slice::from_raw_parts(buf_addr, self.buflen as usize) }
     }
 
     #[inline]
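The `buflen` change is the same story as `page_size` and `SlotId` below: slot and page sizes fit comfortably in 32 bits, so `usize` fields are wasted space on 64-bit targets. The saving only materializes when narrowed fields can share a word, which is why it pays in a struct that packs several small fields together. A self-contained sketch with invented field names (not `SlotPtr`'s real layout):

```rust
use std::mem::size_of;

// Illustrative layouts only. On a 64-bit target, two usize fields plus a
// pointer occupy three words:
struct Wide {
    id: usize,
    len: usize,
    addr: *mut u8,
}

// ...while two u32 fields pack into a single word beside the pointer:
struct Narrow {
    id: u32,
    len: u32,
    addr: *mut u8,
}

fn main() {
    assert_eq!(size_of::<Wide>(), 24);
    assert_eq!(size_of::<Narrow>(), 16);
}
```

For a per-tuple object like `SlotPtr`, that difference is multiplied by the number of live tuples.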
diff --git a/crates/db/src/tuplebox/tuples/slotted_page.rs b/crates/db/src/tuplebox/tuples/slotted_page.rs
index db007260..99242f7c 100644
--- a/crates/db/src/tuplebox/tuples/slotted_page.rs
+++ b/crates/db/src/tuplebox/tuples/slotted_page.rs
@@ -37,7 +37,7 @@ use tracing::error;
 
 use crate::tuplebox::tuples::slotbox::SlotBoxError;
 
-pub type SlotId = usize;
+pub type SlotId = u32;
 
 // Note that if a page is empty, either because it's new, or because all its slots have been
 // removed, then used_bytes is 0.
@@ -173,7 +173,7 @@ impl SlotIndexEntry {
 /// is the same as the representation in-memory.
 pub struct SlottedPage<'a> {
     pub(crate) base_address: *mut u8,
-    pub(crate) page_size: usize,
+    pub(crate) page_size: u32,
 
     _marker: std::marker::PhantomData<&'a u8>,
 }
@@ -195,7 +195,7 @@ impl<'a> SlottedPage<'a> {
     pub fn for_page(base_address: *mut u8, page_size: usize) -> Self {
         Self {
             base_address,
-            page_size,
+            page_size: page_size as u32,
             _marker: Default::default(),
         }
     }
@@ -207,7 +207,7 @@ impl<'a> SlottedPage<'a> {
         let used = (header.num_slots * std::mem::size_of::<SlotIndexEntry>() as u32) as usize
             + header.used_bytes as usize
             + std::mem::size_of::<SlottedPageHeader>();
-        self.page_size - used
+        self.page_size as usize - used
     }
 
     /// How many bytes are available for appending to this page (i.e. not counting the space
@@ -218,7 +218,7 @@ impl<'a> SlottedPage<'a> {
         let index_length = header.index_length as usize;
         let header_size = std::mem::size_of::<SlottedPageHeader>();
 
-        self.page_size - (index_length + content_length + header_size)
+        self.page_size as usize - (index_length + content_length + header_size)
     }
 
     /// Add the slot into the page, copying it into the memory region, and returning the slot id
@@ -235,8 +235,9 @@ impl<'a> SlottedPage<'a> {
         }
         if let Some(fit_slot) = fit_slot {
             let content_position = self.offset_of(fit_slot).unwrap().0;
-            let memory_as_slice =
-                unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
+            let memory_as_slice = unsafe {
+                std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize)
+            };
 
             // If there's an initial value provided, copy it in.
             if let Some(initial_value) = initial_value {
@@ -270,9 +271,9 @@ impl<'a> SlottedPage<'a> {
         // Add the slot to the content region. The start offset is PAGE_SIZE - content_length -
         // slot_length. So first thing, copy the bytes into the content region at that position.
         let content_length = self.header_mut().content_length as usize;
-        let content_position = self.page_size - content_length - size;
+        let content_position = self.page_size as usize - content_length - size;
         let memory_as_slice =
-            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
+            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };
 
         // If there's an initial value provided, copy it in.
         if let Some(initial_value) = initial_value {
@@ -282,8 +283,8 @@ impl<'a> SlottedPage<'a> {
         }
 
         // Add the index entry and expand the index region.
-        let mut index_entry = self.get_index_entry_mut(self.header_mut().num_slots as usize);
-        index_entry.as_mut().alloc(content_position, size);
+        let mut index_entry = self.get_index_entry_mut(self.header_mut().num_slots as SlotId);
+        index_entry.as_mut().alloc(content_position as usize, size);
 
         // Update the header
         let header = self.header_mut();
@@ -307,7 +308,7 @@ impl<'a> SlottedPage<'a> {
         let memory_as_slice = unsafe {
             Pin::new_unchecked(std::slice::from_raw_parts_mut(
                 self.base_address,
-                self.page_size,
+                self.page_size as usize,
             ))
         };
         lf(memory_as_slice);
@@ -321,7 +322,7 @@ impl<'a> SlottedPage<'a> {
         let mut slots = vec![];
         let num_slots = header.num_slots;
         for i in 0..num_slots {
-            let mut index_entry = self.get_index_entry_mut(i as usize);
+            let mut index_entry = self.get_index_entry_mut(i as SlotId);
             if index_entry.used {
                 unsafe { index_entry.as_mut().get_unchecked_mut() }.refcount = 1;
                 let slot_id = i as SlotId;
@@ -336,7 +337,7 @@ impl<'a> SlottedPage<'a> {
     pub fn save_into(&self, buf: &mut [u8]) {
         let _ = self.read_lock();
         let memory_as_slice =
-            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
+            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };
         buf.copy_from_slice(memory_as_slice);
     }
 
@@ -380,52 +381,52 @@ impl<'a> SlottedPage<'a> {
     fn get_slot(&self, slot_id: SlotId) -> Result<Pin<&'a [u8]>, SlotBoxError> {
         // Check that the index is in bounds
-        let num_slots = self.header().num_slots as usize;
+        let num_slots = self.header().num_slots as SlotId;
         if slot_id >= num_slots {
             error!(
                 "slot_id {} is out of bounds for page with {} slots",
                 slot_id, num_slots
             );
-            return Err(SlotBoxError::TupleNotFound(slot_id));
+            return Err(SlotBoxError::TupleNotFound(slot_id as usize));
         }
 
         // Read the index entry;
         let index_entry = self.get_index_entry(slot_id);
         if !index_entry.used {
             error!("slot_id {} is not used, invalid tuple", slot_id);
-            return Err(SlotBoxError::TupleNotFound(slot_id));
+            return Err(SlotBoxError::TupleNotFound(slot_id as usize));
         }
         let offset = index_entry.offset as usize;
         let length = index_entry.used_bytes as usize;
         let memory_as_slice =
-            unsafe { std::slice::from_raw_parts(self.base_address, self.page_size) };
+            unsafe { std::slice::from_raw_parts(self.base_address, self.page_size as usize) };
         Ok(unsafe { Pin::new_unchecked(&memory_as_slice[offset..offset + length]) })
     }
 
     fn get_slot_mut(&self, slot_id: SlotId) -> Result<Pin<&'a mut [u8]>, SlotBoxError> {
         // Check that the index is in bounds
-        let num_slots = self.header().num_slots as usize;
+        let num_slots = self.header().num_slots as SlotId;
         if slot_id >= num_slots {
-            return Err(SlotBoxError::TupleNotFound(slot_id));
+            return Err(SlotBoxError::TupleNotFound(slot_id as usize));
        }
 
         // Read the index entry;
         let index_entry = self.get_index_entry(slot_id);
         if !index_entry.used {
-            return Err(SlotBoxError::TupleNotFound(slot_id));
+            return Err(SlotBoxError::TupleNotFound(slot_id as usize));
         }
         let offset = index_entry.offset as usize;
         let length = index_entry.used_bytes as usize;
         assert!(
-            offset + length <= self.page_size,
+            offset + length <= self.page_size as usize,
             "slot {} is out of bounds",
             slot_id
         );
         let memory_as_slice =
-            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
+            unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };
         Ok(unsafe { Pin::new_unchecked(&mut memory_as_slice[offset..offset + length]) })
     }
 }
@@ -525,9 +526,9 @@ impl<'a> SlottedPage<'a> {
     /// Return the offset, size of the slot at the given index.
     fn offset_of(&self, tid: SlotId) -> Result<(usize, usize), SlotBoxError> {
         // Check that the index is in bounds
-        let num_slots = self.header().num_slots as usize;
+        let num_slots = self.header().num_slots as SlotId;
         if tid >= num_slots {
-            return Err(SlotBoxError::TupleNotFound(tid));
+            return Err(SlotBoxError::TupleNotFound(tid as usize));
         }
 
         // Read the index entry;
@@ -541,7 +542,7 @@ impl<'a> SlottedPage<'a> {
         let header = self.header();
         let num_slots = header.num_slots;
         for i in 0..num_slots {
-            let index_entry = self.get_index_entry(i as usize);
+            let index_entry = self.get_index_entry(i as SlotId);
             if index_entry.used {
                 continue;
             }
@@ -554,7 +555,7 @@ impl<'a> SlottedPage<'a> {
         // Sort
         fits.sort_by(|a, b| a.1.cmp(&b.1));
         if let Some((tid, _)) = fits.first() {
-            return (true, Some(*tid));
+            return (true, Some(*tid as SlotId));
        }
 
         let index_length = header.index_length as isize;
@@ -569,7 +570,7 @@ impl<'a> SlottedPage<'a> {
     fn get_index_entry(&self, slot_id: SlotId) -> Pin<&SlotIndexEntry> {
         let index_offset = std::mem::size_of::<SlottedPageHeader>()
-            + (slot_id * std::mem::size_of::<SlotIndexEntry>());
+            + ((slot_id as usize) * std::mem::size_of::<SlotIndexEntry>());
 
         let base_address = self.base_address;
 
@@ -581,7 +582,7 @@ impl<'a> SlottedPage<'a> {
     fn get_index_entry_mut(&self, slot_id: SlotId) -> Pin<&mut SlotIndexEntry> {
         let index_offset = std::mem::size_of::<SlottedPageHeader>()
-            + (slot_id * std::mem::size_of::<SlotIndexEntry>());
+            + ((slot_id as usize) * std::mem::size_of::<SlotIndexEntry>());
 
         let base_address = self.base_address;
         unsafe {
@@ -593,7 +594,7 @@ impl<'a> SlottedPage<'a> {
 
 pub struct PageWriteGuard<'a> {
     base_address: *mut u8,
-    page_size: usize,
+    page_size: u32,
 
     _marker: std::marker::PhantomData<&'a u8>,
 }
@@ -657,7 +658,7 @@ impl<'a> Drop for PageWriteGuard<'a> {
 
 pub struct PageReadGuard<'a> {
     base_address: *const u8,
-    page_size: usize,
+    page_size: u32,
 
     _marker: std::marker::PhantomData<&'a u8>,
 }
@@ -673,7 +674,7 @@ impl<'a> PageReadGuard<'a> {
     pub fn get_slot(&self, slot_id: SlotId) -> Result<Pin<&'a [u8]>, SlotBoxError> {
         let sp = SlottedPage {
             base_address: self.base_address as _,
-            page_size: self.page_size,
+            page_size: self.page_size as _,
             _marker: Default::default(),
         };
         sp.get_slot(slot_id)
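Most of the churn above is one pattern, applied mechanically: store narrow, compute wide. Sizes and ids stay `u32` in the header and index, and every raw-pointer or slice computation widens with `as usize` first (a lossless cast on 32- and 64-bit targets), so offset arithmetic still happens at pointer width. A minimal sketch of the pattern as it appears in `get_index_entry`; the function and parameter names here are mine, not the crate's:

```rust
type SlotId = u32;

// Index entries sit in a fixed-size array right after the page header, so a
// slot's entry lives at: header_size + slot_id * entry_size. The id is
// widened once, before the multiply, so the math cannot wrap at u32 width.
fn index_offset(header_size: usize, entry_size: usize, slot_id: SlotId) -> usize {
    header_size + (slot_id as usize) * entry_size
}

fn main() {
    // e.g. a 24-byte header and 16-byte entries put slot 3's entry at byte 72.
    assert_eq!(index_offset(24, 16, 3), 72);
}
```

The casts in the other direction (`TupleNotFound(slot_id as usize)`) exist only because `SlotBoxError`'s variant still carries a `usize`.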
diff --git a/crates/db/src/tuplebox/tx/transaction.rs b/crates/db/src/tuplebox/tx/transaction.rs
index 44ff5f03..648af79d 100644
--- a/crates/db/src/tuplebox/tx/transaction.rs
+++ b/crates/db/src/tuplebox/tx/transaction.rs
@@ -31,8 +31,6 @@ use crate::tuplebox::RelationId;
 
 /// A versioned transaction, which is a fork of the current canonical base relations.
 pub struct Transaction {
-    /// The timestamp of this transaction, as granted to us by the tuplebox.
-    pub(crate) ts: u64,
     /// Where we came from, for referencing back to the base relations.
     db: Arc<TupleBox>,
     /// The "working set" is the set of retrieved and/or modified tuples from base relations, known
@@ -64,7 +62,6 @@ impl Transaction {
         let next_transient_relation_id = RelationId::transient(db.relation_info().len());
 
         Self {
-            ts,
             db,
             working_set: RwLock::new(Some(ws)),
             transient_relations: RwLock::new(HashMap::new()),
@@ -116,8 +113,6 @@ impl Transaction {
 
     pub async fn rollback(&self) -> Result<(), CommitError> {
         self.working_set.write().await.as_mut().unwrap().clear();
-        // Clear out the active transaction.
-        self.db.abort_transaction(self.ts).await;
         Ok(())
     }
 }
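With `abort_transaction` gone from `TupleBox`, the transaction's `ts` field had no remaining reader besides the rollback path, so it goes too, and aborting becomes a purely local operation: drop the working set, touch nothing shared. A toy shape of the simplified abort path (stand-in types, tokio's `RwLock` assumed again):

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Stand-in for Transaction; the working set maps tuple ids to bytes here
// purely for illustration.
struct Tx {
    working_set: RwLock<Option<HashMap<u64, Vec<u8>>>>,
}

impl Tx {
    async fn rollback(&self) {
        // Discard local writes; no shared registry is notified any more.
        self.working_set.write().await.as_mut().unwrap().clear();
    }
}

#[tokio::main]
async fn main() {
    let tx = Tx {
        working_set: RwLock::new(Some(HashMap::new())),
    };
    tx.rollback().await;
}
```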
diff --git a/crates/kernel/Cargo.toml b/crates/kernel/Cargo.toml
index 56c487ce..5518fdc3 100644
--- a/crates/kernel/Cargo.toml
+++ b/crates/kernel/Cargo.toml
@@ -61,5 +61,4 @@ metrics-util.workspace = true
 metrics-macros.workspace = true
 
 # For the DB layer.
-crossbeam-channel.workspace = true
 bincode.workspace = true