From fa0c64f063c2df13515e61e12991c8e1dda9f07e Mon Sep 17 00:00:00 2001 From: "Ryan D. Friese" Date: Wed, 16 Oct 2024 10:36:02 -0700 Subject: [PATCH] convert darc into methods to handle based rather than explict blocking apis --- examples/active_message_examples/am_local.rs | 8 +- .../active_message_examples/recursive_am.rs | 2 +- examples/array_examples/global_lock_array.rs | 9 +- .../global_lock_atomic_array_put_bw.rs | 2 +- .../local_lock_atomic_array_put_bw.rs | 2 +- examples/darc_examples/darc.rs | 6 +- examples/kernels/am_flops.rs | 2 +- examples/kernels/cached_am_gemm.rs | 2 +- examples/kernels/dft_proxy.rs | 6 +- examples/misc/dist_hashmap.rs | 2 +- src/array.rs | 7 +- src/darc.rs | 204 +++---------- src/darc/global_rw_darc.rs | 194 +++--------- src/darc/handle.rs | 277 +++++++++++++++++- src/darc/local_rw_darc.rs | 238 +++------------ src/scheduler/tokio_executor.rs | 1 + tests/array/arithmetic_ops/add_test.rs | 6 +- tests/array/arithmetic_ops/div_test.rs | 2 +- tests/array/arithmetic_ops/fetch_add_test.rs | 9 +- tests/array/arithmetic_ops/fetch_div_test.rs | 2 +- tests/array/arithmetic_ops/fetch_mul_test.rs | 2 +- tests/array/arithmetic_ops/fetch_rem_test.rs | 2 +- tests/array/arithmetic_ops/fetch_sub_test.rs | 2 +- tests/array/arithmetic_ops/mul_test.rs | 2 +- tests/array/arithmetic_ops/rem_test.rs | 2 +- tests/array/arithmetic_ops/sub_test.rs | 2 +- tests/array/atomic_ops/load_store_test.rs | 2 +- tests/array/bitwise_ops/and_test.rs | 2 +- tests/array/bitwise_ops/fetch_and_test.rs | 2 +- tests/array/bitwise_ops/fetch_or_test.rs | 2 +- tests/array/bitwise_ops/fetch_xor_test.rs | 2 +- tests/array/bitwise_ops/or_test.rs | 2 +- tests/array/bitwise_ops/xor_test.rs | 2 +- tests/array/rdma/put_test.rs | 2 +- 34 files changed, 433 insertions(+), 576 deletions(-) diff --git a/examples/active_message_examples/am_local.rs b/examples/active_message_examples/am_local.rs index f7ca8240..10f2150d 100644 --- a/examples/active_message_examples/am_local.rs +++ 
b/examples/active_message_examples/am_local.rs @@ -113,18 +113,18 @@ fn main() { println!("-----------------------------------"); // println!("---------------------------------------------------------------"); // println!("Testing local am no return"); - // let res = world.exec_am_pe(my_pe, am.clone()).blocking_wait(); + // let res = world.exec_am_pe(my_pe, am.clone()).block(); // assert_eq!(res, None); // println!("no return result: {:?}", res); // println!("-----------------------------------"); // println!("Testing remote am no return"); - // let res = world.exec_am_pe(num_pes - 1, am.clone()).blocking_wait(); + // let res = world.exec_am_pe(num_pes - 1, am.clone()).block(); // assert_eq!(res, None); // println!("no return result: {:?}", res); // println!("-----------------------------------"); // println!("Testing all am no return"); // println!("[{:?}] exec on all", my_pe); - // let res = world.exec_am_all(am.clone()).blocking_wait(); + // let res = world.exec_am_all(am.clone()).block(); // assert!(res.iter().all(|x| x.is_none())); // println!("no return result: {:?}", res); // println!("---------------------------------------------------------------"); @@ -132,7 +132,7 @@ fn main() { // println!("---------------------------------------------------------------"); // println!("Testing ring pattern am no return"); - // let res = world.exec_am_pe((my_pe + 1) % num_pes, am.clone()).blocking_wait(); + // let res = world.exec_am_pe((my_pe + 1) % num_pes, am.clone()).block(); // assert_eq!(res, None); // println!("no return result: {:?}", res); // println!("-----------------------------------"); diff --git a/examples/active_message_examples/recursive_am.rs b/examples/active_message_examples/recursive_am.rs index d6bdbc85..49e8f1c4 100644 --- a/examples/active_message_examples/recursive_am.rs +++ b/examples/active_message_examples/recursive_am.rs @@ -43,7 +43,7 @@ impl LamellarAM for RecursiveAM { orig: self.orig, }, ); - // let mut res = 
next.blocking_wait().expect("error returning from am"); // this will cause deadlock + // let mut res = next.block().expect("error returning from am"); // this will cause deadlock let mut res = next.await; res.push(hostname::get().unwrap().into_string().unwrap()); //append my host name to list returned from previous call res diff --git a/examples/array_examples/global_lock_array.rs b/examples/array_examples/global_lock_array.rs index 58483778..dc18ee1f 100644 --- a/examples/array_examples/global_lock_array.rs +++ b/examples/array_examples/global_lock_array.rs @@ -9,7 +9,7 @@ fn main() { let array = GlobalLockArray::::new(&world, 100, Distribution::Block); let s = Instant::now(); - let local_data = array.blocking_read_local_data(); + let local_data = array.read_local_data().block(); println!( "PE{my_pe} time: {:?} {:?}", s.elapsed().as_secs_f64(), @@ -19,7 +19,7 @@ fn main() { drop(local_data); //release the lock world.barrier(); - let mut local_data = array.blocking_write_local_data(); + let mut local_data = array.write_local_data().block(); println!( "PE{my_pe} time: {:?} got write lock", s.elapsed().as_secs_f64() @@ -31,7 +31,7 @@ fn main() { array.print(); println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); - let mut local_data = array.blocking_collective_write_local_data(); + let mut local_data = array.collective_write_local_data().block(); println!( "PE{my_pe} time: {:?} got collective write lock", s.elapsed().as_secs_f64() @@ -48,7 +48,8 @@ fn main() { println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64()); array - .blocking_read_lock() + .read_lock() + .block() .dist_iter() .enumerate() .for_each(move |(i, elem)| { diff --git a/examples/bandwidths/global_lock_atomic_array_put_bw.rs b/examples/bandwidths/global_lock_atomic_array_put_bw.rs index 919521f2..248b57f7 100644 --- a/examples/bandwidths/global_lock_atomic_array_put_bw.rs +++ b/examples/bandwidths/global_lock_atomic_array_put_bw.rs @@ -67,7 +67,7 @@ fn main() { let cur_t = 
timer.elapsed().as_secs_f64(); if my_pe == 0 { for j in (0..2_u64.pow(exp) as usize).step_by(num_bytes as usize) { - let local_data = array.blocking_read_local_data(); + let local_data = array.read_local_data().block(); while *(&local_data[(j + num_bytes as usize) - 1]) == 255 as u8 { println!( "this should not happen {:?}", diff --git a/examples/bandwidths/local_lock_atomic_array_put_bw.rs b/examples/bandwidths/local_lock_atomic_array_put_bw.rs index 673aa22a..1b857e1f 100644 --- a/examples/bandwidths/local_lock_atomic_array_put_bw.rs +++ b/examples/bandwidths/local_lock_atomic_array_put_bw.rs @@ -67,7 +67,7 @@ fn main() { let cur_t = timer.elapsed().as_secs_f64(); if my_pe == num_pes - 1 { for j in (0..2_u64.pow(exp) as usize).step_by(num_bytes as usize) { - let local_data = array.blocking_read_local_data(); + let local_data = array.read_local_data().block(); while *(&local_data[(j + num_bytes as usize) - 1]) == 255 as u8 { println!( "this should not happen {:?}", diff --git a/examples/darc_examples/darc.rs b/examples/darc_examples/darc.rs index 08beb185..3a9215b8 100644 --- a/examples/darc_examples/darc.rs +++ b/examples/darc_examples/darc.rs @@ -56,10 +56,10 @@ fn main() { )); let global_darc = GlobalRwDarc::new(world.team(), 0).unwrap(); - let read_lock = global_darc.blocking_read(); + let read_lock = global_darc.read().block(); println!("I have the read lock!!!! {:?}", my_pe); drop(read_lock); - let write_lock = global_darc.blocking_write(); + let write_lock = global_darc.write().block(); println!("I have the write lock!!!! 
{:?}", my_pe); std::thread::sleep(std::time::Duration::from_secs(1)); drop(write_lock); @@ -100,7 +100,7 @@ fn main() { tg.add_am_all(darc_am); team.block_on(tg.exec()); } else { - *local_darc.blocking_write() += 1; + *local_darc.write().block() += 1; } } // -------- diff --git a/examples/kernels/am_flops.rs b/examples/kernels/am_flops.rs index 7783f39b..2e975cc8 100644 --- a/examples/kernels/am_flops.rs +++ b/examples/kernels/am_flops.rs @@ -150,7 +150,7 @@ fn main() { // let cur_t = timer.elapsed().as_secs_f64(); // let tot_flop: usize = reqs // .iter() - // .map(|r| r.blocking_wait().iter().map(|r| r.unwrap()).sum::()) + // .map(|r| r.block().iter().map(|r| r.unwrap()).sum::()) // .sum(); // let task_granularity = ((cur_t * 24f64) / num_tasks as f64) * 1000.0f64; // if my_pe == 0 { diff --git a/examples/kernels/cached_am_gemm.rs b/examples/kernels/cached_am_gemm.rs index b0c9c7d5..4e5c7567 100644 --- a/examples/kernels/cached_am_gemm.rs +++ b/examples/kernels/cached_am_gemm.rs @@ -255,7 +255,7 @@ fn main() { tasks += 1; } // for req in reqs { - // req.blocking_wait(); + // req.block(); // } } diff --git a/examples/kernels/dft_proxy.rs b/examples/kernels/dft_proxy.rs index e6b56040..3980f7ad 100644 --- a/examples/kernels/dft_proxy.rs +++ b/examples/kernels/dft_proxy.rs @@ -786,7 +786,7 @@ fn main() { // println!( // "{:?} array sum: {:?} time: {:?}", // my_pe, - // full_spectrum_array.sum().blocking_wait(), + // full_spectrum_array.sum().block(), // time // ); // } @@ -807,7 +807,7 @@ fn main() { // println!( // "{:?} array sum: {:?} time: {:?}", // my_pe, - // full_spectrum_array.sum().blocking_wait(), + // full_spectrum_array.sum().block(), // time // ); // } @@ -857,7 +857,7 @@ fn main() { // println!( // "{:?} array sum: {:?} time: {:?}", // my_pe, - // full_spectrum_array.sum().blocking_wait(), + // full_spectrum_array.sum().block(), // time // ); // } diff --git a/examples/misc/dist_hashmap.rs b/examples/misc/dist_hashmap.rs index e04d93c8..f442a1f4 100644 
--- a/examples/misc/dist_hashmap.rs +++ b/examples/misc/dist_hashmap.rs @@ -112,6 +112,6 @@ fn main() { world.barrier(); println!( "[{my_pe}] local data: {:?}", - distributed_map.data.blocking_read() + distributed_map.data.read().block() ); } diff --git a/src/array.rs b/src/array.rs index b4f68a68..63e3ab12 100644 --- a/src/array.rs +++ b/src/array.rs @@ -194,8 +194,9 @@ crate::inventory::collect!(ReduceKey); lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize); lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize); -lamellar_impl::generate_reductions_for_type_rt!(false, f32); -lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32); +// lamellar_impl::generate_reductions_for_type_rt!(false, f32); +// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32); + // lamellar_impl::generate_reductions_for_type_rt!(false, u128); // lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128); @@ -212,7 +213,7 @@ lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32); // lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64); // lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64); -lamellar_impl::generate_ops_for_bool_rt!(); +// lamellar_impl::generate_ops_for_bool_rt!(); impl Dist for Option {} impl ArrayOps for Option {} diff --git a/src/darc.rs b/src/darc.rs index 975decb7..0bcd1d60 100644 --- a/src/darc.rs +++ b/src/darc.rs @@ -78,6 +78,8 @@ pub(crate) mod global_rw_darc; use global_rw_darc::DistRwLock; pub use global_rw_darc::GlobalRwDarc; +use self::handle::{IntoGlobalRwDarcHandle, IntoLocalRwDarcHandle}; + pub(crate) mod handle; static DARC_ID: AtomicUsize = AtomicUsize::new(0); @@ -949,7 +951,7 @@ impl Darc { // the_darc.print(); weak } - fn inner(&self) -> &DarcInner { + pub(crate) fn inner(&self) -> &DarcInner { unsafe { self.inner.as_ref().expect("invalid darc inner ptr") } } fn inner_mut(&self) -> &mut DarcInner { @@ -1378,13 +1380,14 @@ impl Darc { 
#[doc(alias = "Collective")] /// Converts this Darc into a [LocalRwDarc] /// - /// This is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally) /// /// # Examples /// ``` @@ -1395,100 +1398,32 @@ impl Darc { /// let five = Darc::new(&world,5).expect("PE in world team"); /// let five_as_localdarc = world.block_on(async move {five.into_localrw().await}); /// ``` - pub async fn into_localrw(self) -> LocalRwDarc { - let inner = self.inner(); - let _cur_pe = inner.team().world_pe; - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), - }, - DarcMode::LocalRw, - 0, - ) - .await; - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - inner.total_local_cnt.fetch_add(1, Ordering::SeqCst); - // println! 
{"[{:?}] darc[{:?}] into_localrw {:?} {:?} {:?}",std::thread::current().id(),self.inner().id,self.inner,self.inner().local_cnt.load(Ordering::SeqCst),self.inner().total_local_cnt.load(Ordering::SeqCst)}; - let item = unsafe { *Box::from_raw(inner.item as *mut T) }; - - let d = Darc { - inner: self.inner as *mut DarcInner>>, - src_pe: self.src_pe, + pub fn into_localrw(self) -> IntoLocalRwDarcHandle { + + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(Arc::new(RwLock::new(item))))); - // d.print(); - LocalRwDarc { darc: d } - } - - #[doc(alias = "Collective")] - /// Converts this Darc into a [LocalRwDarc] - /// - /// This is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). - /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = Darc::new(&world,5).expect("PE in world team"); - /// let five_as_localdarc = five.blocking_into_localrw(); - /// ``` - pub fn blocking_into_localrw(self) -> LocalRwDarc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `Darc::blocking_into_localrw` from within an async context which may lead to deadlock, it is recommended that you use `into_localrw().await;` instead! 
- Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - if let Some(val) = config().blocking_call_warning { - if val { - println!("{msg}"); - } - } else { - println!("{msg}"); - } + let team = self.inner().team().clone(); + IntoLocalRwDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(async move { + DarcInner::block_on_outstanding(wrapped_inner, DarcMode::LocalRw, 0).await; + }), } - let inner = self.inner(); - let _cur_pe = inner.team().world_pe; - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), - }, - DarcMode::LocalRw, - 0, - )); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - inner.total_local_cnt.fetch_add(1, Ordering::SeqCst); - // println! {"[{:?}] darc[{:?}] into_localrw {:?} {:?} {:?}",std::thread::current().id(),self.inner().id,self.inner,self.inner().local_cnt.load(Ordering::SeqCst),self.inner().total_local_cnt.load(Ordering::SeqCst)}; - let item = unsafe { *Box::from_raw(inner.item as *mut T) }; - - let d = Darc { - inner: self.inner as *mut DarcInner>>, - src_pe: self.src_pe, - }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(Arc::new(RwLock::new(item))))); - // d.print(); - LocalRwDarc { darc: d } } #[doc(alias = "Collective")] /// Converts this Darc into a [GlobalRwDarc] /// - /// This is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. 
/// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a Darc simultaneously (on any PE). + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally) /// /// # Examples /// ``` @@ -1497,91 +1432,20 @@ impl Darc { /// let world = LamellarWorldBuilder::new().build(); /// /// let five = Darc::new(&world,5).expect("PE in world team"); - /// let five_as_globaldarc = world.block_on(async move {five.into_globalrw().await}); + /// let five_as_globaldarc = five.into_globalrw().block(); /// ``` - pub async fn into_globalrw(self) -> GlobalRwDarc { - let inner = self.inner(); - let _cur_pe = inner.team().world_pe; - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), - }, - DarcMode::GlobalRw, - 0, - ) - .await; - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - inner.total_local_cnt.fetch_add(1, Ordering::SeqCst); - // println! 
{"[{:?}] darc[{:?}] into_globalrw {:?} {:?} {:?}",std::thread::current().id(),self.inner().id,self.inner,self.inner().local_cnt.load(Ordering::SeqCst),self.inner().total_local_cnt.load(Ordering::SeqCst)}; - - let item = unsafe { Box::from_raw(inner.item as *mut T) }; - let d = Darc { - inner: self.inner as *mut DarcInner>, - src_pe: self.src_pe, + pub fn into_globalrw(self) -> IntoGlobalRwDarcHandle { + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(DistRwLock::new( - *item, - self.inner().team(), - )))); - GlobalRwDarc { darc: d } - } - - #[doc(alias = "Collective")] - /// Converts this Darc into a [GlobalRwDarc] - /// - /// This is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a Darc simultaneously (on any PE). - /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = Darc::new(&world,5).expect("PE in world team"); - /// let five_as_globaldarc = five.blocking_into_globalrw(); - /// ``` - pub fn blocking_into_globalrw(self) -> GlobalRwDarc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `Darc::blocking_into_globalrw` from within an async context which may lead to deadlock, it is recommended that you use `into_globalrw().await;` instead! 
- Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } + let team = self.inner().team().clone(); + IntoGlobalRwDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(async move { + DarcInner::block_on_outstanding(wrapped_inner, DarcMode::GlobalRw, 0).await; + }), } - let inner = self.inner(); - let _cur_pe = inner.team().world_pe; - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.inner as *mut DarcInner).expect("invalid darc pointer"), - }, - DarcMode::GlobalRw, - 0, - )); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - inner.total_local_cnt.fetch_add(1, Ordering::SeqCst); - // println! {"[{:?}] darc[{:?}] into_globalrw {:?} {:?} {:?}",std::thread::current().id(),self.inner().id,self.inner,self.inner().local_cnt.load(Ordering::SeqCst),self.inner().total_local_cnt.load(Ordering::SeqCst)}; - - let item = unsafe { Box::from_raw(inner.item as *mut T) }; - let d = Darc { - inner: self.inner as *mut DarcInner>, - src_pe: self.src_pe, - }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(DistRwLock::new( - *item, - self.inner().team(), - )))); - GlobalRwDarc { darc: d } } } diff --git a/src/darc/global_rw_darc.rs b/src/darc/global_rw_darc.rs index 45183b0a..895cfc40 100644 --- a/src/darc/global_rw_darc.rs +++ b/src/darc/global_rw_darc.rs @@ -18,6 +18,7 @@ use crate::{IdError, LamellarEnv, LamellarTeam}; use super::handle::{ GlobalRwDarcCollectiveWriteHandle, GlobalRwDarcReadHandle, GlobalRwDarcWriteHandle, + IntoDarcHandle, IntoLocalRwDarcHandle, }; #[derive(serde::Serialize, serde::Deserialize, Debug)] @@ -62,7 +63,7 @@ impl DistRwLock { data: std::cell::UnsafeCell::new(data), } } - fn into_inner(self) -> T { 
+ pub(crate) fn into_inner(self) -> T { self.data.into_inner() } } @@ -482,7 +483,7 @@ impl crate::active_messaging::DarcSerde for GlobalRwDarc { } impl GlobalRwDarc { - fn inner(&self) -> &DarcInner> { + pub(crate) fn inner(&self) -> &DarcInner> { self.darc.inner() } @@ -751,13 +752,14 @@ impl GlobalRwDarc { #[doc(alias = "Collective")] /// Converts this GlobalRwDarc into a regular [Darc] /// - /// This is a blocking collective call amongst all PEs in the GlobalRwDarc's team, only returning once every PE in the team has completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a Darc and a GlobalRwDarc simultaneously (on any PE). + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. 
team barriers are being called internally) /// /// # Examples /// ``` @@ -766,94 +768,34 @@ impl GlobalRwDarc { /// let world = LamellarWorldBuilder::new().build(); /// /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_darc = world.block_on(async move {five.into_darc()}); + /// let five_as_darc = five.into_darc().block(); /// ``` - pub async fn into_darc(self) -> Darc { - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::Darc, - 0, - ) - .await; - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let item = unsafe { Box::from_raw(inner.item as *mut DistRwLock).into_inner() }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner, - src_pe: self.darc.src_pe, - // phantom: PhantomData, + pub fn into_darc(self) -> IntoDarcHandle { + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.darc.inner as *mut DarcInner) + .expect("invalid darc pointer"), }; - d.inner_mut().update_item(Box::into_raw(Box::new(item))); - d - } - #[doc(alias = "Collective")] - /// Converts this GlobalRwDarc into a regular [Darc] - /// - /// This is a blocking collective call amongst all PEs in the GlobalRwDarc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a Darc and a GlobalRwDarc simultaneously (on any PE). - /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. 
team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_darc = five.into_darc(); - /// ``` - pub fn blocking_into_darc(self) -> Darc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `GlobalRwDarc::blocking_into_darc` from within an async context which may lead to deadlock, it is recommended that you use `into_darc().await;` instead! - Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } + let team = self.darc.inner().team().clone(); + IntoDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(async move { + DarcInner::block_on_outstanding(wrapped_inner, DarcMode::Darc, 0).await; + }), } - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::Darc, - 0, - )); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let item = unsafe { Box::from_raw(inner.item as *mut DistRwLock).into_inner() }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner, - src_pe: self.darc.src_pe, - // phantom: PhantomData, - }; - d.inner_mut().update_item(Box::into_raw(Box::new(item))); - d } #[doc(alias = "Collective")] /// Converts this GlobalRwDarc into a [LocalRwDarc] /// - /// This is a blocking collective call amongst all PEs in the GlobalRwDarc's team, only returning once every PE in the team has 
completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a LocalRwDarc simultaneously (on any PE). + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. 
team barriers are being called internally) /// /// # Examples /// ``` @@ -864,83 +806,19 @@ impl GlobalRwDarc { /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); /// let five_as_localdarc = world.block_on(async move {five.into_localrw()}); /// ``` - pub async fn into_localrw(self) -> LocalRwDarc { - let inner = self.inner(); - // println!("into_localrw"); - // self.print(); - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::LocalRw, - 0, - ) - .await; - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let item = unsafe { Box::from_raw(inner.item as *mut DistRwLock).into_inner() }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner>>, - src_pe: self.darc.src_pe, - // phantom: PhantomData, + pub fn into_localrw(self) -> IntoLocalRwDarcHandle { + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.darc.inner as *mut DarcInner) + .expect("invalid darc pointer"), }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(Arc::new(RwLock::new(item))))); - LocalRwDarc { darc: d } - } - - #[doc(alias = "Collective")] - /// Converts this GlobalRwDarc into a [LocalRwDarc] - /// - /// This is a blocking collective call amongst all PEs in the GlobalRwDarc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a LocalRwDarc simultaneously (on any PE). - /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. 
team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_localdarc = five.into_localrw(); - /// ``` - pub fn blocking_into_localrw(self) -> LocalRwDarc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `GlobalRwDarc::blocking_into_localrw` from within an async context which may lead to deadlock, it is recommended that you use `into_localrw().await;` instead! - Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } + let team = self.darc.inner().team().clone(); + IntoLocalRwDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(async move { + DarcInner::block_on_outstanding(wrapped_inner, DarcMode::LocalRw, 0).await; + }), } - let inner = self.inner(); - // println!("into_localrw"); - // self.print(); - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::LocalRw, - 0, - )); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let item = unsafe { Box::from_raw(inner.item as *mut DistRwLock).into_inner() }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner>>, - src_pe: self.darc.src_pe, - // phantom: PhantomData, - }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(Arc::new(RwLock::new(item))))); - LocalRwDarc { darc: d } } } diff --git a/src/darc/handle.rs b/src/darc/handle.rs index db1b4274..46acb340 100644 --- a/src/darc/handle.rs +++ 
b/src/darc/handle.rs @@ -6,17 +6,18 @@ use std::task::{Context, Poll}; use crate::darc::local_rw_darc::{LocalRwDarc, LocalRwDarcReadGuard}; use crate::lamellar_request::LamellarRequest; -use crate::AmHandle; -use crate::{config, GlobalRwDarc}; +use crate::{config, darc, GlobalRwDarc, LamellarTeamRT}; +use crate::{AmHandle, Darc}; -use async_lock::{RwLockReadGuardArc, RwLockWriteGuardArc}; +use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc}; use futures_util::{ready, Future}; use pin_project::pin_project; use super::global_rw_darc::{ - GlobalRwDarcCollectiveWriteGuard, GlobalRwDarcReadGuard, GlobalRwDarcWriteGuard, + DistRwLock, GlobalRwDarcCollectiveWriteGuard, GlobalRwDarcReadGuard, GlobalRwDarcWriteGuard, }; use super::local_rw_darc::LocalRwDarcWriteGuard; +use super::DarcInner; #[pin_project(project = StateProj)] enum State { @@ -529,3 +530,271 @@ impl Future for GlobalRwDarcCollectiveWriteHandle { }) } } + +pub(crate) enum OrigDarc { + Darc(Darc), + LocalRw(LocalRwDarc), + GlobalRw(GlobalRwDarc), +} + +impl From> for OrigDarc { + fn from(darc: Darc) -> Self { + OrigDarc::Darc(darc) + } +} + +impl From> for OrigDarc { + fn from(darc: LocalRwDarc) -> Self { + OrigDarc::LocalRw(darc) + } +} + +impl From> for OrigDarc { + fn from(darc: GlobalRwDarc) -> Self { + OrigDarc::GlobalRw(darc) + } +} + +impl OrigDarc { + fn inc_local_cnt(&self) { + match self { + OrigDarc::Darc(darc) => darc.inc_local_cnt(1), + OrigDarc::LocalRw(darc) => darc.darc.inc_local_cnt(1), + OrigDarc::GlobalRw(darc) => darc.darc.inc_local_cnt(1), + } + } + fn inner(&self) -> *mut DarcInner { + match self { + OrigDarc::Darc(darc) => darc.inner_mut() as *mut _ as *mut DarcInner, + OrigDarc::LocalRw(darc) => darc.darc.inner_mut() as *mut _ as *mut DarcInner, + OrigDarc::GlobalRw(darc) => darc.darc.inner_mut() as *mut _ as *mut DarcInner, + } + } + fn src_pe(&self) -> usize { + match self { + OrigDarc::Darc(darc) => darc.src_pe, + OrigDarc::LocalRw(darc) => darc.darc.src_pe, + 
OrigDarc::GlobalRw(darc) => darc.darc.src_pe, + } + } + unsafe fn get_item(&self) -> T { + match self { + OrigDarc::Darc(darc) => *Box::from_raw(darc.inner().item as *mut T), + OrigDarc::LocalRw(darc) => { + let mut arc_item = + (*Box::from_raw(darc.inner().item as *mut Arc>)).clone(); + let item: T = loop { + arc_item = match Arc::try_unwrap(arc_item) { + Ok(item) => break item.into_inner(), + Err(arc_item) => arc_item, + }; + std::thread::yield_now(); + }; + item + } + OrigDarc::GlobalRw(darc) => { + Box::from_raw(darc.inner().item as *mut DistRwLock).into_inner() + } + } + } +} + +#[must_use] +#[pin_project] +#[doc(alias = "Collective")] +/// This is a handle representing the operation of changing from a [LocalRwDarc] or [GlobalRwDarc] into a regular [Darc]. +/// This handled must either be awaited in an async context or blocked on in a non-async context for the operation to be performed. +/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. +/// +/// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the +/// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). +/// +/// # Collective Operation +/// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. 
team barriers are being called internally) +/// +/// # Examples +/// ``` +/// use lamellar::darc::prelude::*; +/// +/// let world = LamellarWorldBuilder::new().build(); +/// +/// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); +/// let five_as_darc = five.into_darc().block(); +/// /* alternatively something like the following is valid as well +/// let five_as_darc = world.block_on(async move{ +/// five.into_darc().await; +/// }) +/// */ +/// ``` +pub struct IntoDarcHandle { + pub(crate) darc: OrigDarc, + pub(crate) team: Pin>, + #[pin] + pub(crate) outstanding_future: Pin + Send>>, +} + +impl IntoDarcHandle { + /// Used to drive the conversion of a [LocalRwDarc] or [GlobalRwDarc] into a [Darc] + /// # Examples + /// + ///``` + /// use lamellar::darc::prelude::*; + /// + /// let world = LamellarWorldBuilder::new().build(); + /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); + /// let five_as_darc = five.into_darc().block(); + pub fn block(self) -> Darc { + self.team.clone().block_on(self) + } +} + +impl Future for IntoDarcHandle { + type Output = Darc; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.outstanding_future.as_mut().poll(cx)); + this.darc.inc_local_cnt(); + let item = unsafe { this.darc.get_item() }; + let darc: Darc = Darc { + inner: this.darc.inner(), + src_pe: this.darc.src_pe(), + }; + darc.inner_mut().update_item(Box::into_raw(Box::new(item))); + Poll::Ready(darc) + } +} + +#[must_use] +#[pin_project] +#[doc(alias = "Collective")] +/// This is a handle representing the operation of changing from a [Darc] or [GlobalRwDarc] into a [LocalRwDarc]. +/// This handle must either be awaited in an async context or blocked on in a non-async context for the operation to be performed. +/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. 
+/// +/// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the +/// pointed to object to be wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). +/// +/// # Collective Operation +/// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally) +/// +/// # Examples +/// ``` +/// use lamellar::darc::prelude::*; +/// +/// let world = LamellarWorldBuilder::new().build(); +/// +/// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); +/// let five_as_localrw = five.into_localrw().block(); +/// /* alternatively something like the following is valid as well +/// let five_as_localrw = world.block_on(async move{ +/// five.into_localrw().await; +/// }) +/// */ +/// ``` +pub struct IntoLocalRwDarcHandle { + pub(crate) darc: OrigDarc, + pub(crate) team: Pin>, + #[pin] + pub(crate) outstanding_future: Pin + Send>>, +} + +impl IntoLocalRwDarcHandle { + /// Used to drive the conversion of a [Darc] or [GlobalRwDarc] into a [LocalRwDarc] + /// # Examples + /// + ///``` + /// use lamellar::darc::prelude::*; + /// + /// let world = LamellarWorldBuilder::new().build(); + /// let five = GlobalRwDarc::new(&world,5).expect("PE in world team"); + /// let five_as_localrw = five.into_localrw().block(); + pub fn block(self) -> LocalRwDarc { + self.team.clone().block_on(self) + } +} + +impl Future for IntoLocalRwDarcHandle { + type Output = LocalRwDarc; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.outstanding_future.as_mut().poll(cx)); + this.darc.inc_local_cnt(); + let item = unsafe { this.darc.get_item() }; + let darc: Darc>> = Darc { + inner: this.darc.inner(), + src_pe: this.darc.src_pe(), + }; + darc.inner_mut() + .update_item(Box::into_raw(Box::new(Arc::new(RwLock::new(item))))); + Poll::Ready(LocalRwDarc { darc 
 }) + } +} + +#[must_use] +#[pin_project] +#[doc(alias = "Collective")] +/// This is a handle representing the operation of changing from a [Darc] or [LocalRwDarc] into a [GlobalRwDarc]. +/// This handle must either be awaited in an async context or blocked on in a non-async context for the operation to be performed. +/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. +/// +/// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the +/// pointed to object to be wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). +/// +/// # Collective Operation +/// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally) +/// +/// # Examples +/// ``` +/// use lamellar::darc::prelude::*; +/// +/// let world = LamellarWorldBuilder::new().build(); +/// +/// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); +/// let five_as_globalrw = five.into_globalrw().block(); +/// /* alternatively something like the following is valid as well +/// let five_as_globalrw = world.block_on(async move{ +/// five.into_globalrw().await; +/// }) +/// */ +/// ``` +pub struct IntoGlobalRwDarcHandle { + pub(crate) darc: OrigDarc, + pub(crate) team: Pin>, + #[pin] + pub(crate) outstanding_future: Pin + Send>>, +} + +impl IntoGlobalRwDarcHandle { + /// Used to drive the conversion of a [Darc] or [LocalRwDarc] into a [GlobalRwDarc] + /// # Examples + /// + ///``` + /// use lamellar::darc::prelude::*; + /// + /// let world = LamellarWorldBuilder::new().build(); + /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); + /// let five_as_globalrw = five.into_globalrw().block(); + pub fn block(self) -> GlobalRwDarc { + self.team.clone().block_on(self) + } +} + +impl Future 
