Skip to content

Commit

Permalink
Merge pull request #2 from kralverde/bulk_chunk
Browse files Browse the repository at this point in the history
Add drop guard for locks and rework some logic
  • Loading branch information
Mili-ssm authored Feb 21, 2025
2 parents 4e34235 + e46956b commit cab6493
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 76 deletions.
2 changes: 2 additions & 0 deletions pumpkin-world/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dashmap = "6.1"

num-traits = "0.2"

parking_lot = "0.12.3"

# Compression
flate2 = "1.0"
lz4 = "1.28"
Expand Down
169 changes: 115 additions & 54 deletions pumpkin-world/src/chunks_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use dashmap::DashMap;
use log::{error, trace};
use pumpkin_util::math::vector2::Vector2;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use tokio::sync::RwLock;

use crate::{
chunk::{ChunkReadingError, ChunkWritingError},
Expand Down Expand Up @@ -56,6 +55,10 @@ where
folder: &LevelFolder,
chunks_data: &[(Vector2<i32>, &D)],
) -> Result<(), ChunkWritingError>;

fn cache_count(&self) -> usize;

fn wait_for_lock_releases(&self);
}

/// Trait to serialize and deserialize the chunk data to and from bytes.
Expand Down Expand Up @@ -85,63 +88,107 @@ pub trait ChunkSerializer: Send + Sync + Sized + Default {
/// It also avoids IO operations that could produce data races thanks to the
/// DashMap that manages the locks for the files.
pub struct ChunkFileManager<S: ChunkSerializer> {
file_locks: Arc<DashMap<PathBuf, Arc<RwLock<S>>>>,
// TODO: Make file reading/writing async

// Dashmap has rw-locks on shards, but we want per-serializer
file_locks: DashMap<PathBuf, Arc<parking_lot::RwLock<S>>>,
_serializer: std::marker::PhantomData<S>,
}

impl<S: ChunkSerializer> Default for ChunkFileManager<S> {
fn default() -> Self {
Self {
file_locks: Arc::default(),
file_locks: DashMap::default(),
_serializer: std::marker::PhantomData,
}
}
}

impl<S: ChunkSerializer> ChunkFileManager<S> {
pub fn read_file(&self, path: &Path) -> Result<Arc<RwLock<S>>, ChunkReadingError> {
// We get the entry from the DashMap and try to insert a new lock if it doesn't exist
// using dead-lock safe methods like `or_try_insert_with`
pub struct ChunkFileManagerLockGuard<'a, S: ChunkSerializer> {
lock: Arc<parking_lot::RwLock<S>>,
parent: &'a ChunkFileManager<S>,
key: PathBuf,
}

if let Some(serializer) = &self.file_locks.get(path) {
trace!("Using cached file: {:?}", path);
return Ok(serializer.value().clone());
}
impl<'a, S: ChunkSerializer> ChunkFileManagerLockGuard<'a, S> {
fn new(
key: PathBuf,
parent: &'a ChunkFileManager<S>,
lock: Arc<parking_lot::RwLock<S>>,
) -> Self {
Self { key, parent, lock }
}

let serializer = &self
fn write(&self) -> parking_lot::RwLockWriteGuard<'_, S> {
self.lock.write()
}

fn read(&self) -> parking_lot::RwLockReadGuard<'_, S> {
self.lock.read()
}
}

impl<S: ChunkSerializer> Drop for ChunkFileManagerLockGuard<'_, S> {
fn drop(&mut self) {
// If we can acquire a write lock, that means nothing else is locking this -> drop it

Check warning on line 133 in pumpkin-world/src/chunks_io.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"aquire" should be "acquire".
let _ = self
.parent
.file_locks
.entry(path.to_path_buf())
.or_try_insert_with(|| {
trace!("Reading file from Disk: {:?}", path);
let file = OpenOptions::new()
.write(false)
.read(true)
.create(false)
.truncate(false)
.open(path)
.map_err(|err| match err.kind() {
ErrorKind::NotFound => ChunkReadingError::ChunkNotExist,
kind => ChunkReadingError::IoError(kind),
});

match file {
Ok(file) => {
let file_bytes = file
.bytes()
.collect::<Result<Vec<_>, _>>()
.map_err(|err| ChunkReadingError::IoError(err.kind()))?;

Ok(Arc::new(RwLock::new(S::from_bytes(&file_bytes)?)))
}
Err(ChunkReadingError::ChunkNotExist) => {
Ok(Arc::new(RwLock::new(S::default())))
}
Err(err) => Err(err),
}
})?
.downgrade();
.remove_if(&self.key, |_, value| value.try_write().is_some());
}
}

