Updated local chunks to utilize handles instead of the blocking API
rdfriese committed Oct 17, 2024
1 parent fa0c64f commit 6ddccb2
Showing 14 changed files with 530 additions and 344 deletions.
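For context: across the diffs below, methods that previously blocked internally (e.g. blocking_into_localrw, blocking_into_darc) are replaced by methods that return a handle, which the caller resolves explicitly, either by calling .block() on the handle or by passing it to world.block_on(...). A minimal sketch of the before/after pattern, using only calls that appear in these diffs (variable names are illustrative, and the snippet assumes the example's usual lamellar prelude and world setup):

// before: dedicated blocking methods
let ro_darc = global_darc.blocking_into_localrw().blocking_into_darc();

// after: each call returns a handle that is resolved explicitly
let ro_darc = global_darc.into_localrw().block().into_darc().block();
let local_data = world.block_on(array.collective_write_local_data());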
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -366,9 +366,9 @@ path="examples/array_examples/global_lock_array.rs"
name="histo"
path="examples/array_examples/histo.rs"

[[example]]
name="single_pe_array"
path="examples/array_examples/single_pe_array.rs"
#[[example]]
#name="single_pe_array"
#path="examples/array_examples/single_pe_array.rs"

##------------ RDMA Examples -----------------##
[[example]]
14 changes: 7 additions & 7 deletions examples/array_examples/global_lock_array.rs
@@ -11,7 +11,7 @@ fn main() {
let s = Instant::now();
let local_data = array.read_local_data().block();
println!(
"PE{my_pe} time: {:?} {:?}",
"0. PE{my_pe} time: {:?} {:?}",
s.elapsed().as_secs_f64(),
local_data
);
@@ -21,31 +21,31 @@ fn main() {
world.barrier();
let mut local_data = array.write_local_data().block();
println!(
"PE{my_pe} time: {:?} got write lock",
"1. PE{my_pe} time: {:?} got write lock",
s.elapsed().as_secs_f64()
);
local_data.iter_mut().for_each(|elem| *elem = my_pe);
std::thread::sleep(Duration::from_secs(1));
drop(local_data);

array.print();
println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());
println!("2 .PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());

let mut local_data = array.collective_write_local_data().block();
let mut local_data = world.block_on(array.collective_write_local_data());
println!(
"PE{my_pe} time: {:?} got collective write lock",
"3. PE{my_pe} time: {:?} got collective write lock",
s.elapsed().as_secs_f64()
);
local_data.iter_mut().for_each(|elem| *elem += my_pe);
std::thread::sleep(Duration::from_secs(1));
drop(local_data);
println!(
"PE{my_pe} time: {:?} dropped collective write lock",
"4. PE{my_pe} time: {:?} dropped collective write lock",
s.elapsed().as_secs_f64()
);

array.print();
println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());
println!("5. PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());

array
.read_lock()
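The global_lock_array diff above shows two ways of resolving these local-data handles, and they appear to be used interchangeably: calling .block() on the handle itself, or passing the handle to world.block_on(...). A short sketch restating the calls from that diff (it assumes, based only on what the diff shows, that either resolution style works for these handles; array is the GlobalLockArray from the example):

// resolve a handle by blocking on it directly
let local_data = array.read_local_data().block();
drop(local_data);

// or hand the handle to the runtime to drive
let mut local_data = world.block_on(array.collective_write_local_data());
local_data.iter_mut().for_each(|elem| *elem += my_pe);
drop(local_data);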
253 changes: 128 additions & 125 deletions examples/array_examples/onesided_iteration.rs
@@ -1,136 +1,139 @@
// use lamellar::array::prelude::*;
// const ARRAY_LEN: usize = 100;

// fn main() {
// let world = lamellar::LamellarWorldBuilder::new().build();
// let my_pe = world.my_pe();
// let _num_pes = world.num_pes();
// let block_array = AtomicArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Block);
// let cyclic_array = AtomicArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Cyclic);

// //we are going to initialize the data on each PE by directly accessing its local data

// block_array
// .mut_local_data()
// .iter()
// .for_each(|e| e.store(my_pe));
// cyclic_array
// .mut_local_data()
// .iter()
// .for_each(|e| e.store(my_pe));

// // In this example we will make use of a onesided iterator which
// // enables us to iterate over the entire array on a single PE.
// // The runtime will manage transferring data from remote PEs.
// // Note that for UnsafeArrays, AtomicArrays, and LocalLockArrays,
// // there is no guarantee that by the time the transferred data
// // has arrived at the calling PE it has remained the same on the remote PE.
// // We do not currently provide a mutable one-sided iterator.

// if my_pe == 0 {
// println!("Here");
// for elem in block_array.onesided_iter().into_iter() {
// // we can convert from a onesided iterator into a Rust iterator
// print!("{:?} ", elem);
// }
// println!("");
// println!("Here2");
// for elem in cyclic_array.onesided_iter().into_iter() {
// print!("{:?} ", elem);
// }
// println!("");
// }
// println!("Here3");
// println!("--------------------------------------------------------");

// // The lamellar array iterator used above is lazy, meaning that it only accesses and returns a value as it is used.
// // While this is generally efficient and results in low overhead, an element may actually exist on a remote node,
// // so the latency to retrieve the next value in the iterator depends on the location of the data, as a result of
// // the need to get the data. Further impacting performance, the transfer of a single element will typically
// // be small, thus inefficiently utilizing network resources.
// // To address these issues, we have provided a buffered iterator, which will transfer ("get") and store a block of data
// // into a buffer, from which the iterated values are returned, more effectively using network resources. From the user's
// // standpoint, the only thing that changes is the instantiation of the iterator.

// if my_pe == 0 {
// for elem in block_array.buffered_onesided_iter(10).into_iter() {
// print!("{:?} ", elem);
// }
// println!("");

// for elem in cyclic_array.buffered_onesided_iter(10).into_iter() {
// print!("{:?} ", elem);
// }
// println!("");
// }

// println!("--------------------------------------------------------");

// // In addition to the buffered iterators, we also provide a method to iterate over chunks of a lamellar array via
// // the chunks() method. Called on a OneSidedIterator, this creates a chunk-sized OneSidedMemoryRegion
// // and then puts the appropriate data, based on the iteration index, into that region.

// if my_pe == 0 {
// for chunk in block_array.onesided_iter().chunks(10).skip(4).into_iter() {
// println!("{:?}", unsafe { chunk.as_slice() });
// }
// println!("-----");
// for chunk in cyclic_array.onesided_iter().chunks(10).into_iter() {
// println!("{:?}", unsafe { chunk.as_slice() });
// }

// println!("-----");
// for (i, (a, b)) in cyclic_array
// .onesided_iter()
// .zip(block_array.onesided_iter())
// .into_iter()
// .enumerate()
// {
// println!("{:?}: {:?} {:?}", i, a, b);
// }
// println!("-----");
// for (a, b) in cyclic_array
// .onesided_iter()
// .chunks(10)
// .zip(block_array.onesided_iter().chunks(10))
// .into_iter()
// {
// unsafe {
// println!("{:?} {:?}", a.as_slice(), b.as_slice());
// }
// }
// }

// println!("--------------------------------------------------------");

// // let block_array = UnsafeArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Block);
// // for elem in block_onesided_iter!($array,array).into_iter().step_by(4) {...}
// // for elem in block_array.buffered_onesided_iter(10) {...}

// // //rust step_by pseudo code
// // fn step_by(&mut self, n: usize) -> Result<T>{
// // let val = self.next(); //grab val based on index
// // self.index += n;
// // val
// // }

// // //--------------
// // for elem in block_array.onesided_iter().step_by(4).into_iter() {...}
// }

use futures_util::stream::StreamExt;
use lamellar::array::prelude::*;

const ARRAY_LEN: usize = 100;

fn main() {
let world = LamellarWorldBuilder::new().build();
let array = LocalLockArray::<usize>::new(&world, 8, Distribution::Block);
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
let num_pes = world.num_pes();
array.dist_iter_mut().for_each(move |e| *e = my_pe); //initialize array using a distributed iterator
array.wait_all();
let block_array = AtomicArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Block);
let cyclic_array = AtomicArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Cyclic);

//we are going to initialize the data on each PE by directly accessing its local data

block_array
.mut_local_data()
.iter()
.for_each(|e| e.store(my_pe));
cyclic_array
.mut_local_data()
.iter()
.for_each(|e| e.store(my_pe));

// In this example we will make use of a onesided iterator which
// enables us to iterate over the entire array on a single PE.
// The runtime will manage transferring data from remote PEs.
// Note that for UnsafeArrays, AtomicArrays, and LocalLockArrays,
// there is no guarantee that by the time the transferred data
// has arrived at the calling PE it has remained the same on the remote PE.
// We do not currently provide a mutable one-sided iterator.

if my_pe == 0 {
println!("Here");
for elem in block_array.onesided_iter().into_iter() {
// we can convert from a onesided iterator into a Rust iterator
print!("{:?} ", elem);
}
println!("");
println!("Here2");
for elem in cyclic_array.onesided_iter().into_iter() {
print!("{:?} ", elem);
}
println!("");
}
println!("Here3");
println!("--------------------------------------------------------");

// The lamellar array iterator used above is lazy, meaning that it only accesses and returns a value as it is used.
// While this is generally efficient and results in low overhead, an element may actually exist on a remote node,
// so the latency to retrieve the next value in the iterator depends on the location of the data, as a result of
// the need to get the data. Further impacting performance, the transfer of a single element will typically
// be small, thus inefficiently utilizing network resources.
// To address these issues, we have provided a buffered iterator, which will transfer ("get") and store a block of data
// into a buffer, from which the iterated values are returned, more effectively using network resources. From the user's
// standpoint, the only thing that changes is the instantiation of the iterator.

if my_pe == 0 {
for elem in block_array.buffered_onesided_iter(10).into_iter() {
print!("{:?} ", elem);
}
println!("");

for elem in cyclic_array.buffered_onesided_iter(10).into_iter() {
print!("{:?} ", elem);
}
println!("");
}

println!("--------------------------------------------------------");

// In addition to the buffered iterators, we also provide a method to iterate over chunks of a lamellar array via
// the chunks() method. Called on a OneSidedIterator, this creates a chunk-sized OneSidedMemoryRegion
// and then puts the appropriate data, based on the iteration index, into that region.

if my_pe == 0 {
for chunk in block_array.onesided_iter().chunks(10).skip(4).into_iter() {
println!("{:?}", unsafe { chunk.as_slice() });
}
println!("-----");
for chunk in cyclic_array.onesided_iter().chunks(10).into_iter() {
println!("{:?}", unsafe { chunk.as_slice() });
}

println!("-----");
for (i, (a, b)) in cyclic_array
.onesided_iter()
.zip(block_array.onesided_iter())
.into_iter()
.enumerate()
{
println!("{:?}: {:?} {:?}", i, a, b);
}
println!("-----");
for (a, b) in cyclic_array
.onesided_iter()
.chunks(10)
.zip(block_array.onesided_iter().chunks(10))
.into_iter()
{
unsafe {
println!("{:?} {:?}", a.as_slice(), b.as_slice());
}
}
}

println!("--------------------------------------------------------");

// let block_array = UnsafeArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Block);
// for elem in block_onesided_iter!($array,array).into_iter().step_by(4) {...}
// for elem in block_array.buffered_onesided_iter(10) {...}

// //rust step_by pseudo code
// fn step_by(&mut self, n: usize) -> Result<T>{
// let val = self.next(); //grab val based on index
// self.index += n;
// val
// }

// //--------------
// for elem in block_array.onesided_iter().step_by(4).into_iter() {...}
// }

// fn main() {
// let world = LamellarWorldBuilder::new().build();
// let array = LocalLockArray::<usize>::new(&world, 8, Distribution::Block);
// let my_pe = world.my_pe();
// let num_pes = world.num_pes();
let block_array = block_array.into_local_lock();
block_array
.dist_iter_mut()
.for_each(move |e| *e = my_pe)
.block(); //initialize array using a distributed iterator

world.block_on(async move {
if my_pe == 0 {
let result = array
let result = block_array
.onesided_iter()
.into_stream()
.take(4)
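The onesided_iteration diff is truncated above, ending mid-pipeline after .take(4). Purely as an illustrative sketch of how such a onesided stream could be driven, not the file's actual continuation, and assuming the standard futures_util::StreamExt combinators (take is already used above) apply to the stream:

world.block_on(async move {
    if my_pe == 0 {
        block_array
            .onesided_iter()
            .into_stream()
            .take(4)
            .enumerate()
            .for_each(|(i, elem)| async move { println!("{i}: {elem:?}") })
            .await; // prints the first four elements fetched onesided from across the PEs
    }
});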
2 changes: 1 addition & 1 deletion examples/darc_examples/darc.rs
@@ -109,7 +109,7 @@ fn main() {
// drop(darc2);
// drop(wrapped);
println!("changing darc type");
let ro_darc = global_darc.blocking_into_localrw().blocking_into_darc(); // we can call into_darc directly on global_darc, but we string the operations together for testing purposes
let ro_darc = global_darc.into_localrw().block().into_darc().block(); // we can call into_darc directly on global_darc, but we string the operations together for testing purposes
println!("read only darc");
ro_darc.print();
println!("done");
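The darc diff above collapses the conversion into one chained expression; split into explicit steps, the new handle-based form is equivalent to the following restatement of the same calls (variable names are illustrative; the GlobalRwDarc -> LocalRwDarc -> read-only Darc progression matches the surrounding prints):

let rw_darc = global_darc.into_localrw().block(); // GlobalRwDarc -> LocalRwDarc, via a handle resolved with .block()
let ro_darc = rw_darc.into_darc().block();        // LocalRwDarc -> read-only Darc
ro_darc.print();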
8 changes: 4 additions & 4 deletions run_examples.sh
@@ -44,11 +44,11 @@ for toolchain in stable; do #nightly; do
fi
cd ..
sleep 2
cur_tasks=`squeue -u frie869 | wc -l`
running_tasks=`squeue -u frie869 | grep " R " | wc -l`
cur_tasks=`squeue -u frie869 | grep frie869 | wc -l`
running_tasks=`squeue -u frie869 | grep frie869 | grep " R " | wc -l`
while [ $((cur_tasks+running_tasks)) -gt 6 ]; do
cur_tasks=`squeue -u frie869 | wc -l`
running_tasks=`squeue -u frie869 | grep " R " | wc -l`
cur_tasks=`squeue -u frie869 | grep frie869 | wc -l`
running_tasks=`squeue -u frie869 | grep frie869 | grep " R " | wc -l`
sleep 5
done
# fi
27 changes: 14 additions & 13 deletions src/array.rs
@@ -191,29 +191,30 @@ pub struct ReduceKey {
crate::inventory::collect!(ReduceKey);

// impl Dist for bool {}
lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize);
lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize);
// lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize);
// lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize);

// lamellar_impl::generate_reductions_for_type_rt!(false, f32);
// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32);

// lamellar_impl::generate_reductions_for_type_rt!(false, u128);
// lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128);
// //------------------------------------

// lamellar_impl::generate_reductions_for_type_rt!(true, u8, u16, u32, u64, usize);
// lamellar_impl::generate_reductions_for_type_rt!(false, u128);
// lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, u16, u32, u64, usize);
// lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128);
lamellar_impl::generate_reductions_for_type_rt!(true, u8, u16, u32, u64, usize);
lamellar_impl::generate_reductions_for_type_rt!(false, u128);
lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, u16, u32, u64, usize);
lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128);

// lamellar_impl::generate_reductions_for_type_rt!(true, i8, i16, i32, i64, isize);
// lamellar_impl::generate_reductions_for_type_rt!(false, i128);
// lamellar_impl::generate_ops_for_type_rt!(true, true, true, i8, i16, i32, i64, isize);
// lamellar_impl::generate_ops_for_type_rt!(true, false, true, i128);
lamellar_impl::generate_reductions_for_type_rt!(true, i8, i16, i32, i64, isize);
lamellar_impl::generate_reductions_for_type_rt!(false, i128);
lamellar_impl::generate_ops_for_type_rt!(true, true, true, i8, i16, i32, i64, isize);
lamellar_impl::generate_ops_for_type_rt!(true, false, true, i128);

// lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64);
// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64);
lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64);
lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64);

// lamellar_impl::generate_ops_for_bool_rt!();
lamellar_impl::generate_ops_for_bool_rt!();

impl<T: Dist + ArrayOps> Dist for Option<T> {}
impl<T: Dist + ArrayOps> ArrayOps for Option<T> {}
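A hedged note on the macro invocations uncommented above: judging by their names, generate_reductions_for_type_rt! and generate_ops_for_type_rt! generate the built-in reductions and element-wise operations for the listed primitive types. As an illustrative sketch only (it assumes sum and add are among the generated operations and that their handles can be driven with block_on; exact signatures may differ):

let array = AtomicArray::<usize>::new(world.team(), 100, Distribution::Block);
world.block_on(array.add(10, 2));        // element-wise op on index 10, enabled by generate_ops_for_type_rt!
let total = world.block_on(array.sum()); // whole-array reduction, enabled by generate_reductions_for_type_rt!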