From 6ddccb2549023487dbe80baa7d65d301eb4fce60 Mon Sep 17 00:00:00 2001 From: "Ryan D. Friese" Date: Thu, 17 Oct 2024 13:32:18 -0700 Subject: [PATCH] updated local chunks to utilize handles instead of blocking api --- Cargo.toml | 6 +- examples/array_examples/global_lock_array.rs | 14 +- examples/array_examples/onesided_iteration.rs | 253 +++++++++--------- examples/darc_examples/darc.rs | 2 +- run_examples.sh | 8 +- src/array.rs | 27 +- src/array/global_lock_atomic.rs | 6 +- src/array/global_lock_atomic/handle.rs | 50 ++-- src/array/iterator/local_iterator.rs | 4 +- src/array/local_lock_atomic.rs | 4 +- src/array/local_lock_atomic/handle.rs | 227 ++++++++++++++-- src/array/local_lock_atomic/local_chunks.rs | 167 ++++-------- src/darc/global_rw_darc.rs | 34 ++- src/darc/handle.rs | 72 +++-- 14 files changed, 530 insertions(+), 344 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52c206dd..b316d1dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -366,9 +366,9 @@ path="examples/array_examples/global_lock_array.rs" name="histo" path="examples/array_examples/histo.rs" -[[example]] -name="single_pe_array" -path="examples/array_examples/single_pe_array.rs" +#[[example]] +#name="single_pe_array" +#path="examples/array_examples/single_pe_array.rs" ##------------ RDMA Examples -----------------## [[example]] diff --git a/examples/array_examples/global_lock_array.rs b/examples/array_examples/global_lock_array.rs index dc18ee1f..82cbe5b6 100644 --- a/examples/array_examples/global_lock_array.rs +++ b/examples/array_examples/global_lock_array.rs @@ -11,7 +11,7 @@ fn main() { let s = Instant::now(); let local_data = array.read_local_data().block(); println!( - "PE{my_pe} time: {:?} {:?}", + "0. PE{my_pe} time: {:?} {:?}", s.elapsed().as_secs_f64(), local_data ); @@ -21,7 +21,7 @@ fn main() { world.barrier(); let mut local_data = array.write_local_data().block(); println!( - "PE{my_pe} time: {:?} got write lock", + "1. PE{my_pe} time: {:?} got write lock", s.elapsed().as_secs_f64() ); local_data.iter_mut().for_each(|elem| *elem = my_pe); @@ -29,23 +29,23 @@ fn main() { drop(local_data); array.print(); - println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); + println!("2 .PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); - let mut local_data = array.collective_write_local_data().block(); + let mut local_data = world.block_on(array.collective_write_local_data()); println!( - "PE{my_pe} time: {:?} got collective write lock", + "3. PE{my_pe} time: {:?} got collective write lock", s.elapsed().as_secs_f64() ); local_data.iter_mut().for_each(|elem| *elem += my_pe); std::thread::sleep(Duration::from_secs(1)); drop(local_data); println!( - "PE{my_pe} time: {:?} dropped collective write lock", + "4. PE{my_pe} time: {:?} dropped collective write lock", s.elapsed().as_secs_f64() ); array.print(); - println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); + println!("5. 
PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); array .read_lock() diff --git a/examples/array_examples/onesided_iteration.rs b/examples/array_examples/onesided_iteration.rs index 1ffb25bf..f38a6b55 100644 --- a/examples/array_examples/onesided_iteration.rs +++ b/examples/array_examples/onesided_iteration.rs @@ -1,136 +1,139 @@ -// use lamellar::array::prelude::*; -// const ARRAY_LEN: usize = 100; - -// fn main() { -// let world = lamellar::LamellarWorldBuilder::new().build(); -// let my_pe = world.my_pe(); -// let _num_pes = world.num_pes(); -// let block_array = AtomicArray::::new(world.team(), ARRAY_LEN, Distribution::Block); -// let cyclic_array = AtomicArray::::new(world.team(), ARRAY_LEN, Distribution::Cyclic); - -// //we are going to initialize the data on each PE by directly accessing its local data - -// block_array -// .mut_local_data() -// .iter() -// .for_each(|e| e.store(my_pe)); -// cyclic_array -// .mut_local_data() -// .iter() -// .for_each(|e| e.store(my_pe)); - -// // In this example we will make use of a onesided iterator which -// // enables us to iterate over the entire array on a single PE. -// // The runtime will manage transferring data from remote PEs. -// // Note that for UnsafeArrays, AtomicArrays, and LocalLockArrays, -// // there is no guarantee that by the time the transferred data -// // as arrived to the calling PE it has remained the same on the remote PE. -// // we do not currently provide a mutable one sided iterator. - -// if my_pe == 0 { -// println!("Here"); -// for elem in block_array.onesided_iter().into_iter() { -// //we can convert from a oneside iterator into a rust iterator -// print!("{:?} ", elem); -// } -// println!(""); -// println!("Here2"); -// for elem in cyclic_array.onesided_iter().into_iter() { -// print!("{:?} ", elem); -// } -// println!(""); -// } -// println!("Here3"); -// println!("--------------------------------------------------------"); - -// // The lamellar array iterator used above is lazy, meaning that it only accesses and returns a value as its used, -// // while this is generally efficent and results in low overhead, because an elem may actually exists on a remote node -// // latencies to retrieve the next value in the iterator are dependent on the location of the data, as a result of -// // the need to get the data. Further impacting performance is that typically the transfer of a single element will -// // likely be small, thus inefficiently utilizing network resources. -// // to address these issues, we have provided a buffered iterator, which will transfer "get" and store a block of data -// // into a buffer, from with the iterated values are returned. More effectively using network resources. From the users -// // standpoint the only thing that changes is the instatiation of the iterator. - -// if my_pe == 0 { -// for elem in block_array.buffered_onesided_iter(10).into_iter() { -// print!("{:?} ", elem); -// } -// println!(""); - -// for elem in cyclic_array.buffered_onesided_iter(10).into_iter() { -// print!("{:?} ", elem); -// } -// println!(""); -// } - -// println!("--------------------------------------------------------"); - -// // in addition to the buffered iters we also provide a method to iterate over chunks of a lamellar array, via -// // the chunks() method. 
Called on a OneSidedIterator this creates a chunk sized OneSidedMemoryRegion, -// // and then puts the appropriate date based on the iteration index into that region - -// if my_pe == 0 { -// for chunk in block_array.onesided_iter().chunks(10).skip(4).into_iter() { -// println!("{:?}", unsafe { chunk.as_slice() }); -// } -// println!("-----"); -// for chunk in cyclic_array.onesided_iter().chunks(10).into_iter() { -// println!("{:?}", unsafe { chunk.as_slice() }); -// } - -// println!("-----"); -// for (i, (a, b)) in cyclic_array -// .onesided_iter() -// .zip(block_array.onesided_iter()) -// .into_iter() -// .enumerate() -// { -// println!("{:?}: {:?} {:?}", i, a, b); -// } -// println!("-----"); -// for (a, b) in cyclic_array -// .onesided_iter() -// .chunks(10) -// .zip(block_array.onesided_iter().chunks(10)) -// .into_iter() -// { -// unsafe { -// println!("{:?} {:?}", a.as_slice(), b.as_slice()); -// } -// } -// } - -// println!("--------------------------------------------------------"); - -// // let block_array = UnsafeArray::::new(world.team(), ARRAY_LEN, Distribution::Block); -// // for elem in block_onesided_iter!($array,array).into_iter().step_by(4) {...} -// // for elem in block_array.buffered_onesided_iter(10) {...} - -// // //rust step_by pseudo code -// // fn step_by(&mut self, n: usize) -> Result{ -// // let val = self.next(); //grab val based on index -// // self.index += n; -// // val -// // } - -// // //-------------- -// // for elem in block_array.onesided_iter().step_by(4).into_iter() {...} -// } - use futures_util::stream::StreamExt; use lamellar::array::prelude::*; + +const ARRAY_LEN: usize = 100; + fn main() { - let world = LamellarWorldBuilder::new().build(); - let array = LocalLockArray::::new(&world, 8, Distribution::Block); + let world = lamellar::LamellarWorldBuilder::new().build(); let my_pe = world.my_pe(); let num_pes = world.num_pes(); - array.dist_iter_mut().for_each(move |e| *e = my_pe); //initialize array using a distributed iterator - array.wait_all(); + let block_array = AtomicArray::::new(world.team(), ARRAY_LEN, Distribution::Block); + let cyclic_array = AtomicArray::::new(world.team(), ARRAY_LEN, Distribution::Cyclic); + + //we are going to initialize the data on each PE by directly accessing its local data + + block_array + .mut_local_data() + .iter() + .for_each(|e| e.store(my_pe)); + cyclic_array + .mut_local_data() + .iter() + .for_each(|e| e.store(my_pe)); + + // In this example we will make use of a onesided iterator which + // enables us to iterate over the entire array on a single PE. + // The runtime will manage transferring data from remote PEs. + // Note that for UnsafeArrays, AtomicArrays, and LocalLockArrays, + // there is no guarantee that by the time the transferred data + // as arrived to the calling PE it has remained the same on the remote PE. + // we do not currently provide a mutable one sided iterator. 
+ + if my_pe == 0 { + println!("Here"); + for elem in block_array.onesided_iter().into_iter() { + //we can convert from a oneside iterator into a rust iterator + print!("{:?} ", elem); + } + println!(""); + println!("Here2"); + for elem in cyclic_array.onesided_iter().into_iter() { + print!("{:?} ", elem); + } + println!(""); + } + println!("Here3"); + println!("--------------------------------------------------------"); + + // The lamellar array iterator used above is lazy, meaning that it only accesses and returns a value as its used, + // while this is generally efficent and results in low overhead, because an elem may actually exists on a remote node + // latencies to retrieve the next value in the iterator are dependent on the location of the data, as a result of + // the need to get the data. Further impacting performance is that typically the transfer of a single element will + // likely be small, thus inefficiently utilizing network resources. + // to address these issues, we have provided a buffered iterator, which will transfer "get" and store a block of data + // into a buffer, from with the iterated values are returned. More effectively using network resources. From the users + // standpoint the only thing that changes is the instatiation of the iterator. + + if my_pe == 0 { + for elem in block_array.buffered_onesided_iter(10).into_iter() { + print!("{:?} ", elem); + } + println!(""); + + for elem in cyclic_array.buffered_onesided_iter(10).into_iter() { + print!("{:?} ", elem); + } + println!(""); + } + + println!("--------------------------------------------------------"); + + // in addition to the buffered iters we also provide a method to iterate over chunks of a lamellar array, via + // the chunks() method. Called on a OneSidedIterator this creates a chunk sized OneSidedMemoryRegion, + // and then puts the appropriate date based on the iteration index into that region + + if my_pe == 0 { + for chunk in block_array.onesided_iter().chunks(10).skip(4).into_iter() { + println!("{:?}", unsafe { chunk.as_slice() }); + } + println!("-----"); + for chunk in cyclic_array.onesided_iter().chunks(10).into_iter() { + println!("{:?}", unsafe { chunk.as_slice() }); + } + + println!("-----"); + for (i, (a, b)) in cyclic_array + .onesided_iter() + .zip(block_array.onesided_iter()) + .into_iter() + .enumerate() + { + println!("{:?}: {:?} {:?}", i, a, b); + } + println!("-----"); + for (a, b) in cyclic_array + .onesided_iter() + .chunks(10) + .zip(block_array.onesided_iter().chunks(10)) + .into_iter() + { + unsafe { + println!("{:?} {:?}", a.as_slice(), b.as_slice()); + } + } + } + + println!("--------------------------------------------------------"); + + // let block_array = UnsafeArray::::new(world.team(), ARRAY_LEN, Distribution::Block); + // for elem in block_onesided_iter!($array,array).into_iter().step_by(4) {...} + // for elem in block_array.buffered_onesided_iter(10) {...} + + // //rust step_by pseudo code + // fn step_by(&mut self, n: usize) -> Result{ + // let val = self.next(); //grab val based on index + // self.index += n; + // val + // } + + // //-------------- + // for elem in block_array.onesided_iter().step_by(4).into_iter() {...} + // } + + // fn main() { + // let world = LamellarWorldBuilder::new().build(); + // let array = LocalLockArray::::new(&world, 8, Distribution::Block); + // let my_pe = world.my_pe(); + // let num_pes = world.num_pes(); + let block_array = block_array.into_local_lock(); + block_array + .dist_iter_mut() + .for_each(move |e| *e = my_pe) + .block(); 
//initialize array using a distributed iterator world.block_on(async move { if my_pe == 0 { - let result = array + let result = block_array .onesided_iter() .into_stream() .take(4) diff --git a/examples/darc_examples/darc.rs b/examples/darc_examples/darc.rs index 3a9215b8..dbfffdce 100644 --- a/examples/darc_examples/darc.rs +++ b/examples/darc_examples/darc.rs @@ -109,7 +109,7 @@ fn main() { // drop(darc2); // drop(wrapped); println!("changing darc type"); - let ro_darc = global_darc.blocking_into_localrw().blocking_into_darc(); // we can call into_darc directly on global_Darc, but string the operations for testing purposes + let ro_darc = global_darc.into_localrw().block().into_darc().block(); // we can call into_darc directly on global_Darc, but string the operations for testing purposes println!("read only darc"); ro_darc.print(); println!("done"); diff --git a/run_examples.sh b/run_examples.sh index 881b440f..e26e1a6a 100755 --- a/run_examples.sh +++ b/run_examples.sh @@ -44,11 +44,11 @@ for toolchain in stable; do #nightly; do fi cd .. sleep 2 - cur_tasks=`squeue -u frie869 | wc -l` - running_tasks=`squeue -u frie869 | grep " R " | wc -l` + cur_tasks=`squeue -u frie869 | grep frie869 |wc -l` + running_tasks=`squeue -u frie869 | grep frie869| grep " R " | wc -l` while [ $((cur_tasks+running_tasks)) -gt 6 ]; do - cur_tasks=`squeue -u frie869 | wc -l` - running_tasks=`squeue -u frie869 | grep " R " | wc -l` + cur_tasks=`squeue -u frie869 | grep frie869 | wc -l` + running_tasks=`squeue -u frie869 | grep frie869 | grep " R " | wc -l` sleep 5 done # fi diff --git a/src/array.rs b/src/array.rs index 63e3ab12..383fdbd8 100644 --- a/src/array.rs +++ b/src/array.rs @@ -191,29 +191,30 @@ pub struct ReduceKey { crate::inventory::collect!(ReduceKey); // impl Dist for bool {} -lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize); -lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize); +// lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize); +// lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize); // lamellar_impl::generate_reductions_for_type_rt!(false, f32); // lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32); // lamellar_impl::generate_reductions_for_type_rt!(false, u128); // lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128); +// //------------------------------------ -// lamellar_impl::generate_reductions_for_type_rt!(true, u8, u16, u32, u64, usize); -// lamellar_impl::generate_reductions_for_type_rt!(false, u128); -// lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, u16, u32, u64, usize); -// lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128); +lamellar_impl::generate_reductions_for_type_rt!(true, u8, u16, u32, u64, usize); +lamellar_impl::generate_reductions_for_type_rt!(false, u128); +lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, u16, u32, u64, usize); +lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128); -// lamellar_impl::generate_reductions_for_type_rt!(true, i8, i16, i32, i64, isize); -// lamellar_impl::generate_reductions_for_type_rt!(false, i128); -// lamellar_impl::generate_ops_for_type_rt!(true, true, true, i8, i16, i32, i64, isize); -// lamellar_impl::generate_ops_for_type_rt!(true, false, true, i128); +lamellar_impl::generate_reductions_for_type_rt!(true, i8, i16, i32, i64, isize); +lamellar_impl::generate_reductions_for_type_rt!(false, i128); +lamellar_impl::generate_ops_for_type_rt!(true, true, true, 
i8, i16, i32, i64, isize); +lamellar_impl::generate_ops_for_type_rt!(true, false, true, i128); -// lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64); -// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64); +lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64); +lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64); -// lamellar_impl::generate_ops_for_bool_rt!(); +lamellar_impl::generate_ops_for_bool_rt!(); impl Dist for Option {} impl ArrayOps for Option {} diff --git a/src/array/global_lock_atomic.rs b/src/array/global_lock_atomic.rs index d722afad..c7a126c0 100644 --- a/src/array/global_lock_atomic.rs +++ b/src/array/global_lock_atomic.rs @@ -996,15 +996,17 @@ impl GlobalLockArray { ///``` pub fn print(&self) { self.barrier(); - let _guard = self.read_local_data(); + // println!("printing array"); + let _guard = self.read_local_data().block(); self.array.print(); + // println!("done printing array"); } } impl ArrayPrint for GlobalLockArray { fn print(&self) { self.barrier(); - let _guard = self.read_local_data(); + let _guard = self.read_local_data().block(); self.array.print() } } diff --git a/src/array/global_lock_atomic/handle.rs b/src/array/global_lock_atomic/handle.rs index d15e7da1..f37b7622 100644 --- a/src/array/global_lock_atomic/handle.rs +++ b/src/array/global_lock_atomic/handle.rs @@ -65,13 +65,15 @@ impl GlobalLockReadHandle { /// let guard = handle.block(); ///``` pub fn block(self) -> GlobalLockReadGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalLockReadHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -140,13 +142,15 @@ impl GlobalLockLocalDataHandle { /// println!("local data: {:?}",local_data); ///``` pub fn block(self) -> GlobalLockLocalData { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalLockLocalDataHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -219,13 +223,15 @@ impl GlobalLockWriteHandle { /// handle.block(); ///``` pub fn block(self) -> GlobalLockWriteGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalLockWriteHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -294,13 +300,15 @@ impl GlobalLockMutLocalDataHandle { /// local_data.iter_mut().for_each(|elem| *elem += my_pe); ///``` pub fn block(self) -> GlobalLockMutLocalData { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalLockLocalDataHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -373,13 +381,15 @@ impl GlobalLockCollectiveMutLocalDataHandle { /// local_data.iter_mut().for_each(|elem| *elem += my_pe); ///``` pub fn block(self) -> GlobalLockCollectiveMutLocalData { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalLockCollectiveMutLocalDataHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) diff --git a/src/array/iterator/local_iterator.rs b/src/array/iterator/local_iterator.rs index 01d7536f..cd719d13 100644 --- a/src/array/iterator/local_iterator.rs +++ b/src/array/iterator/local_iterator.rs @@ -766,8 +766,8 @@ pub trait IndexedLocalIterator: LocalIterator + SyncSend + IterClone + 'static { /// If the number of elements is not evenly divisible by `size`, the last chunk may be shorter than `size` /// /// # Note - /// If calling this on a LocalLockArray it may be possible to call [blocking_read_local_chunks](crate::array::LocalLockArray::blocking_read_local_chunks), [read_local_chunks](crate::array::LocalLockArray::read_local_chunks) - /// [blocking_write_local_chunks](crate::array::LocalLockArray::blocking_write_local_chunks), or [write_local_chunks](crate::array::LocalLockArray::blocking_write_local_chunks) for better performance + /// If calling this on a LocalLockArray it may be possible to call [read_local_chunks](crate::array::LocalLockArray::read_local_chunks) + /// or [write_local_chunks](crate::array::LocalLockArray::write_local_chunks) for better performance /// /// If calling this on an UnsafeArray it may be possible to call [local_chunks](crate::array::UnsafeArray::local_chunks) or [local_chunks_mut](crate::array::UnsafeArray::local_chunks_mut) /// diff --git a/src/array/local_lock_atomic.rs b/src/array/local_lock_atomic.rs index f13d8515..43d0a4bf 100644 --- a/src/array/local_lock_atomic.rs +++ b/src/array/local_lock_atomic.rs @@ -3,8 +3,8 @@ pub(crate) mod local_chunks; pub use local_chunks::{LocalLockLocalChunks, LocalLockLocalChunksMut}; mod handle; use handle::{ - LocalLockLocalDataHandle, LocalLockMutLocalDataHandle, LocalLockReadHandle, - LocalLockWriteHandle, + LocalLockLocalChunksHandle, LocalLockLocalChunksMutHandle, LocalLockLocalDataHandle, + LocalLockMutLocalDataHandle, LocalLockReadHandle, LocalLockWriteHandle, }; pub(crate) mod operations; mod rdma; diff --git a/src/array/local_lock_atomic/handle.rs b/src/array/local_lock_atomic/handle.rs index 23111fae..2c42dc5e 100644 --- a/src/array/local_lock_atomic/handle.rs +++ b/src/array/local_lock_atomic/handle.rs @@ -10,7 +10,10 @@ use crate::LocalLockArray; use futures_util::Future; use pin_project::pin_project; -use super::{LocalLockLocalData, LocalLockMutLocalData, LocalLockReadGuard, LocalLockWriteGuard}; +use super::{ + LocalLockLocalChunks, LocalLockLocalChunksMut, LocalLockLocalData, LocalLockMutLocalData, + LocalLockReadGuard, LocalLockWriteGuard, +}; #[must_use] #[pin_project] @@ -49,7 +52,7 @@ impl LocalLockReadHandle { } } - /// Handle used to retrieve the aquired read lock of a LocalLockArray within a non async context + /// Blocks the calling thread to retrieve the aquired read lock of a LocalLockArray within a non async context /// /// Returns an RAII guard which will drop the read access of the wrlock when dropped /// # Examples @@ -62,15 +65,16 @@ impl LocalLockReadHandle { /// let guard = handle.block(); ///``` pub fn block(self) -> LocalLockReadGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" 
[LAMELLAR WARNING] You are calling `LocalLockReadHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } - self.array.lock.darc.team().scheduler.block_on(self) } } @@ -122,7 +126,7 @@ pub struct LocalLockLocalDataHandle { } impl LocalLockLocalDataHandle { - /// Handle used to retrieve the aquired local data [LocalLockLocalData] of a LocalLockArray within a non async context + /// Blocks the calling thread to retrieve the aquired local data [LocalLockLocalData] of a LocalLockArray within a non async context /// /// Returns an RAII guard which will drop the write access of the wrlock when dropped /// # Examples @@ -137,15 +141,16 @@ impl LocalLockLocalDataHandle { /// println!("local data: {:?}",local_data); ///``` pub fn block(self) -> LocalLockLocalData { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `LocalLockLocalDataHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } - self.array.lock.darc.team().scheduler.block_on(self) } } @@ -203,7 +208,7 @@ impl LocalLockWriteHandle { lock_handle: array.lock.write(), } } - /// Handle used to retrieve the aquired write lock of a LocalLockArray within a non async context + /// Blocks the calling thread to retrieve the aquired write lock of a LocalLockArray within a non async context /// /// Returns an RAII guard which will drop the write access of the wrlock when dropped /// # Examples @@ -216,13 +221,15 @@ impl LocalLockWriteHandle { /// handle.block(); ///``` pub fn block(self) -> LocalLockWriteGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `LocalLockWriteHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -276,7 +283,7 @@ pub struct LocalLockMutLocalDataHandle { } impl LocalLockMutLocalDataHandle { - /// Handle used to retrieve the aquired mutable local data [LocalLockMutLocalData] of a LocalLockArray within a non async context + /// Blocks the calling thread to retrieve the aquired mutable local data [LocalLockMutLocalData] of a LocalLockArray within a non async context /// /// Returns an RAII guard which will drop the write access of the wrlock when dropped /// # Examples @@ -291,13 +298,15 @@ impl LocalLockMutLocalDataHandle { /// local_data.iter_mut().for_each(|elem| *elem += my_pe); ///``` pub fn block(self) -> LocalLockMutLocalData { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `LocalLockLocalDataHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } self.array.lock.darc.team().scheduler.block_on(self) @@ -320,3 +329,175 @@ impl Future for LocalLockMutLocalDataHandle { } } } + +#[must_use] +#[pin_project] +/// Constructs a handle for immutably iterating over fixed sized chunks(slices) of the local data of this array. +/// This handle must be either await'd in an async context or block'd in an non-async context. +/// Awaiting or blocking will not return until the read lock has been acquired. +/// +/// the returned iterator is a lamellar [LocalIterator] and also captures a read lock on the local data. 
+///
+/// # Examples
+///```
+/// use lamellar::array::prelude::*;
+///
+/// let world = LamellarWorldBuilder::new().build();
+/// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block);
+/// let my_pe = world.my_pe();
+/// //block in a non-async context
+/// let _ = array.read_local_chunks(5).block().enumerate().for_each(move|(i,chunk)| {
+///     println!("PE: {my_pe} i: {i} chunk: {chunk:?}");
+/// }).block();
+///
+/// //await in an async context
+/// world.block_on(async move {
+///     let _ = array.read_local_chunks(5).await.enumerate().for_each(move|(i,chunk)| {
+///         println!("PE: {my_pe} i: {i} chunk: {chunk:?}");
+///     }).await;
+/// });
+///
+/// ```
+pub struct LocalLockLocalChunksHandle {
+    pub(crate) chunk_size: usize,
+    pub(crate) index: usize, //global index within the array local data
+    pub(crate) end_index: usize, //global index within the array local data
+    pub(crate) array: LocalLockArray,
+    #[pin]
+    pub(crate) lock_handle: LocalRwDarcReadHandle<()>,
+}
+
+impl LocalLockLocalChunksHandle {
+    /// Blocks the calling thread to retrieve the acquired immutable local chunks iterator of a LocalLockArray within a non-async context
+    ///
+    /// # Examples
+    ///```
+    /// use lamellar::array::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block);
+    /// let my_pe = world.my_pe();
+    /// //block in a non-async context
+    /// let _ = array.read_local_chunks(5).block().enumerate().for_each(move|(i,chunk)| {
+    ///     println!("PE: {my_pe} i: {i} chunk: {chunk:?}");
+    /// }).block();
+    ///```
+    pub fn block(self) -> LocalLockLocalChunks {
+        if std::thread::current().id() != *crate::MAIN_THREAD {
+            let msg = format!("
+        [LAMELLAR WARNING] You are calling `LocalLockLocalChunksHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead!
+        Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture()
+            );
+            match config().blocking_call_warning {
+                Some(val) if val => println!("{msg}"),
+                _ => println!("{msg}"),
+            }
+        }
+
+        self.array.lock.darc.team().scheduler.block_on(self)
+    }
+}
+
+impl Future for LocalLockLocalChunksHandle {
+    type Output = LocalLockLocalChunks;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+        let this = self.project();
+        match this.lock_handle.poll(cx) {
+            Poll::Ready(val) => Poll::Ready(LocalLockLocalChunks {
+                chunk_size: *this.chunk_size,
+                index: *this.index, //global index within the array local data
+                end_index: *this.end_index, //global index within the array local data
+                array: this.array.clone(),
+                lock_guard: Arc::new(val),
+            }),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+#[must_use]
+#[pin_project]
+/// A handle for mutably iterating over fixed sized chunks(slices) of the local data of this array.
+/// This handle must be either await'd in an async context or block'd in a non-async context.
+/// Awaiting or blocking will not return until the write lock has been acquired.
+///
+/// the returned iterator is a lamellar [LocalIterator] and also captures a write lock on the local data.
+/// +/// # Examples +///``` +/// use lamellar::array::prelude::*; +/// +/// let world = LamellarWorldBuilder::new().build(); +/// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); +/// let my_pe = world.my_pe(); +/// let _ = array.write_local_chunks(5).block().enumerate().for_each(move|(i, mut chunk)| { +/// for elem in chunk.iter_mut() { +/// *elem = i; +/// } +/// }).block(); +/// world.block_on(async move { +/// let _ = array.write_local_chunks(5).await.enumerate().for_each(move|(i, mut chunk)| { +/// for elem in chunk.iter_mut() { +/// *elem = i; +/// } +/// }).await; +/// }); +/// ``` +pub struct LocalLockLocalChunksMutHandle { + pub(crate) chunk_size: usize, + pub(crate) index: usize, //global index within the array local data + pub(crate) end_index: usize, //global index within the array local data + pub(crate) array: LocalLockArray, + #[pin] + pub(crate) lock_handle: LocalRwDarcWriteHandle<()>, +} + +impl LocalLockLocalChunksMutHandle { + /// Blocks the calling thread to retrieve the aquired mutable local chunks iterator of a LocalLockArray within a non async context + /// + /// # Examples + ///``` + /// use lamellar::array::prelude::*; + /// + /// let world = LamellarWorldBuilder::new().build(); + /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); + /// let my_pe = world.my_pe(); + /// //block in a non-async context + /// let _ = array.write_local_chunks(5).block().enumerate().for_each(move|(i, mut chunk)| { + /// for elem in chunk.iter_mut() { + /// *elem = i; + /// } + /// }).block(); + ///``` + pub fn block(self) -> LocalLockLocalChunksMut { + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" + [LAMELLAR WARNING] You are calling `LocalLockLocalChunksMutHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
+ Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() + ); + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } + } + + self.array.lock.darc.team().scheduler.block_on(self) + } +} + +impl Future for LocalLockLocalChunksMutHandle { + type Output = LocalLockLocalChunksMut; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.lock_handle.poll(cx) { + Poll::Ready(val) => Poll::Ready(LocalLockLocalChunksMut { + chunk_size: *this.chunk_size, + index: *this.index, //global index within the array local data + end_index: *this.end_index, //global index within the array local data + array: this.array.clone(), + lock_guard: Arc::new(val), + }), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/array/local_lock_atomic/local_chunks.rs b/src/array/local_lock_atomic/local_chunks.rs index dfdeb759..fe7e627c 100644 --- a/src/array/local_lock_atomic/local_chunks.rs +++ b/src/array/local_lock_atomic/local_chunks.rs @@ -9,16 +9,14 @@ use crate::memregion::Dist; use std::sync::Arc; /// An iterator over immutable (nonoverlapping) local chunks (of size chunk_size) of a [LocalLockArray] -/// This struct is created by calling [LocalLockArray::read_local_chunks] or [LocalLockArray::blocking_read_local_chunks] +/// This struct is created by awaiting or blocking on the handle returned by [LocalLockArray::read_local_chunks] #[derive(Clone)] pub struct LocalLockLocalChunks { - chunk_size: usize, - index: usize, //global index within the array local data - end_index: usize, //global index within the array local data - array: LocalLockArray, - // lock: LocalRwDarc<()>, - // lock_guard: Arc>, - lock_guard: Arc>, + pub(crate) chunk_size: usize, + pub(crate) index: usize, //global index within the array local data + pub(crate) end_index: usize, //global index within the array local data + pub(crate) array: LocalLockArray, + pub(crate) lock_guard: Arc>, } impl IterClone for LocalLockLocalChunks { @@ -35,15 +33,15 @@ impl IterClone for LocalLockLocalChunks { } /// An iterator over mutable (nonoverlapping) local chunks (of size chunk_size) of a [LocalLockArray] -/// This struct is created by calling [LocalLockArray""write_local_chunks] or [LocalLockArray::blocking_write_local_chunks] +/// This struct is created by awaiting or blocking on the handle returned by [LocalLockArray::write_local_chunks] pub struct LocalLockLocalChunksMut { // data: &'a mut [T], - chunk_size: usize, - index: usize, //global index within the array local data - end_index: usize, //global index within the array local data - array: LocalLockArray, + pub(crate) chunk_size: usize, + pub(crate) index: usize, //global index within the array local data + pub(crate) end_index: usize, //global index within the array local data + pub(crate) array: LocalLockArray, // lock: LocalRwDarc<()>, - lock_guard: Arc>, + pub(crate) lock_guard: Arc>, } impl IterClone for LocalLockLocalChunksMut { @@ -59,6 +57,12 @@ impl IterClone for LocalLockLocalChunksMut { } } +/// Provides mutable access to a chunk of a PEs local data to provide "local" indexing while maintaining safety guarantees of the array type. +/// +/// This derefences down to a `&mut [T]`. +/// +/// This struct is the item type returned when iterating over a [LocalLockLocalChunksMut] iterator created using [LocalLockArray::write_local_chunks]. 
+/// While the Local Chunk iterator is valid, each chunk is guaranteed to have exclusive access to the data it points to (allowing for the safe deref into `&mut [T]`), preventing any other local or remote access. #[derive(Debug)] pub struct LocalLockMutChunkLocalData<'a, T: Dist> { data: &'a mut [T], @@ -218,40 +222,11 @@ impl IndexedLocalIterator for LocalLockLocalChunksMut { } impl LocalLockArray { - /// mutably iterate over fixed sized chunks(slices) of the local data of this array. - /// the returned iterator is a lamellar [LocalIterator] and also captures a read lock on the local data. - /// This call will block the calling task until a read lock is acquired. + /// Constructs a handle for immutably iterating over fixed sized chunks(slices) of the local data of this array. + /// This handle must be either await'd in an async context or block'd in an non-async context. + /// Awaiting or blocking will not return until the read lock has been acquired. /// - /// # Examples - ///``` - /// use lamellar::array::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); - /// let my_pe = world.my_pe(); - /// world.block_on(async move { - /// let _ = array.read_local_chunks(5).await.enumerate().for_each(move|(i,chunk)| { - /// println!("PE: {my_pe} i: {i} chunk: {chunk:?}"); - /// }).spawn(); - /// array.await_all().await; - /// }); - /// ``` - pub async fn read_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunks { - let lock = Arc::new(self.lock.read().await); - LocalLockLocalChunks { - chunk_size, - index: 0, - end_index: 0, - array: self.clone(), - // lock: self.lock.clone(), - lock_guard: lock, - } - } - - /// immutably iterate over fixed sized chunks(slices) of the local data of this array. /// the returned iterator is a lamellar [LocalIterator] and also captures a read lock on the local data. - /// This call will block the calling thread until a read lock is acquired. - /// Calling within an asynchronous block may lead to deadlock, use [read_lock](self::LocalLockArray::read_local_chunks) instead. /// /// # Examples ///``` @@ -260,68 +235,34 @@ impl LocalLockArray { /// let world = LamellarWorldBuilder::new().build(); /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); /// let my_pe = world.my_pe(); - /// - /// let _ = array.blocking_read_local_chunks(5).enumerate().for_each(move|(i,chunk)| { + /// //block in a non-async context + /// let _ = array.read_local_chunks(5).block().enumerate().for_each(move|(i,chunk)| { /// println!("PE: {my_pe} i: {i} chunk: {chunk:?}"); /// }).block(); /// - /// ``` - pub fn blocking_read_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunks { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `LocalLockArray::blocking_read_local_chunks` from within an async context which may lead to deadlock, it is recommended that you use `read_local_chunks().await;` instead! 
- Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } - } - let lock = Arc::new(self.array.block_on(self.lock.read())); - LocalLockLocalChunks { - chunk_size, - index: 0, - end_index: 0, - array: self.clone(), - // lock: self.lock.clone(), - lock_guard: lock, - } - } - - /// mutably iterate over fixed sized chunks(slices) of the local data of this array. - /// the returned iterator is a lamellar [LocalIterator] and also captures the write lock on the local data. - /// This call will block the calling task until the write lock is acquired. - /// - /// # Examples - ///``` - /// use lamellar::array::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); - /// let my_pe = world.my_pe(); + /// //await in an async context /// world.block_on(async move { - /// let _ = array.write_local_chunks(5).await.enumerate().for_each(move|(i,chunk)| { + /// let _ = array.read_local_chunks(5).await.enumerate().for_each(move|(i,chunk)| { /// println!("PE: {my_pe} i: {i} chunk: {chunk:?}"); - /// }).spawn(); - /// array.await_all().await; + /// }).await; /// }); + /// /// ``` - pub async fn write_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunksMut { - let lock = Arc::new(self.lock.write().await); - LocalLockLocalChunksMut { + pub fn read_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunksHandle { + let lock = self.lock.read(); + LocalLockLocalChunksHandle { chunk_size, index: 0, end_index: 0, array: self.clone(), - // lock: self.lock.clone(), - lock_guard: lock, + lock_handle: lock, } } - - /// mutably iterate over fixed sized chunks(slices) of the local data of this array. - /// the returned iterator is a lamellar [LocalIterator] and also captures the write lock on the local data. - /// This call will block the calling thread until the write lock is acquired. - /// Calling within an asynchronous block may lead to deadlock, use [write_lock](self::LocalLockArray::write_local_chunks) instead. + /// Constructs a handle for mutably iterating over fixed sized chunks(slices) of the local data of this array. + /// This handle must be either await'd in an async context or block'd in an non-async context. + /// Awaiting or blocking will not return until the write lock has been acquired. + /// + /// the returned iterator is a lamellar [LocalIterator] and also captures a write lock on the local data. 
/// /// # Examples ///``` @@ -330,32 +271,28 @@ impl LocalLockArray { /// let world = LamellarWorldBuilder::new().build(); /// let array: LocalLockArray = LocalLockArray::new(&world,40,Distribution::Block); /// let my_pe = world.my_pe(); - /// - /// let _ = array.blocking_write_local_chunks(5).enumerate().for_each(move|(i,chunk)| { - /// println!("PE: {my_pe} i: {i} chunk: {chunk:?}"); - /// }).spawn(); - /// array.wait_all(); - /// + /// let _ = array.write_local_chunks(5).block().enumerate().for_each(move|(i, mut chunk)| { + /// for elem in chunk.iter_mut() { + /// *elem = i; + /// } + /// }).block(); + /// world.block_on(async move { + /// let _ = array.write_local_chunks(5).await.enumerate().for_each(move|(i, mut chunk)| { + /// for elem in chunk.iter_mut() { + /// *elem = i; + /// } + /// }).await; + /// }); /// ``` - pub fn blocking_write_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunksMut { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `LocalLockArray::blocking_write_local_chunks` from within an async context which may lead to deadlock, it is recommended that you use `write_local_chunks().await;` instead! - Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } - } - let lock = Arc::new(self.array.block_on(self.lock.write())); - LocalLockLocalChunksMut { + pub fn write_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunksMutHandle { + let lock = self.lock.write(); + LocalLockLocalChunksMutHandle { chunk_size, index: 0, end_index: 0, array: self.clone(), // lock: self.lock.clone(), - lock_guard: lock, + lock_handle: lock, } } } diff --git a/src/darc/global_rw_darc.rs b/src/darc/global_rw_darc.rs index 895cfc40..fb4d3d38 100644 --- a/src/darc/global_rw_darc.rs +++ b/src/darc/global_rw_darc.rs @@ -73,13 +73,23 @@ impl DistRwLock { while self.writer.load(Ordering::SeqCst) != self.team.num_pes { async_std::task::yield_now().await; } - // println!("\t{:?} inc read count {:?} {:?}",pe,self.readers.load(Ordering::SeqCst),self.writer.load(Ordering::SeqCst)); + // println!( + // "\t{:?} inc read count {:?} {:?}", + // _pe, + // self.readers.load(Ordering::SeqCst), + // self.writer.load(Ordering::SeqCst) + // ); self.readers.fetch_add(1, Ordering::SeqCst); if self.writer.load(Ordering::SeqCst) == self.team.num_pes { break; } self.readers.fetch_sub(1, Ordering::SeqCst); - // println!("\t{:?} writers exist dec read count {:?} {:?}",pe,self.readers.load(Ordering::SeqCst),self.writer.load(Ordering::SeqCst)); + // println!( + // "\t{:?} writers exist dec read count {:?} {:?}", + // _pe, + // self.readers.load(Ordering::SeqCst), + // self.writer.load(Ordering::SeqCst) + // ); } // println!( // "\t{:?} read locked {:?} {:?}", @@ -95,7 +105,12 @@ impl DistRwLock { { async_std::task::yield_now().await; } - // println!("\t{:?} write lock checking for readers {:?} {:?}",pe,self.readers.load(Ordering::SeqCst),self.writer.load(Ordering::SeqCst)); + // println!( + // "\t{:?} write lock checking for readers {:?} {:?}", + // pe, + // self.readers.load(Ordering::SeqCst), + // self.writer.load(Ordering::SeqCst) + // ); while self.readers.load(Ordering::SeqCst) != 0 { async_std::task::yield_now().await; } @@ -108,10 +123,17 @@ impl DistRwLock { } async fn async_collective_writer_lock(&self, pe: 
usize, collective_cnt: usize) { + println!("{:?} collective writer lock {:?}", pe, collective_cnt); // first lets set the normal writer lock, but will set it to a unique id all the PEs should have (it is initialized to num_pes+1 and is incremented by one after each lock) if pe == 0 { self.async_writer_lock(collective_cnt).await; } else { + // println!( + // "\t{:?} write lock checking for readers {:?} {:?}", + // pe, + // self.readers.load(Ordering::SeqCst), + // self.writer.load(Ordering::SeqCst) + // ); while self.writer.load(Ordering::SeqCst) != collective_cnt { async_std::task::yield_now().await; } @@ -187,7 +209,6 @@ impl DistRwLock { let _temp = self.collective_writer.fetch_add(1, Ordering::SeqCst); // println!("collective unlock PE{:?} {:?} {:?} {:?}",pe,temp,self.collective_writer.load(Ordering::SeqCst),self.team.num_pes); while self.collective_writer.load(Ordering::SeqCst) != self.team.num_pes { - // async_std::task::yield_now().await; } //we have all entered the unlock @@ -217,7 +238,7 @@ struct LockAm { #[lamellar_impl::rt_am] impl LamellarAM for LockAm { async fn exec() { - // println!("In lock am {:?}",self); + // println!("In lock am {:?}", self); // let lock = { let rwlock = unsafe { &*(self.rwlock_addr as *mut DarcInner>) }.item(); //we dont actually care about the "type" we wrap here, we just need access to the meta data for the darc match self.lock_type { @@ -232,7 +253,7 @@ impl LamellarAM for LockAm { } } // }; - // println!("finished lock am"); + // println!("finished lock am {:?}", self); } } @@ -256,6 +277,7 @@ impl LamellarAM for UnlockAm { } } } + // println!("Finished in unlock am {:?}", self); } } diff --git a/src/darc/handle.rs b/src/darc/handle.rs index 46acb340..983c2e15 100644 --- a/src/darc/handle.rs +++ b/src/darc/handle.rs @@ -98,13 +98,15 @@ impl LocalRwDarcReadHandle { /// ///``` pub fn block(self) -> LocalRwDarcReadGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `LocalRwDarcReadHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } let inner_darc = self.darc.darc.clone(); @@ -216,13 +218,15 @@ impl LocalRwDarcWriteHandle { /// *guard += my_pe; ///``` pub fn block(self) -> LocalRwDarcWriteGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `LocalRwDarcWriteHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } let inner_darc = self.darc.darc.clone(); @@ -329,13 +333,15 @@ impl GlobalRwDarcReadHandle { /// println!("the current counter value on pe {} main thread = {}",my_pe,*guard); ///``` pub fn block(self) -> GlobalRwDarcReadGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalRwDarcReadHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } let _ = self.lock_am.blocking_wait(); @@ -425,13 +431,15 @@ impl GlobalRwDarcWriteHandle { /// *guard += my_pe; ///``` pub fn block(self) -> GlobalRwDarcWriteGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalRwDarcWriteHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), + match config().blocking_call_warning { + Some(val) if val => println!("{msg}"), + _ => println!("{msg}"), + } } let _ = self.lock_am.blocking_wait(); @@ -500,13 +508,15 @@ impl GlobalRwDarcCollectiveWriteHandle { /// let mut guard = handle.block(); //block until we get the write lock /// *guard += my_pe; pub fn block(self) -> GlobalRwDarcCollectiveWriteGuard { - let msg = format!(" + if std::thread::current().id() != *crate::MAIN_THREAD { + let msg = format!(" [LAMELLAR WARNING] You are calling `GlobalRwDarcCollectiveWriteHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead! 
Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture()
        );
-        match config().blocking_call_warning {
-            Some(val) if val => println!("{msg}"),
-            _ => println!("{msg}"),
+            match config().blocking_call_warning {
+                Some(val) if val => println!("{msg}"),
+                _ => println!("{msg}"),
+            }
         }
         let _ = self.lock_am.blocking_wait();
@@ -699,7 +709,7 @@ pub struct IntoLocalRwDarcHandle {
 }
 impl IntoLocalRwDarcHandle {
-    /// Used to drive to conversion of a [Darc] or [GlobalRwDarc] into a [LocalRwDarc]
+    /// Used to drive the conversion of a [Darc] or [GlobalRwDarc] into a [LocalRwDarc]
     /// # Examples
     ///
     ///```
    /// use lamellar::darc::prelude::*;
    ///
    /// let world = LamellarWorldBuilder::new().build();
    /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team");
    /// let five_as_localrw = five.into_localrw().block();
    pub fn block(self) -> LocalRwDarc {
+        if std::thread::current().id() != *crate::MAIN_THREAD {
+            let msg = format!("
+        [LAMELLAR WARNING] You are calling `IntoLocalRwDarcHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead!
+        Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture()
+            );
+            match config().blocking_call_warning {
+                Some(val) if val => println!("{msg}"),
+                _ => println!("{msg}"),
+            }
+        }
         self.team.clone().block_on(self)
     }
 }
@@ -775,6 +795,16 @@ impl IntoGlobalRwDarcHandle {
    /// let five = LocalRwDarc::new(&world,5).expect("PE in world team");
    /// let five_as_globalrw = five.into_globalrw().block();
    pub fn block(self) -> GlobalRwDarc {
+        if std::thread::current().id() != *crate::MAIN_THREAD {
+            let msg = format!("
+        [LAMELLAR WARNING] You are calling `IntoGlobalRwDarcHandle::block` from within an async context which may lead to deadlock, it is recommended that you use `.await;` instead!
+        Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture()
+            );
+            match config().blocking_call_warning {
+                Some(val) if val => println!("{msg}"),
+                _ => println!("{msg}"),
+            }
+        }
         self.team.clone().block_on(self)
     }
 }
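
Usage sketch for the handle-based local chunks API introduced by this patch. It mirrors the doc examples added above, driving `write_local_chunks`/`read_local_chunks` from both a blocking and an async context; the `usize` element type, the array length of 40, and the chunk size of 5 are illustrative assumptions, not values taken from the patch.

use lamellar::array::prelude::*;

fn main() {
    let world = LamellarWorldBuilder::new().build();
    let my_pe = world.my_pe();
    // Element type, length, and chunk size chosen only for illustration.
    let array: LocalLockArray<usize> = LocalLockArray::new(&world, 40, Distribution::Block);

    // Non-async context: `.block()` on the handle waits for the write lock,
    // then `.block()` on the for_each handle drives the chunked iteration.
    array
        .write_local_chunks(5)
        .block()
        .enumerate()
        .for_each(move |(i, mut chunk)| {
            for elem in chunk.iter_mut() {
                *elem = i;
            }
        })
        .block();

    // Async context: `.await` the same handles instead of blocking,
    // which avoids the deadlock warning that `block()` emits off the main thread.
    world.block_on(async move {
        array
            .read_local_chunks(5)
            .await
            .enumerate()
            .for_each(move |(i, chunk)| {
                println!("PE: {my_pe} i: {i} chunk: {chunk:?}");
            })
            .await;
    });
}

The same pattern replaces the removed `blocking_read_local_chunks`/`blocking_write_local_chunks` calls elsewhere in this patch: the method now returns a handle, and the caller chooses `.block()` or `.await` depending on context.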