Ok(serializer.value().clone())
impl<S: ChunkSerializer> ChunkFileManager<S> {
pub fn read_file(
&self,
path: &Path,
) -> Result<ChunkFileManagerLockGuard<'_, S>, ChunkReadingError> {
// We get the entry from the DashMap and try to insert a new lock if it doesn't exist
// using dead-lock safe methods like `or_try_insert_with`

let locked_ref = if let Some(serializer) = self.file_locks.get(path) {
trace!("Using cached chunk serializer: {:?}", path);
serializer
} else {
self.file_locks
.entry(path.to_path_buf())
.or_try_insert_with(|| {
trace!("Reading file from Disk: {:?}", path);
let file = OpenOptions::new()
.write(false)
.read(true)
.create(false)
.truncate(false)
.open(path)
.map_err(|err| match err.kind() {
ErrorKind::NotFound => ChunkReadingError::ChunkNotExist,
kind => ChunkReadingError::IoError(kind),
});

let value = match file {
Ok(file) => {
let file_bytes = file
.bytes()
.collect::<Result<Vec<_>, _>>()
.map_err(|err| ChunkReadingError::IoError(err.kind()))?;

S::from_bytes(&file_bytes)?
}
Err(ChunkReadingError::ChunkNotExist) => S::default(),
Err(err) => return Err(err),
};

Ok(Arc::new(parking_lot::RwLock::new(value)))
})?
.downgrade()
};

let lock = locked_ref.clone();
Ok(ChunkFileManagerLockGuard::new(
path.to_path_buf(),
self,
lock,
))
}

pub fn write_file(&self, path: &Path, serializer: &S) -> Result<(), ChunkWritingError> {
Expand Down Expand Up @@ -227,12 +274,9 @@ where
};

// We need to hold the read lock to prevent other threads from writing/modifying the data

let chunk_guard = tokio::task::block_in_place(|| chunk_serializer.blocking_read());
let fetched_chunks = chunk_guard.get_chunks_data(chunks.as_slice());
drop(chunk_guard);

fetched_chunks
// NOTE: Currently this is done in a rayon thread, so async is not needed
let chunk_guard = chunk_serializer.read();
chunk_guard.get_chunks_data(chunks.as_slice())
})
.collect();

Expand Down Expand Up @@ -267,6 +311,8 @@ where
.try_for_each(|(file_name, chunks)| {
let path = folder.region_folder.join(file_name);

// TODO: Do we need to read the chunk from file to write it every time? Can't we just write to
// offsets in the file? Especially for the anvil format.
let chunk_serializer = match self.read_file(&path) {
Ok(file) => Ok(file),
Err(ChunkReadingError::ChunkNotExist) => {
Expand All @@ -283,21 +329,36 @@ where
}?;

// We need to hold the write lock to prevent other threads from writing/modifying/reading the data
let mut chunk_guard =
tokio::task::block_in_place(|| chunk_serializer.blocking_write());
// NOTE: We currently call this from a rayon thread so no async is needed.
let mut chunk_guard = chunk_serializer.write();
chunk_guard.add_chunks_data(chunks.as_slice())?;

// With the modification done, we can drop the write lock but keep the read lock
// to avoid other threads to write/modify the data, but allow other threads to read it
let chunk_guard = chunk_guard.downgrade();
let chunk_guard = parking_lot::RwLockWriteGuard::downgrade(chunk_guard);
self.write_file(&path, &chunk_guard)?;
drop(chunk_guard);

Ok(())
})?;

self.clean_cache(&paths);

Ok(())
}

fn cache_count(&self) -> usize {
self.file_locks.len()
}