for IntoGlobalRwDarcHandle { + type Output = GlobalRwDarc; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.outstanding_future.as_mut().poll(cx)); + this.darc.inc_local_cnt(); + let item = unsafe { this.darc.get_item() }; + let darc: Darc> = Darc { + inner: this.darc.inner(), + src_pe: this.darc.src_pe(), + }; + darc.inner_mut() + .update_item(Box::into_raw(Box::new(DistRwLock::new( + item, + this.team.clone(), + )))); + Poll::Ready(GlobalRwDarc { darc }) + } +} diff --git a/src/darc/local_rw_darc.rs b/src/darc/local_rw_darc.rs index c4c2dbbe..a315d9e4 100644 --- a/src/darc/local_rw_darc.rs +++ b/src/darc/local_rw_darc.rs @@ -27,7 +27,9 @@ use crate::lamellar_team::IntoLamellarTeam; use crate::scheduler::LamellarTask; use crate::{IdError, LamellarEnv, LamellarTeam}; -use super::handle::{LocalRwDarcReadHandle, LocalRwDarcWriteHandle}; +use super::handle::{ + IntoDarcHandle, IntoGlobalRwDarcHandle, LocalRwDarcReadHandle, LocalRwDarcWriteHandle, +}; #[derive(Debug)] pub struct LocalRwDarcReadGuard { @@ -160,7 +162,7 @@ impl crate::active_messaging::DarcSerde for LocalRwDarc { } impl LocalRwDarc { - fn inner(&self) -> &DarcInner>> { + pub(crate) fn inner(&self) -> &DarcInner>> { self.darc.inner() } @@ -206,7 +208,7 @@ impl LocalRwDarc { /// Creates a handle for aquiring a reader lock of this LocalRwDarc local to this PE. /// The returned handle must either be await'd `.read().await` within an async context /// or it must be blocked on `.read().block()` in a non async context to actually acquire the lock - /// + /// /// After awaiting or blocking on the handle, a RAII guard is returned which will drop the read access of the wrlock when dropped /// /// # One-sided Operation @@ -248,7 +250,7 @@ impl LocalRwDarc { /// Creates a handle for aquiring a writer lock of this LocalRwDarc local to this PE. 
/// The returned handle must either be await'd `.write().await` within an async context /// or it must be blocked on `.write().block()` in a non async context to actually acquire the lock - /// + /// /// After awaiting or blocking on the handle, a RAII guard is returned which will drop the write access of the wrlock when dropped /// /// # One-sided Operation @@ -327,13 +329,14 @@ impl LocalRwDarc { #[doc(alias = "Collective")] /// Converts this LocalRwDarc into a [GlobalRwDarc] /// - /// This is a blocking collective call amongst all PEs in the LocalRwDarc's team, only returning once every PE in the team has completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a LocalRwDarc simultaneously (on any PE). + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. 
team barriers are being called internally) /// /// # Examples /// ``` @@ -344,103 +347,21 @@ impl LocalRwDarc { /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); /// let five_as_globaldarc = world.block_on(async move {five.into_globalrw().await}); /// ``` - pub async fn into_globalrw(self) -> GlobalRwDarc { - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::GlobalRw, - 0, - ) - .await; - // println!("after block on outstanding"); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let mut arc_item = unsafe { (*Box::from_raw(inner.item as *mut Arc>)).clone() }; - let item: T = loop { - arc_item = match Arc::try_unwrap(arc_item) { - Ok(item) => break item.into_inner(), - Err(arc_item) => arc_item, - }; - }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner>, - src_pe: self.darc.src_pe, - // phantom: PhantomData, + pub fn into_globalrw(self) -> IntoGlobalRwDarcHandle { + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.darc.inner as *mut DarcInner) + .expect("invalid darc pointer"), }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(DistRwLock::new( - item, - self.inner().team(), - )))); - GlobalRwDarc { darc: d } - } - - #[doc(alias = "Collective")] - /// Converts this LocalRwDarc into a [GlobalRwDarc] - /// - /// This is a blocking collective call amongst all PEs in the LocalRwDarc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the - /// pointed to object to wrapped by both a GlobalRwDarc and a LocalRwDarc simultaneously (on any PE). 
- /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_globaldarc = five.blocking_into_globalrw(); - /// ``` - pub fn blocking_into_globalrw(self) -> GlobalRwDarc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `LocalRwDarc::blocking_into_globalrw` from within an async context which may lead to deadlock, it is recommended that you use `into_globalrw().await;` instead! - Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } + let team = self.darc.inner().team().clone(); + IntoGlobalRwDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(DarcInner::block_on_outstanding( + wrapped_inner, + DarcMode::GlobalRw, + 0, + )), } - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::GlobalRw, - 0, - )); - // println!("after block on outstanding"); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - let mut arc_item = unsafe { (*Box::from_raw(inner.item as *mut Arc>)).clone() }; - let item: T = loop { - arc_item = match Arc::try_unwrap(arc_item) { - Ok(item) => break item.into_inner(), - Err(arc_item) => arc_item, - }; - }; - let d = Darc { - inner: 
self.darc.inner as *mut DarcInner>, - src_pe: self.darc.src_pe, - // phantom: PhantomData, - }; - d.inner_mut() - .update_item(Box::into_raw(Box::new(DistRwLock::new( - item, - self.inner().team(), - )))); - GlobalRwDarc { darc: d } } } @@ -448,13 +369,14 @@ impl LocalRwDarc { #[doc(alias = "Collective")] /// Converts this LocalRwDarc into a regular [Darc] /// - /// This is a blocking collective call amongst all PEs in the LocalRwDarc's team, only returning once every PE in the team has completed the call. + /// This returns a handle (which is Future) thats needs to be `awaited` or `blocked` on to perform the operation. + /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the Darc's team, only returning once every PE in the team has completed the call. /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. It is not possible for the + /// Furthermore, the handle will not return while any additional references outside of the one making this call exist on each PE. It is not possible for the /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). /// /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) + /// Requires all PEs associated with the `darc` to await/block the handle otherwise deadlock will occur (i.e. 
team barriers are being called internally) /// /// # Examples /// ``` @@ -463,101 +385,21 @@ impl LocalRwDarc { /// let world = LamellarWorldBuilder::new().build(); /// /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_darc = world.block_on(async move {five.into_darc()}); + /// let five_as_darc = five.into_darc().block(); /// ``` - pub async fn into_darc(self) -> Darc { - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::Darc, - 0, - ) - .await; - // println!("after block on outstanding"); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - // let item = unsafe { Box::from_raw(inner.item as *mut Arc>).into_inner() }; - let mut arc_item = unsafe { (*Box::from_raw(inner.item as *mut Arc>)).clone() }; - - let item: T = loop { - arc_item = match Arc::try_unwrap(arc_item) { - Ok(item) => break item.into_inner(), - Err(arc_item) => arc_item, - }; - }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner, - src_pe: self.darc.src_pe, - // phantom: PhantomData, + pub fn into_darc(self) -> IntoDarcHandle { + let wrapped_inner = WrappedInner { + inner: NonNull::new(self.darc.inner as *mut DarcInner) + .expect("invalid darc pointer"), }; - d.inner_mut().update_item(Box::into_raw(Box::new(item))); //the darc will free this approriately - d - } - - #[doc(alias = "Collective")] - /// Converts this LocalRwDarc into a regular [Darc] - /// - /// This is a blocking collective call amongst all PEs in the LocalRwDarc's team, only returning once every PE in the team has completed the call. - /// - /// Furthermore, this call will block while any additional references outside of the one making this call exist on each PE. 
It is not possible for the - /// pointed to object to wrapped by both a Darc and a LocalRwDarc simultaneously (on any PE). - /// - /// # Collective Operation - /// Requires all PEs associated with the `darc` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally) - /// - /// # Examples - /// ``` - /// use lamellar::darc::prelude::*; - /// - /// let world = LamellarWorldBuilder::new().build(); - /// - /// let five = LocalRwDarc::new(&world,5).expect("PE in world team"); - /// let five_as_darc = five.blocking_into_darc(); - /// ``` - pub fn blocking_into_darc(self) -> Darc { - if std::thread::current().id() != *crate::MAIN_THREAD { - let msg = format!(" - [LAMELLAR WARNING] You are calling `LocalRwDarc::blocking_into_darc` from within an async context which may lead to deadlock, it is recommended that you use `into_darc().await;` instead! - Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture() - ); - match config().blocking_call_warning { - Some(val) if val => println!("{msg}"), - _ => println!("{msg}"), - } + let team = self.darc.inner().team().clone(); + IntoDarcHandle { + darc: self.into(), + team, + outstanding_future: Box::pin(async move { + DarcInner::block_on_outstanding(wrapped_inner, DarcMode::Darc, 0).await; + }), } - let inner = self.inner(); - // println!("into_darc"); - // self.print(); - inner.team().block_on(DarcInner::block_on_outstanding( - WrappedInner { - inner: NonNull::new(self.darc.inner as *mut DarcInner) - .expect("invalid darc pointer"), - }, - DarcMode::Darc, - 0, - )); - // println!("after block on outstanding"); - inner.local_cnt.fetch_add(1, Ordering::SeqCst); //we add this here because to account for moving inner into d - // let item = unsafe { Box::from_raw(inner.item as *mut Arc>).into_inner() }; - let mut arc_item = unsafe { (*Box::from_raw(inner.item as *mut Arc>)).clone() }; - - let 
item: T = loop { - arc_item = match Arc::try_unwrap(arc_item) { - Ok(item) => break item.into_inner(), - Err(arc_item) => arc_item, - }; - }; - let d = Darc { - inner: self.darc.inner as *mut DarcInner, - src_pe: self.darc.src_pe, - // phantom: PhantomData, - }; - d.inner_mut().update_item(Box::into_raw(Box::new(item))); //the darc will free this approriately - d } } diff --git a/src/scheduler/tokio_executor.rs b/src/scheduler/tokio_executor.rs index cbe80de3..fe358edf 100644 --- a/src/scheduler/tokio_executor.rs +++ b/src/scheduler/tokio_executor.rs @@ -3,6 +3,7 @@ use crate::scheduler::{Executor, LamellarExecutor, LamellarTask, LamellarTaskInn use tokio::runtime::Runtime; use futures_util::Future; +use std::sync::Arc; #[derive(Debug)] pub(crate) struct TokioRt { diff --git a/tests/array/arithmetic_ops/add_test.rs b/tests/array/arithmetic_ops/add_test.rs index f98f640b..387f44fa 100644 --- a/tests/array/arithmetic_ops/add_test.rs +++ b/tests/array/arithmetic_ops/add_test.rs @@ -64,7 +64,7 @@ macro_rules! check_val { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() @@ -475,7 +475,7 @@ macro_rules! input_test{ // check_results!($array,array,num_pes,"LocalLockArray"); // LocalLockArray------------------------------ #[allow(unused_unsafe)] - let _ = unsafe{ array.batch_add(&input_array.blocking_read_local_data(),1).spawn()}; + let _ = unsafe{ array.batch_add(&input_array.read_local_data().block(),1).spawn()}; check_results!($array,array,num_pes,"&LocalLockArray"); println!("passed &LocalLockArray"); @@ -485,7 +485,7 @@ macro_rules! 
input_test{ // check_results!($array,array,num_pes,"GlobalLockArray"); // GlobalLockArray------------------------------ #[allow(unused_unsafe)] - let _ = unsafe{ array.batch_add(&input_array.blocking_read_local_data(),1).spawn()}; + let _ = unsafe{ array.batch_add(&input_array.read_local_data().block(),1).spawn()}; check_results!($array,array,num_pes,"&GlobalLockArray"); println!("passed &GlobalLockArray"); } diff --git a/tests/array/arithmetic_ops/div_test.rs b/tests/array/arithmetic_ops/div_test.rs index 2f7bdae9..29d88632 100644 --- a/tests/array/arithmetic_ops/div_test.rs +++ b/tests/array/arithmetic_ops/div_test.rs @@ -68,7 +68,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/fetch_add_test.rs b/tests/array/arithmetic_ops/fetch_add_test.rs index 0aeb3a16..c14438ff 100644 --- a/tests/array/arithmetic_ops/fetch_add_test.rs +++ b/tests/array/arithmetic_ops/fetch_add_test.rs @@ -92,7 +92,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() @@ -102,7 +102,8 @@ macro_rules! onesided_iter { macro_rules! buffered_onesided_iter { (GlobalLockArray,$array:ident) => { $array - .blocking_read_lock() + .read_lock() + .block() .buffered_onesided_iter($array.len()) }; ($arraytype:ident,$array:ident) => { @@ -586,7 +587,7 @@ macro_rules! 
input_test{ // check_results!($array,array,num_pes,reqs,"LocalLockArray"); // LocalLockArray------------------------------ let mut reqs = vec![]; - let local_data = input_array.blocking_read_local_data(); + let local_data = input_array.read_local_data().block(); // println!("local lock array len: {:?}", local_data.deref()); #[allow(unused_unsafe)] reqs.push(unsafe{array.batch_fetch_add(&local_data,1)}); @@ -602,7 +603,7 @@ macro_rules! input_test{ // GlobalLockArray------------------------------ let mut reqs = vec![]; #[allow(unused_unsafe)] - reqs.push(unsafe{array.batch_fetch_add(&input_array.blocking_read_local_data(),1)}); + reqs.push(unsafe{array.batch_fetch_add(&input_array.read_local_data().block(),1)}); check_results!($array,array,num_pes,reqs,"&GlobalLockArray"); } } diff --git a/tests/array/arithmetic_ops/fetch_div_test.rs b/tests/array/arithmetic_ops/fetch_div_test.rs index d539dace..427915a9 100644 --- a/tests/array/arithmetic_ops/fetch_div_test.rs +++ b/tests/array/arithmetic_ops/fetch_div_test.rs @@ -98,7 +98,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/fetch_mul_test.rs b/tests/array/arithmetic_ops/fetch_mul_test.rs index 21c0b41d..985954e9 100644 --- a/tests/array/arithmetic_ops/fetch_mul_test.rs +++ b/tests/array/arithmetic_ops/fetch_mul_test.rs @@ -93,7 +93,7 @@ macro_rules! max_updates { macro_rules! 
onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/fetch_rem_test.rs b/tests/array/arithmetic_ops/fetch_rem_test.rs index 888572d2..83b47839 100644 --- a/tests/array/arithmetic_ops/fetch_rem_test.rs +++ b/tests/array/arithmetic_ops/fetch_rem_test.rs @@ -98,7 +98,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/fetch_sub_test.rs b/tests/array/arithmetic_ops/fetch_sub_test.rs index 864fa5ec..68ea6d04 100644 --- a/tests/array/arithmetic_ops/fetch_sub_test.rs +++ b/tests/array/arithmetic_ops/fetch_sub_test.rs @@ -88,7 +88,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/mul_test.rs b/tests/array/arithmetic_ops/mul_test.rs index 5d937e3d..4f76ddb9 100644 --- a/tests/array/arithmetic_ops/mul_test.rs +++ b/tests/array/arithmetic_ops/mul_test.rs @@ -76,7 +76,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/rem_test.rs b/tests/array/arithmetic_ops/rem_test.rs index 0b95f5c7..daf07dde 100644 --- a/tests/array/arithmetic_ops/rem_test.rs +++ b/tests/array/arithmetic_ops/rem_test.rs @@ -68,7 +68,7 @@ macro_rules! max_updates { macro_rules! 
onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/arithmetic_ops/sub_test.rs b/tests/array/arithmetic_ops/sub_test.rs index 6d409f1a..2cd8382b 100644 --- a/tests/array/arithmetic_ops/sub_test.rs +++ b/tests/array/arithmetic_ops/sub_test.rs @@ -72,7 +72,7 @@ macro_rules! max_updates { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/atomic_ops/load_store_test.rs b/tests/array/atomic_ops/load_store_test.rs index 90ab7b51..de4993eb 100644 --- a/tests/array/atomic_ops/load_store_test.rs +++ b/tests/array/atomic_ops/load_store_test.rs @@ -57,7 +57,7 @@ macro_rules! check_val { // macro_rules! onesided_iter{ // (GlobalLockArray,$array:ident) => { -// $array.blocking_read_lock().onesided_iter() +// $array.read_lock().block().onesided_iter() // }; // ($arraytype:ident,$array:ident) => { // $array.onesided_iter() diff --git a/tests/array/bitwise_ops/and_test.rs b/tests/array/bitwise_ops/and_test.rs index 0e920739..82105b15 100644 --- a/tests/array/bitwise_ops/and_test.rs +++ b/tests/array/bitwise_ops/and_test.rs @@ -59,7 +59,7 @@ macro_rules! check_val { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/bitwise_ops/fetch_and_test.rs b/tests/array/bitwise_ops/fetch_and_test.rs index 9a15290e..25fcdce1 100644 --- a/tests/array/bitwise_ops/fetch_and_test.rs +++ b/tests/array/bitwise_ops/fetch_and_test.rs @@ -59,7 +59,7 @@ macro_rules! check_val { macro_rules! 
onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/bitwise_ops/fetch_or_test.rs b/tests/array/bitwise_ops/fetch_or_test.rs index f635dacb..22739189 100644 --- a/tests/array/bitwise_ops/fetch_or_test.rs +++ b/tests/array/bitwise_ops/fetch_or_test.rs @@ -59,7 +59,7 @@ macro_rules! check_val { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/bitwise_ops/fetch_xor_test.rs b/tests/array/bitwise_ops/fetch_xor_test.rs index 8302a766..99e4a45f 100644 --- a/tests/array/bitwise_ops/fetch_xor_test.rs +++ b/tests/array/bitwise_ops/fetch_xor_test.rs @@ -59,7 +59,7 @@ macro_rules! check_val { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/bitwise_ops/or_test.rs b/tests/array/bitwise_ops/or_test.rs index 5c697397..43c7bc01 100644 --- a/tests/array/bitwise_ops/or_test.rs +++ b/tests/array/bitwise_ops/or_test.rs @@ -59,7 +59,7 @@ macro_rules! check_val { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/bitwise_ops/xor_test.rs b/tests/array/bitwise_ops/xor_test.rs index 4bbe6472..24754011 100644 --- a/tests/array/bitwise_ops/xor_test.rs +++ b/tests/array/bitwise_ops/xor_test.rs @@ -57,7 +57,7 @@ macro_rules! check_val { macro_rules! 
onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter() diff --git a/tests/array/rdma/put_test.rs b/tests/array/rdma/put_test.rs index f5643207..c7c62e90 100644 --- a/tests/array/rdma/put_test.rs +++ b/tests/array/rdma/put_test.rs @@ -46,7 +46,7 @@ macro_rules! initialize_array { macro_rules! onesided_iter { (GlobalLockArray,$array:ident) => { - $array.blocking_read_lock().onesided_iter() + $array.read_lock().block().onesided_iter() }; ($arraytype:ident,$array:ident) => { $array.onesided_iter()