From 387270039c4289b763246f27f0028044acf9f804 Mon Sep 17 00:00:00 2001
From: "Ryan D. Friese"
Date: Thu, 7 Nov 2024 21:16:22 -0800
Subject: [PATCH] add a try_alloc api for memregions, fix examples and docs

---
 .../am_local_memregions.rs | 2 +-
 examples/array_examples/array_am.rs | 4 +-
 examples/array_examples/array_put_get.rs | 4 +-
 examples/array_examples/dist_array_reduce.rs | 2 +-
 examples/bandwidths/am_bw_get.rs | 6 +-
 examples/bandwidths/am_group_bw_get.rs | 6 +-
 examples/bandwidths/atomic_array_get_bw.rs | 2 +-
 examples/bandwidths/atomic_array_put_bw.rs | 2 +-
 examples/bandwidths/get_bw.rs | 5 +-
 .../global_lock_atomic_array_get_bw.rs | 2 +-
 .../global_lock_atomic_array_put_bw.rs | 2 +-
 .../local_lock_atomic_array_get_bw.rs | 2 +-
 .../local_lock_atomic_array_put_bw.rs | 2 +-
 examples/bandwidths/put_bw.rs | 5 +-
 examples/bandwidths/readonly_array_get_bw.rs | 2 +-
 .../readonly_array_get_unchecked_bw.rs | 2 +-
 examples/bandwidths/unsafe_array_get_bw.rs | 2 +-
 .../unsafe_array_get_unchecked_bw.rs | 2 +-
 examples/bandwidths/unsafe_array_put_bw.rs | 2 +-
 .../unsafe_array_put_unchecked_bw.rs | 2 +-
 examples/bandwidths/unsafe_array_store_bw.rs | 2 +-
 examples/kernels/am_gemm.rs | 13 +-
 examples/kernels/cached_am_gemm.rs | 10 +-
 examples/kernels/dft_proxy.rs | 15 +-
 examples/misc/ping_pong.rs | 15 +-
 examples/rdma_examples/rdma_am.rs | 8 +-
 examples/rdma_examples/rdma_get.rs | 4 +-
 examples/rdma_examples/rdma_put.rs | 4 +-
 src/array.rs | 143 +---------
 src/array/atomic.rs | 6 +-
 src/array/atomic/handle.rs | 4 +-
 src/array/generic_atomic/handle.rs | 39 +--
 src/array/generic_atomic/rdma.rs | 2 +-
 src/array/global_lock_atomic.rs | 4 +-
 src/array/global_lock_atomic/handle.rs | 2 +-
 src/array/global_lock_atomic/rdma.rs | 2 +-
 src/array/iterator/distributed_iterator.rs | 16 +-
 src/array/iterator/local_iterator.rs | 16 +-
 src/array/iterator/one_sided_iterator.rs | 2 +-
 .../iterator/one_sided_iterator/chunks.rs | 2 +-
 src/array/local_lock_atomic.rs | 4 +-
 src/array/local_lock_atomic/handle.rs | 2 +-
 src/array/local_lock_atomic/rdma.rs | 2 +-
 src/array/native_atomic/handle.rs | 40 +--
 src/array/native_atomic/rdma.rs | 2 +-
 src/array/read_only.rs | 2 +-
 src/array/read_only/handle.rs | 2 +-
 src/array/unsafe.rs | 2 +-
 src/array/unsafe/handle.rs | 2 +-
 src/array/unsafe/rdma.rs | 10 +-
 src/darc.rs | 2 +-
 src/lamellar_team.rs | 85 +++---
 src/lamellar_world.rs | 18 +-
 src/memregion.rs | 79 ++++--
 src/memregion/handle.rs | 257 ++++++++++++++++++
 src/memregion/shared.rs | 50 +++-
 tests/array/arithmetic_ops/add_test.rs | 4 +-
 tests/array/arithmetic_ops/fetch_add_test.rs | 4 +-
 tests/array/rdma/blocking_get_test.rs | 2 +-
 tests/array/rdma/get_test.rs | 2 +-
 tests/array/rdma/put_test.rs | 2 +-
 61 files changed, 542 insertions(+), 398 deletions(-)
 create mode 100644 src/memregion/handle.rs

diff --git a/examples/active_message_examples/am_local_memregions.rs b/examples/active_message_examples/am_local_memregions.rs
index 1a2fa8cc..36e1a2bf 100644
--- a/examples/active_message_examples/am_local_memregions.rs
+++ b/examples/active_message_examples/am_local_memregions.rs
@@ -50,7 +50,7 @@ fn main() {
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
-    let array = world.alloc_one_sided_mem_region::<usize>(10).expect("Enough memory should exist");
+    let array = world.alloc_one_sided_mem_region::<usize>(10);
     let mut rng = rand::thread_rng();
     let pes = Uniform::from(0..num_pes);
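The allocation calls in the examples below now return the region directly; per the commit title, the fallible path moves to a separate try_alloc API (implemented in the new src/memregion/handle.rs, whose body is not shown in this excerpt). A minimal sketch of the intended split — the `try_alloc_one_sided_mem_region` name and its error handling are assumptions based on the commit title, not confirmed by the hunks included here:

    use lamellar::memregion::prelude::*;

    fn alloc_demo(world: &lamellar::LamellarWorld) {
        // infallible: returns the region, as used throughout the updated examples
        let _region = world.alloc_one_sided_mem_region::<u8>(1024);
        // hypothetical fallible variant: surfaces allocation failure to the caller
        match world.try_alloc_one_sided_mem_region::<u8>(1024) {
            Ok(r) => drop(r),                                // use the region
            Err(e) => eprintln!("allocation failed: {e:?}"), // or back off and retry
        }
    }
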
diff --git a/examples/array_examples/array_am.rs b/examples/array_examples/array_am.rs
index a9a0ceb1..ce9fec87 100644
--- a/examples/array_examples/array_am.rs
+++ b/examples/array_examples/array_am.rs
@@ -32,7 +32,7 @@ impl LamellarAM for RdmaAM {
         });
         //get the original nodes data
-        let local = lamellar::world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+        let local = lamellar::world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
         let local_slice = unsafe { local.as_mut_slice().unwrap() };
         local_slice[ARRAY_LEN - 1] = num_pes as u8;
         unsafe {
@@ -67,7 +67,7 @@ fn main() {
     println!("creating array");
     let array = UnsafeArray::<u8>::new(world.team(), ARRAY_LEN, Distribution::Block).block();
     println!("creating memregion");
-    let local_mem_region = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let local_mem_region = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     println!("about to initialize array");
     array.print();
     if my_pe == 0 {
diff --git a/examples/array_examples/array_put_get.rs b/examples/array_examples/array_put_get.rs
index 44c62f05..b76ff8b2 100644
--- a/examples/array_examples/array_put_get.rs
+++ b/examples/array_examples/array_put_get.rs
@@ -34,11 +34,11 @@ fn main() {
     let shared_mem_region = world
         .alloc_shared_mem_region(total_len)
         .await
-        .expect("Enough memory should exist")
+        .into(); //Convert into abstract LamellarMemoryRegion
     let local_mem_region = world
         .alloc_one_sided_mem_region(total_len)
-        .expect("Enough memory should exist")
+        .into();
     initialize_array(&block_array).await;
     initialize_array(&cyclic_array).await;
diff --git a/examples/array_examples/dist_array_reduce.rs b/examples/array_examples/dist_array_reduce.rs
index 277d8ceb..d028ba85 100644
--- a/examples/array_examples/dist_array_reduce.rs
+++ b/examples/array_examples/dist_array_reduce.rs
@@ -40,7 +40,7 @@ fn main() {
         UnsafeArray::<usize>::new(world.team(), total_len, Distribution::Block).block();
     let cyclic_array =
         UnsafeArray::<usize>::new(world.team(), total_len, Distribution::Cyclic).block();
-    let local_mem_region = world.alloc_one_sided_mem_region(total_len).expect("Enough memory should exist");
+    let local_mem_region = world.alloc_one_sided_mem_region(total_len);
     world.barrier();
     if my_pe == 0 {
         unsafe {
diff --git a/examples/bandwidths/am_bw_get.rs b/examples/bandwidths/am_bw_get.rs
index 5b3e599c..aa26857e 100644
--- a/examples/bandwidths/am_bw_get.rs
+++ b/examples/bandwidths/am_bw_get.rs
@@ -25,7 +25,7 @@ impl LamellarAM for DataAM {
     async fn exec(&self) {
         unsafe {
             // let local = lamellar::team.local_array::<u8>(self.length, 255u8);
-            let local = lamellar::team.alloc_one_sided_mem_region::<u8>(self.length).expect("Enough memory should exist");
+            let local = lamellar::team.alloc_one_sided_mem_region::<u8>(self.length);
             let local_slice = local.as_mut_slice().unwrap();
             local_slice[self.length - 1] = 255u8;
             self.array.get_unchecked(self.index, local.clone());
@@ -42,8 +42,8 @@ fn main() {
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
    let num_pes = world.num_pes();
-    let array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
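A pattern repeated across all of the bandwidth examples that follow: the shared-region chain loses its `.unwrap()`, since the handle's `block()` now yields the region itself. The two allocation styles side by side (types taken from the hunks above):

    // collective: every PE in the team must reach this call before it completes
    let shared = world.alloc_shared_mem_region::<u8>(1024).block();
    // one-sided: purely local, no coordination with other PEs required
    let local = world.alloc_one_sided_mem_region::<u8>(1024);
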
diff --git a/examples/bandwidths/am_group_bw_get.rs b/examples/bandwidths/am_group_bw_get.rs
index 3bc2e768..2bdc358e 100644
--- a/examples/bandwidths/am_group_bw_get.rs
+++ b/examples/bandwidths/am_group_bw_get.rs
@@ -25,7 +25,7 @@ impl LamellarAM for DataAM {
     async fn exec(&self) {
         unsafe {
             // let local = lamellar::team.local_array::<u8>(self.length, 255u8);
-            let local = lamellar::team.alloc_one_sided_mem_region::<u8>(self.length).expect("Enough memory should exist");
+            let local = lamellar::team.alloc_one_sided_mem_region::<u8>(self.length);
             let local_slice = local.as_mut_slice().unwrap();
             local_slice[self.length - 1] = 255u8;
             self.array.get_unchecked(self.index, local.clone());
@@ -42,8 +42,8 @@ fn main() {
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
-    let array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/atomic_array_get_bw.rs b/examples/bandwidths/atomic_array_get_bw.rs
index 8b81978d..7120c97d 100644
--- a/examples/bandwidths/atomic_array_get_bw.rs
+++ b/examples/bandwidths/atomic_array_get_bw.rs
@@ -14,7 +14,7 @@ fn main() {
     let num_pes = world.num_pes();
     let array: LocalLockArray<u8> =
         LocalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/atomic_array_put_bw.rs b/examples/bandwidths/atomic_array_put_bw.rs
index f43c539a..393dbee4 100644
--- a/examples/bandwidths/atomic_array_put_bw.rs
+++ b/examples/bandwidths/atomic_array_put_bw.rs
@@ -14,7 +14,7 @@ fn main() {
     let num_pes = world.num_pes();
     let array: LocalLockArray<u8> =
         LocalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/get_bw.rs b/examples/bandwidths/get_bw.rs
index 50f08725..2cd427b7 100644
--- a/examples/bandwidths/get_bw.rs
+++ b/examples/bandwidths/get_bw.rs
@@ -14,9 +14,8 @@ fn main() {
     let num_pes = world.num_pes();
     let mem_reg = world
         .alloc_shared_mem_region::<u8>(MEMREG_LEN)
-        .block()
-        .unwrap();
-    let data = world.alloc_one_sided_mem_region::<u8>(MEMREG_LEN).expect("Enough memory should exist");
+        .block();
+    let data = world.alloc_one_sided_mem_region::<u8>(MEMREG_LEN);
     for j in 0..MEMREG_LEN as usize {
         unsafe {
             data.as_mut_slice().unwrap()[j] = my_pe as u8;
diff --git a/examples/bandwidths/global_lock_atomic_array_get_bw.rs b/examples/bandwidths/global_lock_atomic_array_get_bw.rs
index c4825af5..dde20842 100644
--- a/examples/bandwidths/global_lock_atomic_array_get_bw.rs
+++ b/examples/bandwidths/global_lock_atomic_array_get_bw.rs
@@ -15,7 +15,7 @@ fn main() {
     let array: GlobalLockArray<u8> =
         GlobalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/global_lock_atomic_array_put_bw.rs b/examples/bandwidths/global_lock_atomic_array_put_bw.rs
index d62cc46f..6f5d9718 100644
--- a/examples/bandwidths/global_lock_atomic_array_put_bw.rs
+++ b/examples/bandwidths/global_lock_atomic_array_put_bw.rs
@@ -15,7 +15,7 @@ fn main() {
     let num_pes = world.num_pes();
     let array: GlobalLockArray<u8> =
         GlobalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/local_lock_atomic_array_get_bw.rs b/examples/bandwidths/local_lock_atomic_array_get_bw.rs
index 0616cd25..f7984d3c 100644
--- a/examples/bandwidths/local_lock_atomic_array_get_bw.rs
+++ b/examples/bandwidths/local_lock_atomic_array_get_bw.rs
@@ -15,7 +15,7 @@ fn main() {
     let array: LocalLockArray<u8> =
         LocalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/local_lock_atomic_array_put_bw.rs b/examples/bandwidths/local_lock_atomic_array_put_bw.rs
index 83463cc7..4c98d164 100644
--- a/examples/bandwidths/local_lock_atomic_array_put_bw.rs
+++ b/examples/bandwidths/local_lock_atomic_array_put_bw.rs
@@ -15,7 +15,7 @@ fn main() {
     let num_pes = world.num_pes();
     let array: LocalLockArray<u8> =
         LocalLockArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/put_bw.rs b/examples/bandwidths/put_bw.rs
index 4fc58b65..8cc2bed2 100644
--- a/examples/bandwidths/put_bw.rs
+++ b/examples/bandwidths/put_bw.rs
@@ -13,11 +13,10 @@ fn main() {
     let num_pes = world.num_pes();
     let array = world
         .alloc_shared_mem_region::<u8>(ARRAY_LEN)
-        .block()
-        .unwrap();
+        .block();
     let data = world
         .alloc_one_sided_mem_region::<u8>(ARRAY_LEN)
-        .expect("Enough memory should exist");
+        ;
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/readonly_array_get_bw.rs b/examples/bandwidths/readonly_array_get_bw.rs
index fc200b05..5055b9ed 100644
--- a/examples/bandwidths/readonly_array_get_bw.rs
+++ b/examples/bandwidths/readonly_array_get_bw.rs
@@ -14,7 +14,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/readonly_array_get_unchecked_bw.rs b/examples/bandwidths/readonly_array_get_unchecked_bw.rs
index f8a39053..76c27105 100644
--- a/examples/bandwidths/readonly_array_get_unchecked_bw.rs
+++ b/examples/bandwidths/readonly_array_get_unchecked_bw.rs
@@ -13,7 +13,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/unsafe_array_get_bw.rs b/examples/bandwidths/unsafe_array_get_bw.rs
index 7b99bb43..e925dcc0 100644
--- a/examples/bandwidths/unsafe_array_get_bw.rs
+++ b/examples/bandwidths/unsafe_array_get_bw.rs
@@ -14,7 +14,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/unsafe_array_get_unchecked_bw.rs b/examples/bandwidths/unsafe_array_get_unchecked_bw.rs
index 448160d0..cacf9910 100644
--- a/examples/bandwidths/unsafe_array_get_unchecked_bw.rs
+++ b/examples/bandwidths/unsafe_array_get_unchecked_bw.rs
@@ -13,7 +13,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/unsafe_array_put_bw.rs b/examples/bandwidths/unsafe_array_put_bw.rs
index b4cc0212..97fc60b3 100644
--- a/examples/bandwidths/unsafe_array_put_bw.rs
+++ b/examples/bandwidths/unsafe_array_put_bw.rs
@@ -13,7 +13,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/unsafe_array_put_unchecked_bw.rs b/examples/bandwidths/unsafe_array_put_unchecked_bw.rs
index c8d425f8..20735c8b 100644
--- a/examples/bandwidths/unsafe_array_put_unchecked_bw.rs
+++ b/examples/bandwidths/unsafe_array_put_unchecked_bw.rs
@@ -13,7 +13,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/bandwidths/unsafe_array_store_bw.rs b/examples/bandwidths/unsafe_array_store_bw.rs
index c6466855..0316c4fc 100644
--- a/examples/bandwidths/unsafe_array_store_bw.rs
+++ b/examples/bandwidths/unsafe_array_store_bw.rs
@@ -14,7 +14,7 @@ fn main() {
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     let array: UnsafeArray<u8> =
         UnsafeArray::new(&world, ARRAY_LEN * num_pes, Distribution::Block).block();
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in data.as_mut_slice().unwrap() {
             *i = my_pe as u8;
diff --git a/examples/kernels/am_gemm.rs b/examples/kernels/am_gemm.rs
index 61396fcd..021fda64 100644
--- a/examples/kernels/am_gemm.rs
+++ b/examples/kernels/am_gemm.rs
@@ -97,8 +97,8 @@ struct NaiveMM {
 #[lamellar::am]
 impl LamellarAM for NaiveMM {
     async fn exec() {
-        let a = lamellar::world.alloc_one_sided_mem_region(self.a.block_size * self.a.block_size).expect("Enough memory should exist"); //the tile for the A matrix
-        let b = lamellar::world.alloc_one_sided_mem_region(self.b.block_size * self.b.block_size).expect("Enough memory should exist"); //the tile for the B matrix
+        let a = lamellar::world.alloc_one_sided_mem_region(self.a.block_size * self.a.block_size); //the tile for the A matrix
+        let b = lamellar::world.alloc_one_sided_mem_region(self.b.block_size * self.b.block_size); //the tile for the B matrix
         let b_fut = get_sub_mat(&self.b, &b); //b is remote so we will launch "gets" for this data first
         let a_fut = get_sub_mat(&self.a, &a);
         let a_b_fut = future::join(a_fut, b_fut);
@@ -164,16 +164,13 @@ fn main() {
     let a = world
         .alloc_shared_mem_region::<f32>((m * n) / num_pes)
-        .block()
-        .unwrap();
+        .block();
     let b = world
         .alloc_shared_mem_region::<f32>((n * p) / num_pes)
-        .block()
-        .unwrap();
+        .block();
     let c = world
         .alloc_shared_mem_region::<f32>((m * p) / num_pes)
-        .block()
-        .unwrap();
+        .block();
     unsafe {
         let mut cnt = (((m * n) / num_pes) * my_pe) as f32;
         for elem in a.as_mut_slice().unwrap() {
diff --git a/examples/kernels/cached_am_gemm.rs b/examples/kernels/cached_am_gemm.rs
index 90473df9..f784e82c 100644
--- a/examples/kernels/cached_am_gemm.rs
+++ b/examples/kernels/cached_am_gemm.rs
@@ -105,7 +105,7 @@ impl LamellarAM for MatMulAM {
     async fn exec() {
         let b = lamellar::world
             .alloc_one_sided_mem_region::<f32>(self.b.block_size * self.b.block_size)
-            .expect("enough memory exists");
+            ;
         get_sub_mat(&self.b, &b).await;
         // we dont actually want to alloc a shared memory region as there is an implicit barrier here
         // introduces sync point and potential for deadlock
@@ -122,7 +122,7 @@
             c.row_block = row;
             let sub_a = lamellar::world
                 .alloc_one_sided_mem_region::<f32>(a.block_size * a.block_size)
-                .expect("enough memory exists");
+                ;
             get_sub_mat(&a, &sub_a).await; //this should be local copy so returns immediately
             do_gemm(&sub_a, &b, c, self.block_size);
         }
@@ -179,15 +179,15 @@ fn main() {
     let a = world
         .alloc_shared_mem_region::<f32>((m * n) / num_pes)
         .block()
-        .expect("enough memory exists");
+        ;
     let b = world
         .alloc_shared_mem_region::<f32>((n * p) / num_pes)
         .block()
-        .expect("enough memory exists");
+        ;
     let c = world
         .alloc_shared_mem_region::<f32>((m * p) / num_pes)
         .block()
-        .expect("enough memory exists");
+        ;
     // let c2 = world.alloc_shared_mem_region::<f32>((m * p) / num_pes);
     unsafe {
         let mut cnt = my_pe as f32 * ((m * n) / num_pes) as f32;
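The comment retained in MatMulAM::exec above captures the design choice the gemm examples rely on: inside an active message only the one-sided allocator is safe, because a shared-region allocation is collective and carries an implicit barrier that a single busy PE can turn into deadlock. A condensed sketch of that pattern — `get_sub_mat` and the block sizes are the example's own helpers, shown only for shape:

    // inside an AM: allocate local, network-registered memory and pull the
    // remote tile into it; no coordination with other PEs is required
    let b = lamellar::world.alloc_one_sided_mem_region::<f32>(block_size * block_size);
    get_sub_mat(&self.b, &b).await; // one-sided get into `b`
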
diff --git a/examples/kernels/dft_proxy.rs b/examples/kernels/dft_proxy.rs
index a3b1fb25..2f7d201d 100644
--- a/examples/kernels/dft_proxy.rs
+++ b/examples/kernels/dft_proxy.rs
@@ -145,8 +145,7 @@ fn dft_lamellar(
     let spectrum_slice = unsafe { spectrum.as_slice().unwrap() };
     let add_spec = world
         .alloc_shared_mem_region::<f64>(spectrum_slice.len())
-        .block()
-        .unwrap();
+        .block();
     let timer = Instant::now();
     for pe in 0..num_pes {
@@ -640,18 +639,18 @@ fn main() {
     let partial_sum = world
         .alloc_shared_mem_region::<f64>(num_pes)
         .block()
-        .expect("Enough memory should exist");
+        ;
     let partial_spectrum = world
         .alloc_shared_mem_region::<f64>(array_len)
         .block()
-        .expect("Enough memory should exist");
+        ;
     let partial_signal = world
         .alloc_shared_mem_region::<f64>(array_len)
         .block()
-        .expect("Enough memory should exist");
-    let full_signal = world.alloc_one_sided_mem_region::<f64>(global_len).expect("Enough memory should exist");
-    let full_spectrum = world.alloc_one_sided_mem_region::<f64>(global_len).expect("Enough memory should exist");
-    let magic = world.alloc_one_sided_mem_region::<f64>(num_pes).expect("Enough memory should exist");
+        ;
+    let full_signal = world.alloc_one_sided_mem_region::<f64>(global_len);
+    let full_spectrum = world.alloc_one_sided_mem_region::<f64>(global_len);
+    let magic = world.alloc_one_sided_mem_region::<f64>(num_pes);
     let full_spectrum_array =
         UnsafeArray::<f64>::new(world.team(), global_len, Distribution::Block).block();
diff --git a/examples/misc/ping_pong.rs b/examples/misc/ping_pong.rs
index 320e3b5c..346867bc 100644
--- a/examples/misc/ping_pong.rs
+++ b/examples/misc/ping_pong.rs
@@ -328,28 +328,23 @@ fn main() {
     let indices = world
         .alloc_shared_mem_region::<usize>(UPDATES_PER_CORE * world.num_threads_per_pe())
-        .block()
-        .unwrap();
+        .block();
     let index_send_buffers = world
         .alloc_shared_mem_region::<usize>(buffer_size * num_pes)
-        .block()
-        .unwrap();
+        .block();
     world.barrier();
     let index_recv_buffers = world
         .alloc_shared_mem_region::<usize>(buffer_size * num_pes)
-        .block()
-        .unwrap();
+        .block();
     world.barrier();
     let result_send_buffers = world
         .alloc_shared_mem_region::<usize>(buffer_size * num_pes)
-        .block()
-        .unwrap();
+        .block();
     world.barrier();
     let result_recv_buffers = world
         .alloc_shared_mem_region::<usize>(buffer_size * num_pes)
-        .block()
-        .unwrap();
+        .block();
     world.barrier();
     let mut rng: StdRng = SeedableRng::seed_from_u64(my_pe as u64);
     let table_size_per_pe = 100000 * world.num_threads_per_pe();
diff --git a/examples/rdma_examples/rdma_am.rs b/examples/rdma_examples/rdma_am.rs
index 7463b2da..e35159f8 100644
--- a/examples/rdma_examples/rdma_am.rs
+++ b/examples/rdma_examples/rdma_am.rs
@@ -34,7 +34,7 @@ impl LamellarAM for RdmaAM {
         //get the original nodes data
         let local = lamellar::world
             .alloc_one_sided_mem_region::<u8>(ARRAY_LEN)
-            .expect("Enough memory should exist");
+            ;
         let local_slice = unsafe { local.as_mut_slice().unwrap() };
         local_slice[ARRAY_LEN - 1] = lamellar::num_pes as u8;
         unsafe {
@@ -68,7 +68,7 @@ impl LamellarAM for RdmaLocalMRAM {
         );
         //get the original nodes data
-        let local = lamellar::world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+        let local = lamellar::world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
         let local_slice = unsafe { local.as_mut_slice().unwrap() };
         local_slice[ARRAY_LEN - 1] = lamellar::num_pes as u8;
         unsafe {
@@ -110,8 +110,8 @@ fn main() {
     let array = world
         .alloc_shared_mem_region::<u8>(ARRAY_LEN)
         .block()
-        .expect("Enough memory should exist");
-    let local_array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+        ;
+    let local_array = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     unsafe {
         for i in array.as_mut_slice().unwrap() {
             *i = 255_u8;
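The rdma_get.rs/rdma_put.rs examples below keep the same shape: a shared region that every PE exposes, plus a registered one-sided region used as the local buffer. A sketch of the transfers those examples drive — `remote_pe` is a stand-in index, and the put/get signatures are assumed from the examples' usage (the calls are unsafe because nothing synchronizes the remote side):

    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
    unsafe {
        array.put(remote_pe, 0, data.clone()); // `data` as the src buffer of a put
        array.get(remote_pe, 0, data.clone()); // `data` as the dst buffer of a get
    }
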
diff --git a/examples/rdma_examples/rdma_get.rs b/examples/rdma_examples/rdma_get.rs
index 26a27cea..17ced9e1 100644
--- a/examples/rdma_examples/rdma_get.rs
+++ b/examples/rdma_examples/rdma_get.rs
@@ -22,12 +22,12 @@ fn main() {
     let array = world
         .alloc_shared_mem_region::<u8>(ARRAY_LEN)
         .block()
-        .expect("Enough memory should exist");
+        ;
     let array_slice = unsafe { array.as_slice().unwrap() }; //we can unwrap because we know array is local
     // instatiates a local array whos memory is registered with
     // the underlying network device, so that it can be used
     // as the src buffer in a put or as the dst buffer in a get
-    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+    let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
     let data_slice = unsafe { data.as_mut_slice().unwrap() }; //we can unwrap because we know data is local
     for elem in data_slice.iter_mut() {
         *elem = my_pe as u8;
diff --git a/examples/rdma_examples/rdma_put.rs b/examples/rdma_examples/rdma_put.rs
index 87e94ba8..fba7f55d 100644
--- a/examples/rdma_examples/rdma_put.rs
+++ b/examples/rdma_examples/rdma_put.rs
@@ -19,13 +19,13 @@ fn main() {
     if num_pes > 1 {
         // instatiates a shared memory region on every PE in world
         // all other pes can put/get into this region
-        let array = world.alloc_shared_mem_region::<u8>(ARRAY_LEN).block().expect("Enough memory should exist");
+        let array = world.alloc_shared_mem_region::<u8>(ARRAY_LEN).block();
         let array_slice = unsafe { array.as_slice().unwrap() }; //we can unwrap because we know array is local
         // instatiates a local array whos memory is registered with
         // the underlying network device, so that it can be used
         // as the src buffer in a put or as the dst buffer in a get
-        let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN).expect("Enough memory should exist");
+        let data = world.alloc_one_sided_mem_region::<u8>(ARRAY_LEN);
        let data_slice = unsafe { data.as_mut_slice().unwrap() }; //we can unwrap because we know data is local
         for elem in data_slice {
             *elem = my_pe as u8;
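The src/array.rs hunks below switch the TeamFrom conversions — which copy local values into a fresh OneSidedMemoryRegion so they can serve as RDMA input — from the old `_or_panic` allocator to the now-infallible `alloc_one_sided_mem_region`. A sketch of where these conversions surface, assuming `world.team()` yields the pinned team handle the signatures below expect:

    // builds a one-element region and copies 42 into it
    let input: LamellarArrayRdmaInput<usize> = TeamFrom::team_from(42usize, &world.team());
    // builds a region of vals.len() elements and memcpys `vals` into it
    let vals = vec![1usize, 2, 3];
    let input2: LamellarArrayRdmaInput<usize> = TeamFrom::team_from(vals, &world.team());
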
diff --git a/src/array.rs b/src/array.rs
index 08cfda22..745015ec 100644
--- a/src/array.rs
+++ b/src/array.rs
@@ -194,6 +194,9 @@ crate::inventory::collect!(ReduceKey);
 // lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize);
 // lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize);
 
+// lamellar_impl::generate_reductions_for_type_rt!(true, isize);
+// lamellar_impl::generate_ops_for_type_rt!(true, true, true, isize);
+
 // lamellar_impl::generate_reductions_for_type_rt!(false, f32);
 // lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32);
 
@@ -302,7 +305,7 @@ impl<T: Dist> LamellarRead for &[T] {}
 impl<T: Dist> TeamFrom<&T> for LamellarArrayRdmaInput<T> {
     /// Constructs a single element [OneSidedMemoryRegion] and copies `val` into it
     fn team_from(val: &T, team: &Pin<Arc<LamellarTeam>>) -> Self {
-        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region(1);
         unsafe {
             buf.as_mut_slice().expect("Data should exist on PE")[0] = val.clone();
         }
@@ -313,7 +316,7 @@ impl<T: Dist> TeamFrom<T> for LamellarArrayRdmaInput<T> {
     /// Constructs a single element [OneSidedMemoryRegion] and copies `val` into it
     fn team_from(val: T, team: &Pin<Arc<LamellarTeam>>) -> Self {
-        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region(1);
         unsafe {
             buf.as_mut_slice().expect("Data should exist on PE")[0] = val;
         }
@@ -324,7 +327,7 @@ impl<T: Dist> TeamFrom<Vec<T>> for LamellarArrayRdmaInput<T> {
     /// Constructs a [OneSidedMemoryRegion] equal in length to `vals` and copies `vals` into it
     fn team_from(vals: Vec<T>, team: &Pin<Arc<LamellarTeam>>) -> Self {
-        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region_or_panic(vals.len());
+        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region(vals.len());
         unsafe {
             std::ptr::copy_nonoverlapping(
                 vals.as_ptr(),
@@ -338,7 +341,7 @@ impl<T: Dist> TeamFrom<Vec<T>> for LamellarArrayRdmaInput<T> {
 impl<T: Dist> TeamFrom<&Vec<T>> for LamellarArrayRdmaInput<T> {
     /// Constructs a [OneSidedMemoryRegion] equal in length to `vals` and copies `vals` into it
     fn team_from(vals: &Vec<T>, team: &Pin<Arc<LamellarTeam>>) -> Self {
-        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region_or_panic(vals.len());
+        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region(vals.len());
         unsafe {
             std::ptr::copy_nonoverlapping(
                 vals.as_ptr(),
@@ -352,7 +355,7 @@ impl<T: Dist> TeamFrom<&Vec<T>> for LamellarArrayRdmaInput<T> {
 impl<T: Dist> TeamFrom<&[T]> for LamellarArrayRdmaInput<T> {
     /// Constructs a [OneSidedMemoryRegion] equal in length to `vals` and copies `vals` into it
     fn team_from(vals: &[T], team: &Pin<Arc<LamellarTeam>>) -> Self {
-        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region_or_panic(vals.len());
+        let buf: OneSidedMemoryRegion<T> = team.alloc_one_sided_mem_region(vals.len());
         unsafe {
             std::ptr::copy_nonoverlapping(
                 vals.as_ptr(),
@@ -1051,38 +1054,6 @@ pub trait LamellarArray<T: Dist>: private::LamellarArrayPrivate<T> + ActiveMessaging {
     ///```
     fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>>; //todo turn this into Arc
 
-    // #[doc(alias("One-sided", "onesided"))]
-    // /// Return the current PE of the calling thread
-    // ///
-    // /// # One-sided Operation
-    // /// the result is returned only on the calling PE
-    // ///
-    // /// # Examples
-    // ///```
-    // /// use lamellar::array::prelude::*;
-    // /// let world = LamellarWorldBuilder::new().build();
-    // /// let array: LocalLockArray<usize> = LocalLockArray::new(&world,100,Distribution::Cyclic).block();
-    // ///
-    // /// assert_eq!(world.my_pe(),array.my_pe());
-    // ///```
-    // fn my_pe(&self) -> usize;
-
-    // #[doc(alias("One-sided", "onesided"))]
-    // /// Return the number of PEs containing data for this array
-    // ///
-    // /// # One-sided Operation
-    // /// the result is returned only on the calling PE
-    // ///
-    // /// # Examples
-    // ///```
-    // /// use lamellar::array::prelude::*;
-    // /// let world = LamellarWorldBuilder::new().build();
-    // /// let array: LocalLockArray<usize> = LocalLockArray::new(&world,100,Distribution::Cyclic).block();
-    // ///
-    // /// assert_eq!(world.num_pes(),array.num_pes());
-    // ///```
-    // fn num_pes(&self) -> usize;
-
     #[doc(alias("One-sided", "onesided"))]
     /// Return the total number of elements in this array
     ///
@@ -1110,84 +1081,12 @@ pub trait LamellarArray<T: Dist>: private::LamellarArrayPrivate<T> + ActiveMessaging {
     ///```no_run //assert is for 4 PEs
     /// use lamellar::array::prelude::*;
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = ReadOnlyArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     ///
     /// assert_eq!(25,array.num_elems_local());
     ///```
     fn num_elems_local(&self) -> usize;
-    /// Change the distribution this array handle uses to index into the data of the array.
-    ///
-    /// This is a one-sided call and does not redistribute the actual data, it simply changes how the array is indexed for this particular handle.
-    ///
-    /// # Examples
-    ///```
-    /// use lamellar::array::prelude::*;
-    /// let world = LamellarWorldBuilder::new().build();
-    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
-    /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
-    ///```
-    // fn use_distribution(self, distribution: Distribution) -> Self;
-
-    // #[doc(alias = "Collective")]
-    // /// Global synchronization method which blocks calling thread until all PEs in the owning Array data have entered the barrier
-    // ///
-    // /// # Collective Operation
-    // /// Requires all PEs associated with the array to enter the barrier, otherwise deadlock will occur
-    // ///
-    // /// # Examples
-    // ///```
-    // /// use lamellar::array::prelude::*;
-    // /// let world = LamellarWorldBuilder::new().build();
-    // /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Cyclic).block();
-    // ///
-    // /// array.barrier();
-    // ///```
-    // fn barrier(&self);
-
-    // #[doc(alias("One-sided", "onesided"))]
-    // /// blocks calling thread until all remote tasks (e.g. element wise operations)
-    // /// initiated by the calling PE have completed.
-    // ///
-    // /// # One-sided Operation
-    // /// this is not a distributed synchronization primitive (i.e. it has no knowledge of a Remote PEs tasks), the calling thread will only wait for tasks
-    // /// to finish that were initiated by the calling PE itself
-    // ///
-    // /// # Examples
-    // ///```
-    // /// use lamellar::array::prelude::*;
-    // /// let world = LamellarWorldBuilder::new().build();
-    // /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).block();
-    // ///
-    // /// for i in 0..100{
-    // ///     array.add(i,1);
-    // /// }
-    // /// array.wait_all(); //block until the previous add operations have finished
-    // ///```
-    // fn wait_all(&self);
-
-    // #[doc(alias("One-sided", "onesided"))]
-    // /// Run a future to completion on the current thread
-    // ///
-    // /// This function will block the caller until the given future has completed, the future is executed within the Lamellar threadpool
-    // ///
-    // /// Users can await any future, including those returned from lamellar remote operations
-    // ///
-    // /// # One-sided Operation
-    // /// this is not a distributed synchronization primitive and only blocks the calling thread until the given future has completed on the calling PE
-    // ///
-    // /// # Examples
-    // ///```
-    // /// use lamellar::array::prelude::*;
-    // /// let world = LamellarWorldBuilder::new().build();
-    // /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).block();
-    // ///
-    // /// let request = array.fetch_add(10,1000); //fetch index 10 and add 1000 to it
-    // /// let result = array.block_on(request); //block until am has executed
-    // /// // we also could have used world.block_on() or team.block_on()
-    // ///```
-    // fn block_on<F: Future>(&self, f: F) -> F::Output;
 
     #[doc(alias("One-sided", "onesided"))]
     /// Given a global index, calculate the PE and offset on that PE where the element actually resides.
@@ -1307,30 +1206,6 @@ pub trait LamellarArray<T: Dist>: private::LamellarArrayPrivate<T> + ActiveMessaging {
     /// assert_eq!(index , 15);
     ///```
     fn last_global_index_for_pe(&self, pe: usize) -> Option<usize>;
-
-    // /// Returns a distributed iterator for the LamellarArray
-    // /// must be called accross all pes containing data in the array
-    // /// iteration on a pe only occurs on the data which is locally present
-    // /// with all pes iterating concurrently
-    // /// blocking: true
-    // pub fn dist_iter(&self) -> DistIter<'static, T>;
-
-    // /// Returns a distributed iterator for the LamellarArray
-    // /// must be called accross all pes containing data in the array
-    // /// iteration on a pe only occurs on the data which is locally present
-    // /// with all pes iterating concurrently
-    // pub fn dist_iter_mut(&self) -> DistIterMut<'static, T>;
-
-    // /// Returns an iterator for the LamellarArray, all iteration occurs on the PE
-    // /// where this was called, data that is not local to the PE is automatically
-    // /// copied and transferred
-    // pub fn onesided_iter(&self) -> OneSidedIter<'_, T>;
-
-    // /// Returns an iterator for the LamellarArray, all iteration occurs on the PE
-    // /// where this was called, data that is not local to the PE is automatically
-    // /// copied and transferred, array data is buffered to more efficiently make
-    // /// use of network buffers
-    // pub fn buffered_onesided_iter(&self, buf_size: usize) -> OneSidedIter<'_, T>;
 }
 
 /// Sub arrays are contiguous subsets of the elements of an array.
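A recurring doc fix in the array modules below: `use_distribution` consumes the handle and returns the re-indexed array directly, so the old examples' trailing `.block()` was wrong. The corrected pattern from those docs:

    let array = AtomicArray::<usize>::new(&world, 100, Distribution::Cyclic).block();
    // one-sided: changes only how *this* handle indexes the data, no data movement
    let block_view = array.clone().use_distribution(Distribution::Block);
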
diff --git a/src/array/atomic.rs b/src/array/atomic.rs
index 2e6ae755..02ddd895 100644
--- a/src/array/atomic.rs
+++ b/src/array/atomic.rs
@@ -904,9 +904,9 @@ impl<T: Dist> AtomicArray<T> {
     ///```
     /// use lamellar::array::prelude::*;
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
+    /// let block_view = array.clone().use_distribution(Distribution::Block);
     ///```
     pub fn use_distribution(self, distribution: Distribution) -> Self {
         match self {
@@ -928,7 +928,7 @@ impl<T: Dist> AtomicArray<T> {
     /// use lamellar::array::prelude::*;
     /// let world = LamellarWorldBuilder::new().build();
     /// let my_pe = world.my_pe();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     ///
     /// let local_data = array.local_data();
     /// println!("PE{my_pe} local_data[0]: {:?}",local_data.at(0).load());
diff --git a/src/array/atomic/handle.rs b/src/array/atomic/handle.rs
index d6526569..3a2790e9 100644
--- a/src/array/atomic/handle.rs
+++ b/src/array/atomic/handle.rs
@@ -69,7 +69,7 @@ impl<T: Dist> AtomicArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     pub fn block(mut self) -> AtomicArray<T> {
         self.launched = true;
         self.inner.set_launched(true);
@@ -90,7 +90,7 @@ impl<T: Dist> AtomicArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Cyclic).spawn();
+    /// let array_task = AtomicArray::<usize>::new(&world,100,Distribution::Cyclic).spawn();
     /// // do some other work
     /// let array = array_task.block();
     #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
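The handle docs above also fix the example types: `spawn()` yields a task, not an array. The two ways to drive construction, per those docs:

    // block inline on the collective construction...
    let array = AtomicArray::<usize>::new(&world, 100, Distribution::Cyclic).block();
    // ...or spawn it, overlap other setup work, then collect the result
    let array_task = AtomicArray::<usize>::new(&world, 100, Distribution::Cyclic).spawn();
    // do some other work
    let array2 = array_task.block();
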
diff --git a/src/array/generic_atomic/handle.rs b/src/array/generic_atomic/handle.rs
index 8502bdcb..f451d0b6 100644
--- a/src/array/generic_atomic/handle.rs
+++ b/src/array/generic_atomic/handle.rs
@@ -13,21 +13,6 @@ use pin_project::{pin_project, pinned_drop};
 #[must_use = " GenericAtomicArray 'new' handles do nothing unless polled or awaited, or 'spawn()' or 'block()' are called"]
 #[pin_project(PinnedDrop)]
 #[doc(alias = "Collective")]
-/// This is a handle representing the operation of creating a new [GenericAtomicArray].
-/// This handled must either be awaited in an async context or blocked on in a non-async context for the operation to be performed.
-/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the GenericAtomicArray's team, only returning once every PE in the team has completed the call.
-///
-/// # Collective Operation
-/// Requires all PEs associated with the `GenericAtomicArray` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally)
-///
-/// # Examples
-/// ```
-/// use lamellar::array::prelude::*;
-///
-/// let world = LamellarWorldBuilder::new().build();
-///
-/// let array: GenericAtomicArray<usize> = GenericAtomicArray::new(&world,100,Distribution::Cyclic).block();
-/// ```
 pub(crate) struct GenericAtomicArrayHandle<T: Dist> {
     pub(crate) team: Pin<Arc<LamellarTeamRT>>,
     pub(crate) launched: bool,
@@ -45,15 +30,7 @@ impl<T: Dist> PinnedDrop for GenericAtomicArrayHandle<T> {
     }
 }
 
 impl<T: Dist> GenericAtomicArrayHandle<T> {
-    /// Used to drive creation of a new GenericAtomicArray
-    /// # Examples
-    ///
-    ///```
-    /// use lamellar::array::prelude::*;
-    ///
-    /// let world = LamellarWorldBuilder::new().build();
-    /// let array: GenericAtomicArray<usize> = GenericAtomicArray::new(&world,100,Distribution::Cyclic).block();
-    pub fn block(mut self) -> GenericAtomicArray<T> {
+    pub(crate) fn block(mut self) -> GenericAtomicArray<T> {
         self.launched = true;
         RuntimeWarning::BlockingCall(
             "GenericAtomicArrayHandle::block",
@@ -63,20 +40,8 @@
         self.team.clone().block_on(self)
     }
 
-    /// This method will spawn the creation of the GenericAtomicArray on the work queue
-    ///
-    /// This function returns a handle that can be used to wait for the operation to complete
-    ///
-    /// # Examples
-    ///
-    ///```
-    /// use lamellar::array::prelude::*;
-    ///
-    /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: GenericAtomicArray<usize> = GenericAtomicArray::new(&world,100,Distribution::Cyclic).spawn();
-    /// // do some other work
-    /// let array = array_task.block();
     #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
-    pub fn spawn(mut self) -> LamellarTask<GenericAtomicArray<T>> {
+    pub(crate) fn spawn(mut self) -> LamellarTask<GenericAtomicArray<T>> {
         self.launched = true;
         self.team.clone().spawn(self)
     }
diff --git a/src/array/generic_atomic/rdma.rs b/src/array/generic_atomic/rdma.rs
index d4155b4e..1243e754 100644
--- a/src/array/generic_atomic/rdma.rs
+++ b/src/array/generic_atomic/rdma.rs
@@ -24,7 +24,7 @@ impl<T: Dist> LamellarArrayInternalGet<T> for GenericAtomicArray<T> {
         }
     }
     unsafe fn internal_at(&self, index: usize) -> ArrayRdmaAtHandle<T> {
-        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region(1);
         let req = self.exec_am_local(InitGetAm {
             array: self.clone(),
             index: index,
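GenericAtomicArrayHandle (and NativeAtomicArrayHandle further below) lose their public docs because the whole type becomes pub(crate); user code reaches these backends only through AtomicArray, which wraps the native- and generic-atomic variants behind one public type:

    // public entry point; the backend handle types are no longer user-facing
    let array = AtomicArray::<usize>::new(&world, 100, Distribution::Cyclic).block();
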
diff --git a/src/array/global_lock_atomic.rs b/src/array/global_lock_atomic.rs
index 555a1dac..d03208de 100644
--- a/src/array/global_lock_atomic.rs
+++ b/src/array/global_lock_atomic.rs
@@ -358,9 +358,9 @@ impl<T: Dist> GlobalLockArray<T> {
     ///```
     /// use lamellar::array::prelude::*;
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: GlobalLockArray<usize> = GlobalLockArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = GlobalLockArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
+    /// let block_view = array.clone().use_distribution(Distribution::Block);
     ///```
     pub fn use_distribution(self, distribution: Distribution) -> Self {
         GlobalLockArray {
diff --git a/src/array/global_lock_atomic/handle.rs b/src/array/global_lock_atomic/handle.rs
index db471451..573e279e 100644
--- a/src/array/global_lock_atomic/handle.rs
+++ b/src/array/global_lock_atomic/handle.rs
@@ -80,7 +80,7 @@ impl<T: Dist> GlobalLockArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: GlobalLockArray<usize> = GlobalLockArray::new(&world,100,Distribution::Cyclic).spawn();
+    /// let array_task = GlobalLockArray::<usize>::new(&world,100,Distribution::Cyclic).spawn();
     /// // do some other work
     /// let array = array_task.block();
     #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
diff --git a/src/array/global_lock_atomic/rdma.rs b/src/array/global_lock_atomic/rdma.rs
index 6e224233..dddbefcc 100644
--- a/src/array/global_lock_atomic/rdma.rs
+++ b/src/array/global_lock_atomic/rdma.rs
@@ -34,7 +34,7 @@ impl<T: Dist> LamellarArrayInternalGet<T> for GlobalLockArray<T> {
         }
     }
     unsafe fn internal_at(&self, index: usize) -> ArrayRdmaAtHandle<T> {
-        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region(1);
         let req = self.exec_am_local_tg(InitGetAm {
             array: self.clone(),
             index: index,
diff --git a/src/array/iterator/distributed_iterator.rs b/src/array/iterator/distributed_iterator.rs
index 82abb3f7..b14526db 100644
--- a/src/array/iterator/distributed_iterator.rs
+++ b/src/array/iterator/distributed_iterator.rs
@@ -512,12 +512,12 @@ pub trait DistributedIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Block).block();
+    /// let array = ReadOnlyArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// let req = array.dist_iter()
     ///     .map(|elem| *elem) //because of constraints of collect we need to convert from &usize to usize
     ///     .filter(|elem| *elem < 10) // (if we didnt do the previous map we would have needed to do **elem)
-    ///     .collect::<ReadOnlyArray<usize>>(Distribution::Block).block();
+    ///     .collect::<ReadOnlyArray<usize>>(Distribution::Block);
     /// let new_array = array.block_on(req); //wait on the collect request to get the new array
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
@@ -547,12 +547,12 @@ pub trait DistributedIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Block).block();
+    /// let array = ReadOnlyArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// let req = array.dist_iter()
     ///     .map(|elem| *elem) //because of constraints of collect we need to convert from &usize to usize
     ///     .filter(|elem| * elem < 10) // (if we didnt do the previous map we would have needed to do **elem)
-    ///     .collect::<ReadOnlyArray<usize>>(Distribution::Block).block();
+    ///     .collect::<ReadOnlyArray<usize>>(Distribution::Block);
     /// let new_array = array.block_on(req); //wait on the collect request to get the new array
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
@@ -590,7 +590,7 @@ pub trait DistributedIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     /// // initialize a world and an atomic array
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// // clone the array; this doesn't duplicate the underlying
     /// // data but it does create a second pointer that we can
     /// // safely move into the closure we pass to the iterator
     /// let array_clone = array.clone();
     /// let req = array.dist_iter().map(
     ///     move |elem|
     ///     array_clone
     ///         .fetch_add(elem.load(),1000))
-    ///     .collect_async::<ReadOnlyArray<usize>,_>(Distribution::Cyclic).block();
+    ///     .collect_async::<ReadOnlyArray<usize>,_>(Distribution::Cyclic);
     /// let _new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
@@ -638,7 +638,7 @@ pub trait DistributedIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     /// // initialize a world and an atomic array
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// // clone the array; this doesn't duplicate the underlying
     /// // data but it does create a second pointer that we can
     /// // safely move into the closure we pass to the iterator
     /// let array_clone = array.clone();
     /// let req = array.dist_iter().map(
     ///     move |elem|
     ///     array_clone
     ///         .fetch_add(elem.load(),1000))
-    ///     .collect_async_with_schedule::<ReadOnlyArray<usize>,_>(Schedule::Dynamic, Distribution::Cyclic).block();
+    ///     .collect_async_with_schedule::<ReadOnlyArray<usize>,_>(Schedule::Dynamic, Distribution::Cyclic);
     /// let _new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
diff --git a/src/array/iterator/local_iterator.rs b/src/array/iterator/local_iterator.rs
index cfa84852..1cc6194b 100644
--- a/src/array/iterator/local_iterator.rs
+++ b/src/array/iterator/local_iterator.rs
@@ -493,10 +493,10 @@ pub trait LocalIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// let array_clone = array.clone();
-    /// let req = array.local_iter().map(|elem|elem.load()).filter(|elem| elem % 2 == 0).collect::<ReadOnlyArray<usize>>(Distribution::Cyclic).block();
+    /// let req = array.local_iter().map(|elem|elem.load()).filter(|elem| elem % 2 == 0).collect::<ReadOnlyArray<usize>>(Distribution::Cyclic);
     /// let new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
@@ -519,10 +519,10 @@ pub trait LocalIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// let array_clone = array.clone();
-    /// let req = array.local_iter().map(|elem|elem.load()).filter(|elem| elem % 2 == 0).collect_with_schedule::<ReadOnlyArray<usize>>(Schedule::WorkStealing,Distribution::Cyclic).block();
+    /// let req = array.local_iter().map(|elem|elem.load()).filter(|elem| elem % 2 == 0).collect_with_schedule::<ReadOnlyArray<usize>>(Schedule::WorkStealing,Distribution::Cyclic);
     /// let new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
@@ -560,7 +560,7 @@ pub trait LocalIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     /// // initialize a world and an atomic array
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// // clone the array; this doesn't duplicate the underlying
     /// // data but it does create a second pointer that we can
     /// // safely move into the closure we pass to the iterator
     /// let array_clone = array.clone();
     /// let req = array.local_iter().map(
     ///     move |elem|
     ///     array_clone
     ///         .fetch_add(elem.load(),1000))
-    ///     .collect_async::<ReadOnlyArray<usize>,_>(Distribution::Cyclic).block();
+    ///     .collect_async::<ReadOnlyArray<usize>,_>(Distribution::Cyclic);
     /// let _new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
@@ -608,7 +608,7 @@ pub trait LocalIterator: SyncSend + InnerIter + 'static {
     /// use lamellar::array::prelude::*;
     /// // initialize a world and an atomic array
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: AtomicArray<usize> = AtomicArray::new(&world,100,Distribution::Block).block();
+    /// let array = AtomicArray::<usize>::new(&world,100,Distribution::Block).block();
     ///
     /// // clone the array; this doesn't duplicate the underlying
     /// // data but it does create a second pointer that we can
     /// // safely move into the closure we pass to the iterator
     /// let array_clone = array.clone();
     /// let req = array.local_iter().map(
     ///     move |elem|
     ///     array_clone
     ///         .fetch_add(elem.load(),1000))
-    ///     .collect_async_with_schedule::<ReadOnlyArray<usize>,_>(Schedule::Dynamic, Distribution::Cyclic).block();
+    ///     .collect_async_with_schedule::<ReadOnlyArray<usize>,_>(Schedule::Dynamic, Distribution::Cyclic);
     /// let _new_array = array.block_on(req);
     ///```
     #[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
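Both iterator traits' docs drop the stray `.block()` after `collect`: the adapter returns a future that is driven later with `block_on` (or `.await`). The corrected flow from those examples:

    let array = ReadOnlyArray::<usize>::new(&world, 100, Distribution::Block).block();
    let req = array.dist_iter()
        .map(|elem| *elem)       // collect needs owned usize, not &usize
        .filter(|elem| *elem < 10)
        .collect::<ReadOnlyArray<usize>>(Distribution::Block);
    let new_array = array.block_on(req); // drive the collect future to completion
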
diff --git a/src/array/iterator/one_sided_iterator.rs b/src/array/iterator/one_sided_iterator.rs
index 14ada186..264416ac 100644
--- a/src/array/iterator/one_sided_iterator.rs
+++ b/src/array/iterator/one_sided_iterator.rs
@@ -430,7 +430,7 @@ impl<'a, T: Dist + 'static, A: LamellarArrayInternalGet<T>> OneSidedIter<'a, T, A> {
         team: Pin<Arc<LamellarTeamRT>>,
         buf_size: usize,
     ) -> OneSidedIter<'a, T, A> {
-        let buf_0 = team.alloc_one_sided_mem_region_or_panic(buf_size);
+        let buf_0 = team.alloc_one_sided_mem_region(buf_size);
         // potentially unsafe depending on the array type (i.e. UnsafeArray - which requries unsafe to construct an iterator),
         // but safe with respect to the buf_0 as self is the only reference
diff --git a/src/array/iterator/one_sided_iterator/chunks.rs b/src/array/iterator/one_sided_iterator/chunks.rs
index 2beadc80..40f53660 100644
--- a/src/array/iterator/one_sided_iterator/chunks.rs
+++ b/src/array/iterator/one_sided_iterator/chunks.rs
@@ -50,7 +50,7 @@ where
     ) -> (OneSidedMemoryRegion<T>, ArrayRdmaHandle) {
         // println!(" get chunk of len: {:?}", size);
         let mem_region: OneSidedMemoryRegion<T> =
-            array.team_rt().alloc_one_sided_mem_region_or_panic(size);
+            array.team_rt().alloc_one_sided_mem_region(size);
         // potentially unsafe depending on the array type (i.e. UnsafeArray - which requries unsafe to construct an iterator),
         // but safe with respect to the mem_region as this is the only reference
         let mut req = unsafe { array.internal_get(index, &mem_region) };
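Both of these internal buffers now come from the infallible allocator; at the user level they back the one-sided iterators. A sketch of the buffered variant referenced in the (removed) src/array.rs comments above, assuming `buf_size` is the per-fetch element count:

    // iterate the whole array from one PE, fetching 100 elements per network get
    for elem in array.buffered_onesided_iter(100).into_iter() {
        println!("{elem}");
    }
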
diff --git a/src/array/local_lock_atomic.rs b/src/array/local_lock_atomic.rs
index a2d684cc..7f418f52 100644
--- a/src/array/local_lock_atomic.rs
+++ b/src/array/local_lock_atomic.rs
@@ -358,9 +358,9 @@ impl<T: Dist> LocalLockArray<T> {
     ///```
     /// use lamellar::array::prelude::*;
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array: LocalLockArray<usize> = LocalLockArray::new(&world,100,Distribution::Cyclic).block();
+    /// let array = LocalLockArray::<usize>::new(&world,100,Distribution::Cyclic).block();
     /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
+    /// let block_view = array.clone().use_distribution(Distribution::Block);
     ///```
     pub fn use_distribution(self, distribution: Distribution) -> Self {
         LocalLockArray {
diff --git a/src/array/local_lock_atomic/handle.rs b/src/array/local_lock_atomic/handle.rs
index 3122cbbd..582b61d2 100644
--- a/src/array/local_lock_atomic/handle.rs
+++ b/src/array/local_lock_atomic/handle.rs
@@ -78,7 +78,7 @@ impl<T: Dist> LocalLockArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: LocalLockArray<usize> = LocalLockArray::new(&world,100,Distribution::Cyclic).spawn();
+    /// let array_task = LocalLockArray::<usize>::new(&world,100,Distribution::Cyclic).spawn();
     /// // do some other work
     /// let array = array_task.block();
     #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
diff --git a/src/array/local_lock_atomic/rdma.rs b/src/array/local_lock_atomic/rdma.rs
index 319291dd..3b3cf9d1 100644
--- a/src/array/local_lock_atomic/rdma.rs
+++ b/src/array/local_lock_atomic/rdma.rs
@@ -27,7 +27,7 @@ impl<T: Dist> LamellarArrayInternalGet<T> for LocalLockArray<T> {
         }
     }
     unsafe fn internal_at(&self, index: usize) -> ArrayRdmaAtHandle<T> {
-        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region(1);
         let req = self.exec_am_local(InitGetAm {
             array: self.clone(),
             index: index,
-///
-/// # Examples
-/// ```
-/// use lamellar::array::prelude::*;
-///
-/// let world = LamellarWorldBuilder::new().build();
-///
-/// let array: NativeAtomicArray<usize> = NativeAtomicArray::new(&world,100,Distribution::Cyclic).block();
-/// ```
 pub(crate) struct NativeAtomicArrayHandle<T: Dist> {
     pub(crate) team: Pin<Arc<LamellarTeamRT>>,
     pub(crate) launched: bool,
@@ -45,15 +30,7 @@ impl<T: Dist> PinnedDrop for NativeAtomicArrayHandle<T> {
     }
 }
 
 impl<T: Dist> NativeAtomicArrayHandle<T> {
-    /// Used to drive creation of a new NativeAtomicArray
-    /// # Examples
-    ///
-    ///```
-    /// use lamellar::array::prelude::*;
-    ///
-    /// let world = LamellarWorldBuilder::new().build();
-    /// let array: NativeAtomicArray<usize> = NativeAtomicArray::new(&world,100,Distribution::Cyclic).block();
-    pub fn block(mut self) -> NativeAtomicArray<T> {
+    pub(crate) fn block(mut self) -> NativeAtomicArray<T> {
         self.launched = true;
         RuntimeWarning::BlockingCall(
             "NativeAtomicArrayHandle::block",
@@ -63,20 +40,9 @@ impl<T: Dist> NativeAtomicArrayHandle<T> {
         self.team.clone().block_on(self)
     }
 
-    /// This method will spawn the creation of the NativeAtomicArray on the work queue
-    ///
-    /// This function returns a handle that can be used to wait for the operation to complete
-    /// /// # Examples
-    ///
-    ///```
-    /// use lamellar::array::prelude::*;
-    ///
-    /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: NativeAtomicArray<usize> = NativeAtomicArray::new(&world,100,Distribution::Cyclic).spawn();
-    /// // do some other work
-    /// let array = array_task.block();
+
    #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
-    pub fn spawn(mut self) -> LamellarTask<NativeAtomicArray<T>> {
+    pub(crate) fn spawn(mut self) -> LamellarTask<NativeAtomicArray<T>> {
         self.launched = true;
         self.team.clone().spawn(self)
     }
diff --git a/src/array/native_atomic/rdma.rs b/src/array/native_atomic/rdma.rs
index 6d1cdfde..84553794 100644
--- a/src/array/native_atomic/rdma.rs
+++ b/src/array/native_atomic/rdma.rs
@@ -24,7 +24,7 @@ impl<T: Dist> LamellarArrayInternalGet<T> for NativeAtomicArray<T> {
         }
     }
     unsafe fn internal_at(&self, index: usize) -> ArrayRdmaAtHandle<T> {
-        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = self.array.team_rt().alloc_one_sided_mem_region(1);
         let req = self.exec_am_local(InitGetAm {
             array: self.clone(),
             index: index,
diff --git a/src/array/read_only.rs b/src/array/read_only.rs
index 9e359de1..25a90c94 100644
--- a/src/array/read_only.rs
+++ b/src/array/read_only.rs
@@ -103,7 +103,7 @@ impl<T: Dist> ReadOnlyArray<T> {
     /// let world = LamellarWorldBuilder::new().build();
     /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Cyclic).block();
     /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
+    /// let block_view = array.clone().use_distribution(Distribution::Block);
     ///```
     pub fn use_distribution(self, distribution: Distribution) -> Self {
         ReadOnlyArray {
diff --git a/src/array/read_only/handle.rs b/src/array/read_only/handle.rs
index 95691682..ccd17e36 100644
--- a/src/array/read_only/handle.rs
+++ b/src/array/read_only/handle.rs
@@ -72,7 +72,7 @@ impl<T: Dist> ReadOnlyArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Cyclic).spawn();
+    /// let array_task = ReadOnlyArray::<usize>::new(&world,100,Distribution::Cyclic).spawn();
     /// // do some other work
     /// let array = array_task.block();
    #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
diff --git a/src/array/unsafe.rs b/src/array/unsafe.rs
index 7761c9aa..f1c42eba 100644
--- a/src/array/unsafe.rs
+++ b/src/array/unsafe.rs
@@ -282,7 +282,7 @@ impl<T: Dist> UnsafeArray<T> {
     /// let world = LamellarWorldBuilder::new().build();
     /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
     /// // do something interesting... or not
-    /// let block_view = array.clone().use_distribution(Distribution::Block).block();
+    /// let block_view = array.clone().use_distribution(Distribution::Block);
     ///```
     pub fn use_distribution(mut self, distribution: Distribution) -> Self {
         self.inner.distribution = distribution;
diff --git a/src/array/unsafe/handle.rs b/src/array/unsafe/handle.rs
index 58773079..2022821b 100644
--- a/src/array/unsafe/handle.rs
+++ b/src/array/unsafe/handle.rs
@@ -72,7 +72,7 @@ impl<T: Dist> UnsafeArrayHandle<T> {
     /// use lamellar::array::prelude::*;
     ///
     /// let world = LamellarWorldBuilder::new().build();
-    /// let array_task: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).spawn();
+    /// let array_task = UnsafeArray::<usize>::new(&world,100,Distribution::Cyclic).spawn();
     /// // do some other work
     /// let array = array_task.block();
    #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
diff --git a/src/array/unsafe/rdma.rs b/src/array/unsafe/rdma.rs
index 8d62931c..6254bf28 100644
--- a/src/array/unsafe/rdma.rs
+++ b/src/array/unsafe/rdma.rs
@@ -173,7 +173,7 @@ impl<T: Dist> UnsafeArray<T> {
                     .inner
                     .data
                     .team
-                    .alloc_one_sided_mem_region_or_panic::<T>(num_elems_pe);
+                    .alloc_one_sided_mem_region::<T>(num_elems_pe);
                 unsafe {
                     for i in 0..std::cmp::min(buf.len(), num_pes) {
                         let mut k = 0;
@@ -200,7 +200,7 @@ impl<T: Dist> UnsafeArray<T> {
                     .inner
                     .data
                     .team
-                    .alloc_one_sided_mem_region_or_panic::<T>(num_elems_pe);
+                    .alloc_one_sided_mem_region::<T>(num_elems_pe);
                 let mut k = 0;
                 let pe = (start_pe + i) % num_pes;
                 // let offset = global_index / num_pes + overflow;
@@ -248,7 +248,7 @@ impl<T: Dist> UnsafeArray<T> {
                     .inner
                     .data
                     .team
-                    .alloc_one_sided_mem_region_or_panic::<T>(num_elems_pe);
+                    .alloc_one_sided_mem_region::<T>(num_elems_pe);
                 let rem = buf.len() % num_pes;
                 // let temp_buf: LamellarMemoryRegion<T> = buf.team_into(&self.inner.data.team);
                 for i in 0..std::cmp::min(buf.len(), num_pes) {
@@ -290,7 +290,7 @@ impl<T: Dist> UnsafeArray<T> {
                     .inner
                     .data
                     .team
-                    .alloc_one_sided_mem_region_or_panic::<T>(num_elems_pe);
+                    .alloc_one_sided_mem_region::<T>(num_elems_pe);
                 let pe = (start_pe + i) % num_pes;
                 let offset = global_index / num_pes + overflow;
                 let num_elems = (num_elems_pe - 1) + if i < rem { 1 } else { 0 };
@@ -639,7 +639,7 @@ impl<T: Dist> UnsafeArray<T> {
     }
 
     pub(crate) unsafe fn internal_at(&self, index: usize) -> ArrayRdmaAtHandle<T> {
-        let buf: OneSidedMemoryRegion<T> = self.team_rt().alloc_one_sided_mem_region_or_panic(1);
+        let buf: OneSidedMemoryRegion<T> = self.team_rt().alloc_one_sided_mem_region(1);
         self.blocking_get(index, &buf);
         ArrayRdmaAtHandle {
             array: self.as_lamellar_byte_array(),
diff --git a/src/darc.rs b/src/darc.rs
index 797291c3..6e781908 100644
--- a/src/darc.rs
+++ b/src/darc.rs
@@ -1483,7 +1483,7 @@ impl<T: Sync + Send> Darc<T> {
     ///
     /// let five_handle = Darc::new(&world,5);
     /// let five_as_localdarc = world.block_on(async move {
-    ///     let five = five_handle.await;
+    ///     let five = five_handle.await.expect("PE in world team");
     ///     five.into_localrw().await
     /// });
     /// ```
diff --git a/src/lamellar_team.rs b/src/lamellar_team.rs
index e1446d75..db26b706 100644
--- a/src/lamellar_team.rs
+++ b/src/lamellar_team.rs
@@ -7,7 +7,7 @@ use crate::lamellar_arch::{GlobalArch, IdError, LamellarArch, LamellarArchEnum,
 use crate::lamellar_env::LamellarEnv;
 use crate::lamellar_request::*;
 use crate::lamellar_world::LamellarWorld;
-use crate::memregion::handle::SharedMemoryRegionHandle;
+use crate::memregion::handle::{FallibleSharedMemoryRegionHandle, SharedMemoryRegionHandle};
 use crate::memregion::{
     one_sided::OneSidedMemoryRegion, shared::SharedMemoryRegion, Dist, LamellarMemoryRegion,
     MemoryRegion, RemoteMemoryRegion,
@@ -573,7 +574,8 @@ impl ActiveMessaging for Arc<LamellarTeam> {
 impl RemoteMemoryRegion for Arc<LamellarTeam> {
     //#[tracing::instrument(skip_all)]
-    fn alloc_shared_mem_region<T: Dist>(&self, size: usize) -> SharedMemoryRegionHandle<T> {
+
+    fn try_alloc_shared_mem_region<T: Dist>(&self, size: usize) -> FallibleSharedMemoryRegionHandle<T> {
         assert!(self.panic.load(Ordering::SeqCst) == 0);
 
         // self.team.barrier.barrier();
@@ -589,29 +590,53 @@ impl RemoteMemoryRegion for Arc<LamellarTeam> {
         // self.team.barrier.barrier();
         mr
     }
+    fn alloc_shared_mem_region<T: Dist>(&self, size: usize) -> SharedMemoryRegionHandle<T> {
+        assert!(self.panic.load(Ordering::SeqCst) == 0);
+
+        // self.team.barrier.barrier();
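+        // if this team contains every PE in the world we can allocate from the global
+        // (symmetric) heap; otherwise we allocate a sub-region spanning only this team's PEs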
+        let mr = if self.team.num_world_pes == self.team.num_pes {
+            SharedMemoryRegion::new(size, self.team.clone(), AllocationType::Global)
+        } else {
+            SharedMemoryRegion::new(
+                size,
+                self.team.clone(),
+                AllocationType::Sub(self.team.arch.team_iter().collect::<Vec<usize>>()),
+            )
+        };
+        // self.team.barrier.barrier();
+        mr
+    }
+
+    fn try_alloc_one_sided_mem_region<T: Dist>(
+        &self,
+        size: usize,
+    ) -> Result<OneSidedMemoryRegion<T>, anyhow::Error> {
+        assert!(self.panic.load(Ordering::SeqCst) == 0);
+
+        OneSidedMemoryRegion::try_new(size, &self.team, self.team.lamellae.clone())
+    }
 
     //#[tracing::instrument(skip_all)]
     fn alloc_one_sided_mem_region<T: Dist>(
         &self,
         size: usize,
-    ) -> Result<OneSidedMemoryRegion<T>, anyhow::Error> {
+    ) -> OneSidedMemoryRegion<T> {
         assert!(self.panic.load(Ordering::SeqCst) == 0);
-        let lmr = OneSidedMemoryRegion::try_new(size, &self.team, self.team.lamellae.clone());
-        // while let Err(_err) = lmr {
-        //     std::thread::yield_now();
-        //     // println!(
-        //     //     "out of Lamellar mem trying to alloc new pool {:?} {:?}",
-        //     //     size,
-        //     //     std::mem::size_of::<T>()
-        //     // );
-        //     self.team
-        //         .lamellae
-        //         .alloc_pool(size * std::mem::size_of::<T>());
-        //     lmr = OneSidedMemoryRegion::try_new(size, &self.team, self.team.lamellae.clone());
-        // }
-        // lmr.expect("out of memory")
-        lmr
+        let mut lmr = OneSidedMemoryRegion::try_new(size, &self.team, self.team.lamellae.clone());
+        while let Err(_err) = lmr {
+            std::thread::yield_now();
+            // println!(
+            //     "out of Lamellar mem trying to alloc new pool {:?} {:?}",
+            //     size,
+            //     std::mem::size_of::<T>()
+            // );
+            self.team
+                .lamellae
+                .alloc_pool(size * std::mem::size_of::<T>());
+            lmr = OneSidedMemoryRegion::try_new(size, &self.team, self.team.lamellae.clone());
+        }
+        lmr.expect("out of memory")
     }
 }
 
@@ -2189,26 +2214,11 @@ impl LamellarTeamRT {
     /// * `size` - number of elements of T to allocate a memory region for -- (not size in bytes)
     ///
     //#[tracing::instrument(skip_all)]
-    pub fn alloc_one_sided_mem_region<T: Dist>(
+    pub fn try_alloc_one_sided_mem_region<T: Dist>(
         self: &Pin<Arc<LamellarTeamRT>>,
         size: usize,
     ) -> Result<OneSidedMemoryRegion<T>, anyhow::Error> {
-        // let lmr: OneSidedMemoryRegion<T> =
-        //     OneSidedMemoryRegion::new(size, self, self.lamellae.clone()).into();
-        // lmr
-        let lmr = OneSidedMemoryRegion::try_new(size, self, self.lamellae.clone());
-        // while let Err(_err) = lmr {
-        //     std::thread::yield_now();
-        //     // println!(
-        //     //     "out of Lamellar mem trying to alloc new pool {:?} {:?}",
-        //     //     size,
-        //     //     std::mem::size_of::<T>()
-        //     // );
-        //     self.lamellae.alloc_pool(size * std::mem::size_of::<T>());
-        //     lmr = OneSidedMemoryRegion::try_new(size, self, self.lamellae.clone());
-        // }
-        // lmr.expect("out of memory")
-        lmr
+        OneSidedMemoryRegion::try_new(size, self, self.lamellae.clone())
     }
 
     /// allocate a local memory region from the asymmetric heap
@@ -2218,13 +2228,10 @@ impl LamellarTeamRT {
     /// * `size` - number of elements of T to allocate a memory region for -- (not size in bytes)
     ///
    //#[tracing::instrument(skip_all)]
-    pub(crate) fn alloc_one_sided_mem_region_or_panic<T: Dist>(
+    pub(crate) fn alloc_one_sided_mem_region<T: Dist>(
         self: &Pin<Arc<LamellarTeamRT>>,
         size: usize,
     ) -> OneSidedMemoryRegion<T>{
-        // let lmr: OneSidedMemoryRegion<T> =
-        //     OneSidedMemoryRegion::new(size, self, self.lamellae.clone()).into();
-        // lmr
         let mut lmr = OneSidedMemoryRegion::try_new(size, self, self.lamellae.clone());
         while let Err(_err) = lmr {
             std::thread::yield_now();
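+            // allocation failed: grow the lamellar heap by at least
+            // `size * std::mem::size_of::<T>()` bytes and retry, mirroring the
+            // retry loop in the `Arc<LamellarTeam>` implementation above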
diff --git a/src/lamellar_world.rs b/src/lamellar_world.rs
index c9aea9ff..81c2866b 100644
--- a/src/lamellar_world.rs
+++ b/src/lamellar_world.rs
@@ -3,7 +3,7 @@ use crate::lamellae::{create_lamellae, Backend, Lamellae, LamellaeComm, Lamellae
 use crate::lamellar_arch::LamellarArch;
 use crate::lamellar_env::LamellarEnv;
 use crate::lamellar_team::{LamellarTeam, LamellarTeamRT};
-use crate::memregion::handle::SharedMemoryRegionHandle;
+use crate::memregion::handle::{FallibleSharedMemoryRegionHandle, SharedMemoryRegionHandle};
 use crate::memregion::{
     one_sided::OneSidedMemoryRegion, Dist, RemoteMemoryRegion,
 };
@@ -133,17 +133,29 @@ impl ActiveMessaging for LamellarWorld {
 }
 
 impl RemoteMemoryRegion for LamellarWorld {
+    //#[tracing::instrument(skip_all)]
+    fn try_alloc_shared_mem_region<T: Dist>(&self, size: usize) -> FallibleSharedMemoryRegionHandle<T> {
+        self.team.try_alloc_shared_mem_region::<T>(size)
+    }
+
     //#[tracing::instrument(skip_all)]
     fn alloc_shared_mem_region<T: Dist>(&self, size: usize) -> SharedMemoryRegionHandle<T> {
-        self.barrier();
         self.team.alloc_shared_mem_region::<T>(size)
     }
 
     //#[tracing::instrument(skip_all)]
-    fn alloc_one_sided_mem_region<T: Dist>(
+    fn try_alloc_one_sided_mem_region<T: Dist>(
         &self,
         size: usize,
     ) -> Result<OneSidedMemoryRegion<T>, anyhow::Error> {
+        self.team.try_alloc_one_sided_mem_region::<T>(size)
+    }
+
+    //#[tracing::instrument(skip_all)]
+    fn alloc_one_sided_mem_region<T: Dist>(
+        &self,
+        size: usize,
+    ) -> OneSidedMemoryRegion<T> {
         self.team.alloc_one_sided_mem_region::<T>(size)
     }
 }
diff --git a/src/memregion.rs b/src/memregion.rs
index 05d47700..5851ed23 100644
--- a/src/memregion.rs
+++ b/src/memregion.rs
@@ -28,7 +28,7 @@ pub(crate) mod one_sided;
 pub use one_sided::OneSidedMemoryRegion;
 
 pub(crate) mod handle;
-use handle::SharedMemoryRegionHandle;
+use handle::{FallibleSharedMemoryRegionHandle, SharedMemoryRegionHandle};
 
 use enum_dispatch::enum_dispatch;
 
@@ -287,7 +287,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// assert_eq!(mem_region.len(),1000);
     ///```
     fn len(&self) -> usize;
@@ -313,7 +313,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// let slice = unsafe{mem_region.as_slice().expect("PE is part of the world team")};
     ///```
     unsafe fn as_slice(&self) -> MemResult<&[T]>;
@@ -335,7 +335,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// let val = unsafe{mem_region.at(999).expect("PE is part of the world team")};
     ///```
     unsafe fn at(&self, index: usize) -> MemResult<&T>;
@@ -357,7 +357,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// let slice =unsafe { mem_region.as_mut_slice().expect("PE is part of the world team")};
     ///```
     unsafe fn as_mut_slice(&self) -> MemResult<&mut [T]>;
@@ -379,7 +379,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// let ptr = unsafe { mem_region.as_ptr().expect("PE is part of the world team")};
     ///```
     unsafe fn as_ptr(&self) -> MemResult<*const T>;
@@ -401,7 +401,7 @@ pub trait RegisteredMemoryRegion<T: Dist> {
     ///
     /// let world = LamellarWorldBuilder::new().build();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
     /// let ptr = unsafe { mem_region.as_mut_ptr().expect("PE is part of the world team")};
     ///```
     unsafe fn as_mut_ptr(&self) -> MemResult<*mut T>;
@@ -440,7 +440,7 @@ pub trait SubRegion<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(100);
+    /// let mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(100).block();
     ///
     /// let sub_region = mem_region.sub_region(30..70);
     ///```
@@ -476,8 +476,8 @@ pub trait MemoryRegionRDMA<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10);
-    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10);
+    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10).block();
+    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10).block();
     /// unsafe{ for elem in dst_mem_region.as_mut_slice().expect("PE in world team") {*elem = num_pes;}}
     /// unsafe{ for elem in src_mem_region.as_mut_slice().expect("PE in world team") {*elem = my_pe;}}
     ///
@@ -516,8 +516,8 @@ pub trait MemoryRegionRDMA<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10);
-    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10);
+    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10).block();
+    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10).block();
     /// unsafe{ for elem in dst_mem_region.as_mut_slice().expect("PE in world team") {*elem = num_pes;}}
     /// unsafe{ for elem in src_mem_region.as_mut_slice().expect("PE in world team") {*elem = my_pe;}}
     ///
@@ -559,8 +559,8 @@ pub trait MemoryRegionRDMA<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10);
-    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10);
+    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10).block();
+    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10).block();
     /// unsafe{ for elem in dst_mem_region.as_mut_slice().expect("PE in world team") {*elem = num_pes;}}
     /// unsafe{ for elem in src_mem_region.as_mut_slice().expect("PE in world team") {*elem = my_pe;}}
     ///
@@ -599,8 +599,8 @@ pub trait MemoryRegionRDMA<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10);
-    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10);
+    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10).block();
+    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10).block();
     ///
     /// unsafe{ for elem in src_mem_region.as_mut_slice().expect("PE in world team") {*elem = my_pe;}}
     /// unsafe{ for elem in dst_mem_region.as_mut_slice().expect("PE in world team") {*elem = num_pes;}}
@@ -647,8 +647,8 @@ pub trait MemoryRegionRDMA<T: Dist> {
     /// let my_pe = world.my_pe();
     /// let num_pes = world.num_pes();
     ///
-    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10);
-    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10);
+    /// let src_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(10).block();
+    /// let dst_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(num_pes*10).block();
     ///
     /// unsafe{ for elem in src_mem_region.as_mut_slice().expect("PE in world team") {*elem = my_pe;}}
     /// unsafe{ for elem in dst_mem_region.as_mut_slice().expect("PE in world team") {*elem = num_pes;}}
@@ -1214,6 +1214,14 @@ pub trait RemoteMemoryRegion {
     #[doc(alias = "Collective")]
     /// Allocate a shared memory region from the asymmetric heap.
     /// There will be `size` number of `T` elements on each PE.
+    ///
+    /// Note: if there is not enough memory in the lamellar heap on the calling PE,
+    /// this call will trigger a "heap grow" operation (initiated and handled by the runtime).
+    /// This behavior can be disabled by setting the env variable `LAMELLAR_HEAP_MODE=static`,
+    /// in which case this call will panic if there is not enough memory.
+    ///
+    /// Alternatively, you can use the `try_alloc_shared_mem_region` method, which returns
+    /// a `Result` and lets you handle the out-of-memory case yourself.
     ///
     /// # Collective Operation
     /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
     ///
@@ -1223,11 +1231,32 @@ pub trait RemoteMemoryRegion {
         size: usize,
     ) -> SharedMemoryRegionHandle<T>;
 
+    #[doc(alias = "Collective")]
+    /// Allocate a shared memory region from the asymmetric heap.
+    /// There will be `size` number of `T` elements on each PE.
+    ///
+    /// # Collective Operation
+    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
+    ///
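+    /// # Examples
+    /// A minimal sketch of handling the failure case explicitly; the `usize` element
+    /// type and region size here are illustrative:
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// match world.try_alloc_shared_mem_region::<usize>(100).block() {
+    ///     Ok(_mem_region) => { /* safe to use the region */ }
+    ///     Err(e) => println!("shared region allocation failed: {e:?}"),
+    /// }
+    ///```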
+    fn try_alloc_shared_mem_region<T: Dist>(
+        &self,
+        size: usize,
+    ) -> FallibleSharedMemoryRegionHandle<T>;
+
 
     #[doc(alias("One-sided", "onesided"))]
     /// Allocate a one-sided memory region from the internal lamellar heap.
     /// This region only exists on the calling PE, but the returned handle can be
     /// sent to other PEs allowing remote access to the region.
     /// There will be `size` number of `T` elements on the calling PE.
+    ///
+    /// Note: if there is not enough memory in the lamellar heap on the calling PE,
+    /// this call will trigger a "heap grow" operation (initiated and handled by the runtime).
+    /// This behavior can be disabled by setting the env variable `LAMELLAR_HEAP_MODE=static`,
+    /// in which case this call will panic if there is not enough memory.
+    ///
+    /// Alternatively, you can use the `try_alloc_one_sided_mem_region` method, which returns
+    /// a `Result` and lets you handle the out-of-memory case yourself.
     ///
     /// # One-sided Operation
     /// the calling PE will allocate the memory region locally, without intervention from the other PEs.
     ///
@@ -1235,6 +1264,20 @@ pub trait RemoteMemoryRegion {
     fn alloc_one_sided_mem_region<T: Dist>(
         &self,
         size: usize,
+    ) -> OneSidedMemoryRegion<T>;
+
+    #[doc(alias("One-sided", "onesided"))]
+    /// Allocate a one-sided memory region from the internal lamellar heap.
+    /// This region only exists on the calling PE, but the returned handle can be
+    /// sent to other PEs allowing remote access to the region.
+    /// There will be `size` number of `T` elements on the calling PE.
+    ///
+    /// # One-sided Operation
+    /// the calling PE will allocate the memory region locally, without intervention from the other PEs.
+    ///
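+    /// # Examples
+    /// A minimal sketch of falling back gracefully when the allocation fails; the
+    /// `usize` element type and region size here are illustrative:
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// if let Ok(mem_region) = world.try_alloc_one_sided_mem_region::<usize>(100) {
+    ///     assert_eq!(mem_region.len(), 100);
+    /// }
+    ///```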
+    fn try_alloc_one_sided_mem_region<T: Dist>(
+        &self,
+        size: usize,
     ) -> Result<OneSidedMemoryRegion<T>, anyhow::Error>;
 }
diff --git a/src/memregion/handle.rs b/src/memregion/handle.rs
new file mode 100644
index 00000000..4e7f4626
--- /dev/null
+++ b/src/memregion/handle.rs
@@ -0,0 +1,257 @@
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use super::SharedMemoryRegion;
+use crate::scheduler::LamellarTask;
+use crate::warnings::RuntimeWarning;
+use crate::{Dist, LamellarTeamRT};
+
+use futures_util::Future;
+use pin_project::{pin_project, pinned_drop};
+
+#[must_use = " SharedMemoryRegion 'new' handles do nothing unless polled or awaited, or 'spawn()' or 'block()' are called"]
+#[pin_project(PinnedDrop)]
+#[doc(alias = "Collective")]
+/// This is a handle representing the operation of creating a new [SharedMemoryRegion].
+/// This handle must either be awaited in an async context or blocked on in a non-async context for the operation to be performed.
+/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the SharedMemoryRegion's team, only returning once every PE in the team has completed the call.
+///
+/// # Collective Operation
+/// Requires all PEs associated with the `SharedMemoryRegion` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally)
+///
+/// # Examples
+/// ```
+/// use lamellar::memregion::prelude::*;
+///
+/// let world = LamellarWorldBuilder::new().build();
+///
+/// let memregion: SharedMemoryRegion<usize> = world.try_alloc_shared_mem_region(100).block().expect("enough memory should be available");
+/// ```
+pub struct FallibleSharedMemoryRegionHandle<T: Dist> {
+    pub(crate) team: Pin<Arc<LamellarTeamRT>>,
+    pub(crate) launched: bool,
+    #[pin]
+    pub(crate) creation_future:
+        Pin<Box<dyn Future<Output = Result<SharedMemoryRegion<T>, anyhow::Error>> + Send>>,
+}
+
+#[pinned_drop]
+impl<T: Dist> PinnedDrop for FallibleSharedMemoryRegionHandle<T> {
+    fn drop(self: Pin<&mut Self>) {
+        if !self.launched {
+            RuntimeWarning::DroppedHandle("a FallibleSharedMemoryRegionHandle").print();
+        }
+    }
+}
+
+impl<T: Dist> FallibleSharedMemoryRegionHandle<T> {
+    /// Used to drive creation of a new SharedMemoryRegion
+    /// # Examples
+    ///
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// let memregion: SharedMemoryRegion<usize> = world.try_alloc_shared_mem_region(100).block().expect("enough memory should be available");
+    ///```
+    pub fn block(mut self) -> Result<SharedMemoryRegion<T>, anyhow::Error> {
+        self.launched = true;
+        RuntimeWarning::BlockingCall(
+            "FallibleSharedMemoryRegionHandle::block",
+            ".spawn() or .await",
+        )
+        .print();
+        self.team.clone().block_on(self)
+    }
+
+    /// This method will spawn the creation of the SharedMemoryRegion on the work queue
+    ///
+    /// This function returns a handle that can be used to wait for the operation to complete
+    ///
+    /// # Examples
+    ///
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// let memregion_task = world.try_alloc_shared_mem_region::<usize>(100).spawn();
+    /// // do some other work
+    /// let memregion = memregion_task.block().expect("enough memory should be available");
+    ///```
+    #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
+    pub fn spawn(mut self) -> LamellarTask<Result<SharedMemoryRegion<T>, anyhow::Error>> {
+        self.launched = true;
+        self.team.clone().spawn(self)
+    }
+}
+
+impl<T: Dist> Future for FallibleSharedMemoryRegionHandle<T> {
+    type Output = Result<SharedMemoryRegion<T>, anyhow::Error>;
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.launched = true;
+        let mut this = self.project();
+        this.creation_future.as_mut().poll(cx)
+    }
+}
+
+#[must_use = " SharedMemoryRegion 'new' handles do nothing unless polled or awaited, or 'spawn()' or 'block()' are called"]
+#[pin_project(PinnedDrop)]
+#[doc(alias = "Collective")]
+/// This is a handle representing the operation of creating a new [SharedMemoryRegion].
+/// This handle must either be awaited in an async context or blocked on in a non-async context for the operation to be performed.
+/// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the SharedMemoryRegion's team, only returning once every PE in the team has completed the call.
+///
+/// # Collective Operation
+/// Requires all PEs associated with the `SharedMemoryRegion` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally)
+///
+/// # Examples
+/// ```
+/// use lamellar::memregion::prelude::*;
+///
+/// let world = LamellarWorldBuilder::new().build();
+///
+/// let memregion: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(100).block();
+/// ```
+pub struct SharedMemoryRegionHandle<T: Dist> {
+    pub(crate) team: Pin<Arc<LamellarTeamRT>>,
+    pub(crate) launched: bool,
+    #[pin]
+    pub(crate) creation_future:
+        Pin<Box<dyn Future<Output = SharedMemoryRegion<T>> + Send>>,
+}
+
+#[pinned_drop]
+impl<T: Dist> PinnedDrop for SharedMemoryRegionHandle<T> {
+    fn drop(self: Pin<&mut Self>) {
+        if !self.launched {
+            RuntimeWarning::DroppedHandle("a SharedMemoryRegionHandle").print();
+        }
+    }
+}
+
+impl<T: Dist> SharedMemoryRegionHandle<T> {
+    /// Used to drive creation of a new SharedMemoryRegion
+    /// # Examples
+    ///
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// let memregion: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(100).block();
+    ///```
+    pub fn block(mut self) -> SharedMemoryRegion<T> {
+        self.launched = true;
+        RuntimeWarning::BlockingCall(
+            "SharedMemoryRegionHandle::block",
+            ".spawn() or .await",
+        )
+        .print();
+        self.team.clone().block_on(self)
+    }
+
+    /// This method will spawn the creation of the SharedMemoryRegion on the work queue
+    ///
+    /// This function returns a handle that can be used to wait for the operation to complete
+    ///
+    /// # Examples
+    ///
+    ///```
+    /// use lamellar::memregion::prelude::*;
+    ///
+    /// let world = LamellarWorldBuilder::new().build();
+    /// let memregion_task = world.alloc_shared_mem_region::<usize>(100).spawn();
+    /// // do some other work
+    /// let memregion = memregion_task.block();
+    ///```
+    #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
+    pub fn spawn(mut self) -> LamellarTask<SharedMemoryRegion<T>> {
+        self.launched = true;
+        self.team.clone().spawn(self)
+    }
+}
+
+impl<T: Dist> Future for SharedMemoryRegionHandle<T> {
+    type Output = SharedMemoryRegion<T>;
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.launched = true;
+        let mut this = self.project();
+        this.creation_future.as_mut().poll(cx)
+    }
+}
+
+// #[must_use = " OneSidedMemoryRegion 'new' handles do nothing unless polled or awaited, or 'spawn()' or 'block()' are called"]
+// #[pin_project(PinnedDrop)]
+// #[doc(alias = "Collective")]
+// /// This is a handle representing the operation of creating a new [OneSidedMemoryRegion].
+// /// This handle must either be awaited in an async context or blocked on in a non-async context for the operation to be performed.
+// /// Awaiting/blocking on the handle is a blocking collective call amongst all PEs in the OneSidedMemoryRegion's team, only returning once every PE in the team has completed the call.
+// ///
+// /// # Collective Operation
+// /// Requires all PEs associated with the `OneSidedMemoryRegion` to await/block the handle otherwise deadlock will occur (i.e. team barriers are being called internally)
+// ///
+// /// # Examples
+// /// ```
+// /// use lamellar::array::prelude::*;
+// ///
+// /// let world = LamellarWorldBuilder::new().build();
+// ///
+// /// let array: OneSidedMemoryRegion<usize> = OneSidedMemoryRegion::new(&world,100).block();
+// /// ```
+// pub(crate) struct OneSidedMemoryRegionHandle<T: Dist> {
+//     pub(crate) team: Pin<Arc<LamellarTeamRT>>,
+//     pub(crate) launched: bool,
+//     #[pin]
+//     pub(crate) creation_future:
+//         Pin<Box<dyn Future<Output = Result<OneSidedMemoryRegion<T>, anyhow::Error>> + Send>>,
// }
+
+// #[pinned_drop]
+// impl<T: Dist> PinnedDrop for OneSidedMemoryRegionHandle<T> {
+//     fn drop(self: Pin<&mut Self>) {
+//         if !self.launched {
+//             RuntimeWarning::DroppedHandle("a OneSidedMemoryRegionHandle").print();
+//         }
+//     }
+// }
+
+// impl<T: Dist> OneSidedMemoryRegionHandle<T> {
+//     /// Used to drive creation of a new OneSidedMemoryRegion
+//     /// # Examples
+//     ///
+//     ///```
+//     /// use lamellar::array::prelude::*;
+//     ///
+//     /// let world = LamellarWorldBuilder::new().build();
+//     /// let array: OneSidedMemoryRegion<usize> = OneSidedMemoryRegion::new(&world,100).block();
+//     pub fn block(mut self) -> Result<OneSidedMemoryRegion<T>, anyhow::Error> {
+//         self.launched = true;
+//         RuntimeWarning::BlockingCall(
+//             "OneSidedMemoryRegionHandle::block",
+//             ".spawn() or .await",
+//         )
+//         .print();
+//         self.team.clone().block_on(self)
+//     }
+
+//     /// This method will spawn the creation of the OneSidedMemoryRegion on the work queue
+//     ///
+//     /// This function returns a handle that can be used to wait for the operation to complete
+//     ///
+//     /// # Examples
+//     ///
+//     ///```
+//     /// use lamellar::array::prelude::*;
+//     ///
+//     /// let world = LamellarWorldBuilder::new().build();
+//     /// let array_task: OneSidedMemoryRegionHandle<usize> = OneSidedMemoryRegion::new(&world,100).spawn();
+//     /// // do some other work
+//     /// let array = array_task.block();
+//     #[must_use = "this function returns a future [LamellarTask] used to poll for completion. Call '.await' on the returned future in an async context or '.block()' in a non async context. Alternatively it may be acceptable to call '.block()' instead of 'spawn()' on this handle"]
+//     pub fn spawn(mut self) -> LamellarTask<Result<OneSidedMemoryRegion<T>, anyhow::Error>> {
+//         self.launched = true;
+//         self.team.clone().spawn(self)
+//     }
+// }

+// impl<T: Dist> Future for OneSidedMemoryRegionHandle<T> {
+//     type Output = Result<OneSidedMemoryRegion<T>, anyhow::Error>;
+//     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+//         self.launched = true;
+//         let mut this = self.project();
+//         this.creation_future.as_mut().poll(cx)
+//     }
+// }
diff --git a/src/memregion/shared.rs b/src/memregion/shared.rs
index da2662ff..f43fd2d9 100644
--- a/src/memregion/shared.rs
+++ b/src/memregion/shared.rs
@@ -36,7 +36,7 @@ use std::ops::Bound;
 ///
 /// let world = LamellarWorldBuilder::new().build();
 ///
-/// let world_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000);
+/// let world_mem_region: SharedMemoryRegion<usize> = world.alloc_shared_mem_region(1000).block();
 /// ```
 #[derive(serde::Serialize, serde::Deserialize, Clone)]
 pub struct SharedMemoryRegion<T: Dist> {
@@ -93,22 +93,52 @@ impl<T: Dist> SharedMemoryRegion<T> {
     //     SharedMemoryRegion::try_new(size, team, alloc).expect("Out of memory")
     // }
 
-    pub(crate) fn try_new(
+    pub(crate) fn new(
         size: usize,
         team: Pin<Arc<LamellarTeamRT>>,
         alloc: AllocationType,
     ) -> SharedMemoryRegionHandle<T> {
         // println!("creating new shared mem region {:?} {:?}",size,alloc);
-        // Ok(SharedMemoryRegion {
-        //     mr: Darc::try_new(team.clone(), mr, crate::darc::DarcMode::Darc)
-        //         .expect("memregions can only be created on a member of the team"),
-        //     sub_region_offset: 0,
-        //     sub_region_size: size,
-        //     phantom: PhantomData,
-        // })
-        SharedMemoryRegionHandle {
+
+        SharedMemoryRegionHandle {
+            team: team.clone(),
+            launched: false,
+            creation_future: Box::pin(async move {
+                team.async_barrier().await;
+                let mut mr_t =
+                    MemoryRegion::<T>::try_new(size, team.lamellae.clone(), alloc.clone());
+                while let Err(_e) = mr_t {
+                    async_std::task::yield_now().await;
+                    team.lamellae.alloc_pool(size * std::mem::size_of::<T>());
+                    mr_t = MemoryRegion::try_new(size, team.lamellae.clone(), alloc.clone());
+                }
+
+                let mr = unsafe { mr_t.expect("enough memory should have been allocated").to_base::<u8>() };
+                SharedMemoryRegion {
+                    mr: Darc::async_try_new_with_drop(
+                        team.clone(),
+                        mr,
+                        crate::darc::DarcMode::Darc,
+                        None,
+                    )
+                    .await
+                    .expect("memregions can only be created on a member of the team"),
+                    sub_region_offset: 0,
+                    sub_region_size: size,
+                    phantom: PhantomData,
+                }
+            }),
+        }
+    }
+
+    pub(crate) fn try_new(
+        size: usize,
+        team: Pin<Arc<LamellarTeamRT>>,
+        alloc: AllocationType,
+    ) -> FallibleSharedMemoryRegionHandle<T> {
+        // println!("creating new shared mem region {:?} {:?}",size,alloc);
+
+        FallibleSharedMemoryRegionHandle {
             team: team.clone(),
             launched: false,
             creation_future: Box::pin(async move {
diff --git a/tests/array/arithmetic_ops/add_test.rs b/tests/array/arithmetic_ops/add_test.rs
index 4312e379..250c9e45 100644
--- a/tests/array/arithmetic_ops/add_test.rs
+++ b/tests/array/arithmetic_ops/add_test.rs
@@ -426,7 +426,7 @@ macro_rules! input_test {
             // LMR------------------------------
             unsafe {
-                let lmr = world.alloc_one_sided_mem_region(array.len()).unwrap();
+                let lmr = world.alloc_one_sided_mem_region(array.len());
                 let slice = lmr.as_mut_slice().unwrap();
                 for i in 0..array.len() {
                     slice[i] = i;
@@ -438,7 +438,7 @@ macro_rules! input_test {
             // SMR------------------------------
             unsafe {
-                let smr = world.alloc_shared_mem_region(array.len()).block().unwrap();
+                let smr = world.alloc_shared_mem_region(array.len()).block();
                 let slice = smr.as_mut_slice().unwrap();
                 for i in 0..array.len() {
diff --git a/tests/array/arithmetic_ops/fetch_add_test.rs b/tests/array/arithmetic_ops/fetch_add_test.rs
index 8a5c80cd..fbded8e5 100644
--- a/tests/array/arithmetic_ops/fetch_add_test.rs
+++ b/tests/array/arithmetic_ops/fetch_add_test.rs
@@ -524,7 +524,7 @@ macro_rules! input_test{
         // scoped &LMR------------------------------
         let mut reqs = vec![];
         unsafe {
-            let lmr=world.alloc_one_sided_mem_region(array.len()).unwrap();
+            let lmr=world.alloc_one_sided_mem_region(array.len());
             let slice = lmr.as_mut_slice().unwrap();
             for i in 0..array.len(){
                 slice[i]=i;
@@ -536,7 +536,7 @@ macro_rules! input_test{
         // scoped SMR------------------------------
         let mut reqs = vec![];
         unsafe {
-            let smr=world.alloc_shared_mem_region(array.len()).block().unwrap();
+            let smr=world.alloc_shared_mem_region(array.len()).block();
             let slice = smr.as_mut_slice().unwrap();
             for i in 0..array.len(){
                 slice[i]=i;
diff --git a/tests/array/rdma/blocking_get_test.rs b/tests/array/rdma/blocking_get_test.rs
index 9fc83a93..fe34f4fd 100644
--- a/tests/array/rdma/blocking_get_test.rs
+++ b/tests/array/rdma/blocking_get_test.rs
@@ -127,7 +127,7 @@ macro_rules! blocking_get_test{
             #[allow(unused_mut)]
             let mut array: $array::<$t> = $array::<$t>::new(world.team(), array_total_len, $dist).block().into(); //convert into abstract LamellarArray, distributed len is total_len
-            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().unwrap().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
+            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
             //initialize array
             initialize_array!($array, array, $t);
             array.wait_all();
diff --git a/tests/array/rdma/get_test.rs b/tests/array/rdma/get_test.rs
index d3608ec0..261d53f2 100644
--- a/tests/array/rdma/get_test.rs
+++ b/tests/array/rdma/get_test.rs
@@ -128,7 +128,7 @@ macro_rules! get_test{
             let mut array: $array::<$t> = $array::<$t>::new(world.team(), array_total_len, $dist).block().into(); //convert into abstract LamellarArray, distributed len is total_len
             // println!("bout to initialize");
             initialize_array!($array, array, $t);
-            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().unwrap().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
+            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
             //initialize array
             array.wait_all();
diff --git a/tests/array/rdma/put_test.rs b/tests/array/rdma/put_test.rs
index 3aefb3bd..335280af 100644
--- a/tests/array/rdma/put_test.rs
+++ b/tests/array/rdma/put_test.rs
@@ -64,7 +64,7 @@ macro_rules! put_test{
             let mut success = true;
             let array: $array::<$t> = $array::<$t>::new(world.team(), array_total_len, $dist).block().into(); //convert into abstract LamellarArray, distributed len is total_len
-            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().unwrap().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
+            let shared_mem_region: LamellarMemoryRegion<$t> = world.alloc_shared_mem_region(mem_seg_len).block().into(); //Convert into abstract LamellarMemoryRegion, each local segment is total_len
             //initialize array
             let init_val = my_pe as $t;
             initialize_array!($array, array, init_val);