fn wait_for_lock_releases(&self) {
let locks: Vec<_> = self
.file_locks
.iter()
.map(|entry| entry.value().clone())
.collect();

// Acquire a write lock on all entries to verify they are complete

Check warning on line 359 in pumpkin-world/src/chunks_io.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Aquire" should be "Acquire".
for lock in locks {
let _lock = lock.write();
}
}
}
40 changes: 20 additions & 20 deletions pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::{
use crate::{
chunk::{ChunkData, anvil::AnvilChunkFile, linear::LinearFile},
chunks_io::{ChunkFileManager, ChunkIO, LoadedData},
generation::{ Seed, WorldGenerator, get_world_gen},
lock::{LevelLocker,anvil::AnvilLevelLocker },
generation::{Seed, WorldGenerator, get_world_gen},
lock::{LevelLocker, anvil::AnvilLevelLocker},
world_info::{
LevelData, WorldInfoError, WorldInfoReader, WorldInfoWriter,
anvil::{AnvilLevelInfo, LEVEL_DAT_BACKUP_FILE_NAME, LEVEL_DAT_FILE_NAME},
Expand Down Expand Up @@ -120,16 +120,17 @@ impl Level {

pub async fn save(&self) {
log::info!("Saving level...");
// chunks are automatically saved when all players get removed
// TODO: Await chunks that have been called by this ^

// save all straggling chunks
let chunks_to_write = self
.loaded_chunks
.iter()
.map(|chunk| chunk.value().clone())
.collect::<Vec<_>>();
self.write_chunks(chunks_to_write).await;
self.write_chunks(&chunks_to_write).await;

// wait for chunks currently saving in other threads
// TODO: Make this async but its not a super big issue now since we're shutting down
self.chunk_saver.wait_for_lock_releases();

// then lets save the world info
let result = self
Expand All @@ -148,6 +149,10 @@ impl Level {
self.loaded_chunks.len()
}

pub fn cached_chunk_saver_count(&self) -> usize {
self.chunk_saver.cache_count()
}

pub fn list_cached(&self) {
for entry in self.loaded_chunks.iter() {
log::debug!("In map: {:?}", entry.key());
Expand Down Expand Up @@ -249,7 +254,7 @@ impl Level {
.flatten()
.collect::<Vec<_>>();

self.write_chunks(chunks_to_write).await;
self.write_chunks(&chunks_to_write).await;
}

pub async fn clean_chunk(self: &Arc<Self>, chunk: &Vector2<i32>) {
Expand Down Expand Up @@ -278,34 +283,29 @@ impl Level {
self.chunk_watchers.shrink_to_fit();
}
}
pub async fn write_chunks(&self, chunks_to_write: Vec<Arc<RwLock<ChunkData>>>) {
pub async fn write_chunks(&self, chunks_to_write: &[Arc<RwLock<ChunkData>>]) {
if chunks_to_write.is_empty() {
return;
}

let chunk_saver = self.chunk_saver.clone();
let level_folder = self.level_folder.clone();

// TODO: Save the join handles to await them when stopping the server
tokio::spawn(async move {
let chunks_to_write = chunks_to_write.to_vec();

// TODO: Make this async
rayon::spawn(move || {
let futures = chunks_to_write
.iter()
.map(|chunk| chunk.read())
.map(|chunk| chunk.blocking_read())
.collect::<Vec<_>>();

let mut chunks_guards = Vec::new();
for guard in futures {
let chunk = guard.await;
chunks_guards.push(chunk);
}

let chunks = chunks_guards
let chunks = futures
.iter()
.map(|chunk| (chunk.position, chunk.deref()))
.collect::<Vec<_>>();

trace!("Writing chunks to disk {:}", chunks_guards.len());

trace!("Writing chunks to disk {:}", chunks.len());
if let Err(error) = chunk_saver.save_chunks(&level_folder, chunks.as_slice()) {
log::error!("Failed writing Chunk to disk {}", error.to_string());
}
Expand Down
5 changes: 3 additions & 2 deletions pumpkin/src/entity/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,11 @@ impl Player {
level.clean_memory(&radial_chunks);

log::debug!(
"Removed player id {} ({}) ({} chunks remain cached)",
"Removed player id {} ({}) ({} chunks, {} savers remain cached)",
self.gameprofile.name,
self.client.id,
level.loaded_chunk_count()
level.loaded_chunk_count(),
level.cached_chunk_saver_count(),
);

//self.world().level.list_cached();
Expand Down

0 comments on commit cab6493

Please sign in to comment.