diff --git a/pumpkin-world/Cargo.toml b/pumpkin-world/Cargo.toml
index a3dd85303..52be66f97 100644
--- a/pumpkin-world/Cargo.toml
+++ b/pumpkin-world/Cargo.toml
@@ -27,6 +27,8 @@
 dashmap = "6.1"
 num-traits = "0.2"
 
+parking_lot = "0.12.3"
+
 # Compression
 flate2 = "1.0"
 lz4 = "1.28"
diff --git a/pumpkin-world/src/chunks_io.rs b/pumpkin-world/src/chunks_io.rs
index c8bc4a214..7854e6e71 100644
--- a/pumpkin-world/src/chunks_io.rs
+++ b/pumpkin-world/src/chunks_io.rs
@@ -11,7 +11,6 @@ use dashmap::DashMap;
 use log::{error, trace};
 use pumpkin_util::math::vector2::Vector2;
 use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
-use tokio::sync::RwLock;
 
 use crate::{
     chunk::{ChunkReadingError, ChunkWritingError},
@@ -56,6 +55,10 @@ where
         folder: &LevelFolder,
         chunks_data: &[(Vector2<i32>, &D)],
     ) -> Result<(), ChunkWritingError>;
+
+    fn cache_count(&self) -> usize;
+
+    fn wait_for_lock_releases(&self);
 }
 
 /// Trait to serialize and deserialize the chunk data to and from bytes.
@@ -85,63 +88,107 @@ pub trait ChunkSerializer: Send + Sync + Sized + Default {
 /// It also avoid IO operations that could produce dataraces thanks to the
 /// DashMap that manages the locks for the files.
 pub struct ChunkFileManager<S: ChunkSerializer> {
-    file_locks: Arc<DashMap<PathBuf, Arc<RwLock<S>>>>,
+    // TODO: Make file reading/writing async
+
+    // DashMap has rw-locks on shards, but we want a lock per serializer
+    file_locks: DashMap<PathBuf, Arc<parking_lot::RwLock<S>>>,
     _serializer: std::marker::PhantomData<S>,
 }
 
 impl<S: ChunkSerializer> Default for ChunkFileManager<S> {
     fn default() -> Self {
         Self {
-            file_locks: Arc::default(),
+            file_locks: DashMap::default(),
             _serializer: std::marker::PhantomData,
         }
     }
 }
 
-impl<S: ChunkSerializer> ChunkFileManager<S> {
-    pub fn read_file(&self, path: &Path) -> Result<Arc<RwLock<S>>, ChunkReadingError> {
-        // We get the entry from the DashMap and try to insert a new lock if it doesn't exist
-        // using dead-lock save methods like `or_try_insert_with`
+pub struct ChunkFileManagerLockGuard<'a, S: ChunkSerializer> {
+    lock: Arc<parking_lot::RwLock<S>>,
+    parent: &'a ChunkFileManager<S>,
+    key: PathBuf,
+}
 
-        if let Some(serializer) = &self.file_locks.get(path) {
-            trace!("Using cached file: {:?}", path);
-            return Ok(serializer.value().clone());
-        }
+impl<'a, S: ChunkSerializer> ChunkFileManagerLockGuard<'a, S> {
+    fn new(
+        key: PathBuf,
+        parent: &'a ChunkFileManager<S>,
+        lock: Arc<parking_lot::RwLock<S>>,
+    ) -> Self {
+        Self { key, parent, lock }
+    }
 
-        let serializer = &self
+    fn write(&self) -> parking_lot::RwLockWriteGuard<'_, S> {
+        self.lock.write()
+    }
+
+    fn read(&self) -> parking_lot::RwLockReadGuard<'_, S> {
+        self.lock.read()
+    }
+}
+
+impl<S: ChunkSerializer> Drop for ChunkFileManagerLockGuard<'_, S> {
+    fn drop(&mut self) {
+        // If we can acquire a write lock, that means nothing else is locking this -> drop it
+        let _ = self
+            .parent
             .file_locks
-            .entry(path.to_path_buf())
-            .or_try_insert_with(|| {
-                trace!("Reading file from Disk: {:?}", path);
-                let file = OpenOptions::new()
-                    .write(false)
-                    .read(true)
-                    .create(false)
-                    .truncate(false)
-                    .open(path)
-                    .map_err(|err| match err.kind() {
-                        ErrorKind::NotFound => ChunkReadingError::ChunkNotExist,
-                        kind => ChunkReadingError::IoError(kind),
-                    });
-
-                match file {
-                    Ok(file) => {
-                        let file_bytes = file
-                            .bytes()
-                            .collect::<Result<Vec<u8>, _>>()
-                            .map_err(|err| ChunkReadingError::IoError(err.kind()))?;
-
-                        Ok(Arc::new(RwLock::new(S::from_bytes(&file_bytes)?)))
-                    }
-                    Err(ChunkReadingError::ChunkNotExist) => {
-                        Ok(Arc::new(RwLock::new(S::default())))
-                    }
-                    Err(err) => Err(err),
-                }
-            })?
-            .downgrade();
+            .remove_if(&self.key, |_, value| value.try_write().is_some());
+    }
+}
 
-        Ok(serializer.value().clone())
+impl<S: ChunkSerializer> ChunkFileManager<S> {
+    pub fn read_file(
+        &self,
+        path: &Path,
+    ) -> Result<ChunkFileManagerLockGuard<'_, S>, ChunkReadingError> {
+        // We get the entry from the DashMap and try to insert a new lock if it doesn't exist
+        // using dead-lock safe methods like `or_try_insert_with`
+
+        let locked_ref = if let Some(serializer) = self.file_locks.get(path) {
+            trace!("Using cached chunk serializer: {:?}", path);
+            serializer
+        } else {
+            self.file_locks
+                .entry(path.to_path_buf())
+                .or_try_insert_with(|| {
+                    trace!("Reading file from Disk: {:?}", path);
+                    let file = OpenOptions::new()
+                        .write(false)
+                        .read(true)
+                        .create(false)
+                        .truncate(false)
+                        .open(path)
+                        .map_err(|err| match err.kind() {
+                            ErrorKind::NotFound => ChunkReadingError::ChunkNotExist,
+                            kind => ChunkReadingError::IoError(kind),
+                        });
+
+                    let value = match file {
+                        Ok(file) => {
+                            let file_bytes = file
+                                .bytes()
+                                .collect::<Result<Vec<u8>, _>>()
+                                .map_err(|err| ChunkReadingError::IoError(err.kind()))?;
+
+                            S::from_bytes(&file_bytes)?
+                        }
+                        Err(ChunkReadingError::ChunkNotExist) => S::default(),
+                        Err(err) => return Err(err),
+                    };
+
+                    Ok(Arc::new(parking_lot::RwLock::new(value)))
+                })?
+                .downgrade()
+        };
+
+        let lock = locked_ref.clone();
+        Ok(ChunkFileManagerLockGuard::new(
+            path.to_path_buf(),
+            self,
+            lock,
+        ))
    }
 
     pub fn write_file(&self, path: &Path, serializer: &S) -> Result<(), ChunkWritingError> {
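The `Drop` impl above is what keeps `file_locks` bounded: when a guard is dropped, the entry is evicted only if no other handle still holds that serializer's lock. A minimal, self-contained sketch of the eviction pattern, assuming only `dashmap` and `parking_lot`; the `Cache` type, `release` method, and string keys are illustrative stand-ins, not the PR's types:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use parking_lot::RwLock;

struct Cache {
    entries: DashMap<String, Arc<RwLock<Vec<u8>>>>,
}

impl Cache {
    fn release(&self, key: &str) {
        // `try_write` succeeds only if no reader or writer holds the lock,
        // so a concurrent user of the same Arc keeps the entry alive.
        self.entries
            .remove_if(key, |_, value| value.try_write().is_some());
    }
}

fn main() {
    let cache = Cache { entries: DashMap::new() };
    cache
        .entries
        .insert("r.0.0.mca".into(), Arc::new(RwLock::new(vec![1, 2, 3])));

    let held = cache.entries.get("r.0.0.mca").unwrap().value().clone();
    let reading = held.read();
    cache.release("r.0.0.mca"); // kept: `reading` still holds the lock
    assert_eq!(cache.entries.len(), 1);

    drop(reading);
    cache.release("r.0.0.mca"); // evicted: nothing holds the lock anymore
    assert_eq!(cache.entries.len(), 0);
}
```

Because `try_write` fails while any guard is alive, only the last handle to drop evicts the entry, and `remove_if` holds the shard lock so the check and the removal happen atomically.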
@@ -227,12 +274,9 @@ where
                 };
 
                 // We need to block the read to avoid other threads to write/modify the data
-
-                let chunk_guard = tokio::task::block_in_place(|| chunk_serializer.blocking_read());
-                let fetched_chunks = chunk_guard.get_chunks_data(chunks.as_slice());
-                drop(chunk_guard);
-
-                fetched_chunks
+                // NOTE: Currently this is done in a rayon thread, so async is not needed
+                let chunk_guard = chunk_serializer.read();
+                chunk_guard.get_chunks_data(chunks.as_slice())
             })
             .collect();
 
@@ -267,6 +311,8 @@ where
             .try_for_each(|(file_name, chunks)| {
                 let path = folder.region_folder.join(file_name);
 
+                // TODO: Do we need to read the chunk from file to write it every time?
+                // Can't we just write to offsets in the file? Especially for the anvil format.
                 let chunk_serializer = match self.read_file(&path) {
                     Ok(file) => Ok(file),
                     Err(ChunkReadingError::ChunkNotExist) => {
@@ -283,16 +329,14 @@
                 }?;
 
                 // We need to block the read to avoid other threads to write/modify/read the data
-                let mut chunk_guard =
-                    tokio::task::block_in_place(|| chunk_serializer.blocking_write());
+                // NOTE: We currently call this from a rayon thread, so no async is needed.
+                let mut chunk_guard = chunk_serializer.write();
                 chunk_guard.add_chunks_data(chunks.as_slice())?;
 
                 // With the modification done, we can drop the write lock but keep the read lock
                 // to avoid other threads to write/modify the data, but allow other threads to read it
-                let chunk_guard = chunk_guard.downgrade();
+                let chunk_guard = parking_lot::RwLockWriteGuard::downgrade(chunk_guard);
                 self.write_file(&path, &chunk_guard)?;
 
-                drop(chunk_guard);
-
                 Ok(())
             })?;
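The write-then-downgrade step above relies on `parking_lot`'s atomic downgrade: between `add_chunks_data` and `write_file`, no other writer can slip in, while readers are never blocked during the disk write. A sketch of just that pattern, assuming only `parking_lot`; the `modify_then_persist` and `persist` names are hypothetical:

```rust
use parking_lot::{RwLock, RwLockWriteGuard};

fn modify_then_persist(lock: &RwLock<Vec<u8>>) {
    let mut guard = lock.write();
    guard.push(42); // mutate while holding exclusive access

    // Atomic downgrade: no writer can sneak in between mutation and persist.
    let guard = RwLockWriteGuard::downgrade(guard);
    persist(&guard); // read-only access is enough for serialization
}

fn persist(data: &[u8]) {
    println!("writing {} bytes", data.len());
}

fn main() {
    let lock = RwLock::new(vec![1, 2, 3]);
    modify_then_persist(&lock);
}
```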
@@ -300,4 +344,21 @@
 
         Ok(())
     }
+
+    fn cache_count(&self) -> usize {
+        self.file_locks.len()
+    }
+
+    fn wait_for_lock_releases(&self) {
+        let locks: Vec<_> = self
+            .file_locks
+            .iter()
+            .map(|entry| entry.value().clone())
+            .collect();
+
+        // Acquire a write lock on all entries to verify they are complete
+        for lock in locks {
+            let _lock = lock.write();
+        }
+    }
 }
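`wait_for_lock_releases` turns the cache into a shutdown barrier: cloning the `Arc`s out first keeps the `DashMap` shards unlocked, and then taking each write lock in turn blocks until every in-flight save has released its guard. A sketch of the same barrier idea under the same crates; the `wait_for_all` helper and `u32` keys are illustrative:

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use dashmap::DashMap;
use parking_lot::RwLock;

fn wait_for_all(locks: &DashMap<u32, Arc<RwLock<()>>>) {
    // Snapshot the Arcs so the map itself is not held locked while we wait.
    let snapshot: Vec<_> = locks.iter().map(|e| e.value().clone()).collect();
    for lock in snapshot {
        let _guard = lock.write(); // returns once the last holder releases
    }
}

fn main() {
    let locks: DashMap<u32, Arc<RwLock<()>>> = DashMap::new();
    locks.insert(0, Arc::new(RwLock::new(())));

    let busy = locks.get(&0).unwrap().value().clone();
    let worker = thread::spawn(move || {
        let _writing = busy.write();
        thread::sleep(Duration::from_millis(50)); // simulated in-flight save
    });

    thread::sleep(Duration::from_millis(10));
    wait_for_all(&locks); // blocks until the worker drops its guard
    worker.join().unwrap();
}
```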
diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs
index 671c439e5..dc9efac05 100644
--- a/pumpkin-world/src/level.rs
+++ b/pumpkin-world/src/level.rs
@@ -14,8 +14,8 @@ use tokio::{
 use crate::{
     chunk::{ChunkData, anvil::AnvilChunkFile, linear::LinearFile},
     chunks_io::{ChunkFileManager, ChunkIO, LoadedData},
-    generation::{ Seed, WorldGenerator, get_world_gen},
-    lock::{LevelLocker,anvil::AnvilLevelLocker },
+    generation::{Seed, WorldGenerator, get_world_gen},
+    lock::{LevelLocker, anvil::AnvilLevelLocker},
     world_info::{
         LevelData, WorldInfoError, WorldInfoReader, WorldInfoWriter,
         anvil::{AnvilLevelInfo, LEVEL_DAT_BACKUP_FILE_NAME, LEVEL_DAT_FILE_NAME},
@@ -120,16 +120,17 @@ impl Level {
     pub async fn save(&self) {
         log::info!("Saving level...");
 
-        // chunks are automatically saved when all players get removed
-        // TODO: Await chunks that have been called by this ^
-        // save all stragling chunks
         let chunks_to_write = self
             .loaded_chunks
             .iter()
             .map(|chunk| chunk.value().clone())
             .collect::<Vec<_>>();
-        self.write_chunks(chunks_to_write).await;
+        self.write_chunks(&chunks_to_write).await;
+
+        // wait for chunks currently saving in other threads
+        // TODO: Make this async, but it's not a super big issue now since we're shutting down
+        self.chunk_saver.wait_for_lock_releases();
 
         // then lets save the world info
         let result = self
@@ -148,6 +149,10 @@ impl Level {
         self.loaded_chunks.len()
     }
 
+    pub fn cached_chunk_saver_count(&self) -> usize {
+        self.chunk_saver.cache_count()
+    }
+
     pub fn list_cached(&self) {
         for entry in self.loaded_chunks.iter() {
             log::debug!("In map: {:?}", entry.key());
@@ -249,7 +254,7 @@
             .flatten()
             .collect::<Vec<_>>();
 
-        self.write_chunks(chunks_to_write).await;
+        self.write_chunks(&chunks_to_write).await;
     }
 
     pub async fn clean_chunk(self: &Arc<Self>, chunk: &Vector2<i32>) {
@@ -278,7 +283,7 @@
             self.chunk_watchers.shrink_to_fit();
         }
     }
-    pub async fn write_chunks(&self, chunks_to_write: Vec<Arc<RwLock<ChunkData>>>) {
+    pub async fn write_chunks(&self, chunks_to_write: &[Arc<RwLock<ChunkData>>]) {
         if chunks_to_write.is_empty() {
             return;
         }
@@ -286,26 +291,21 @@ impl Level {
         let chunk_saver = self.chunk_saver.clone();
         let level_folder = self.level_folder.clone();
 
-        // TODO: Save the join handles to await them when stopping the server
-        tokio::spawn(async move {
+        let chunks_to_write = chunks_to_write.to_vec();
+
+        // TODO: Make this async
+        rayon::spawn(move || {
             let futures = chunks_to_write
                 .iter()
-                .map(|chunk| chunk.read())
+                .map(|chunk| chunk.blocking_read())
                 .collect::<Vec<_>>();
 
-            let mut chunks_guards = Vec::new();
-            for guard in futures {
-                let chunk = guard.await;
-                chunks_guards.push(chunk);
-            }
-
-            let chunks = chunks_guards
+            let chunks = futures
                 .iter()
                 .map(|chunk| (chunk.position, chunk.deref()))
                 .collect::<Vec<_>>();
 
-            trace!("Writing chunks to disk {:}", chunks_guards.len());
-
+            trace!("Writing chunks to disk {:}", chunks.len());
             if let Err(error) = chunk_saver.save_chunks(&level_folder, chunks.as_slice()) {
                 log::error!("Failed writing Chunk to disk {}", error.to_string());
             }
diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs
index db47e40e3..9c1fac550 100644
--- a/pumpkin/src/entity/player.rs
+++ b/pumpkin/src/entity/player.rs
@@ -274,10 +274,11 @@ impl Player {
         level.clean_memory(&radial_chunks);
 
         log::debug!(
-            "Removed player id {} ({}) ({} chunks remain cached)",
+            "Removed player id {} ({}) ({} chunks, {} savers remain cached)",
             self.gameprofile.name,
             self.client.id,
-            level.loaded_chunk_count()
+            level.loaded_chunk_count(),
+            level.cached_chunk_saver_count(),
         );
 
         //self.world().level.list_cached();
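The `write_chunks` change swaps `tokio::spawn` plus `.read().await` for `rayon::spawn` plus `blocking_read()`: `tokio::sync::RwLock::blocking_read` may be called from any thread outside the async runtime, which is exactly what a rayon worker is. A sketch of that hand-off, assuming tokio with the `rt-multi-thread`, `macros`, `sync`, and `time` features plus rayon; the `Chunk` and `save` names are illustrative, not the PR's:

```rust
use std::sync::Arc;

use tokio::sync::RwLock;

struct Chunk {
    position: (i32, i32),
}

fn save(chunks: &[&Chunk]) {
    for chunk in chunks {
        println!("saving chunk at {:?}", chunk.position);
    }
}

#[tokio::main]
async fn main() {
    let chunks: Vec<Arc<RwLock<Chunk>>> =
        vec![Arc::new(RwLock::new(Chunk { position: (0, 0) }))];

    // Hand owned clones to rayon; the closure runs outside the async runtime,
    // so `blocking_read` is allowed (it would panic on a runtime worker).
    rayon::spawn(move || {
        let guards: Vec<_> = chunks.iter().map(|c| c.blocking_read()).collect();
        let refs: Vec<&Chunk> = guards.iter().map(|g| &**g).collect();
        save(&refs);
    });

    // Give the detached rayon task a moment to finish in this toy example.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```

This is also why the PR's save path still needs `wait_for_lock_releases`: the rayon task is detached, so shutdown has to block on the per-file locks rather than on a join handle.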