Skip to content

Commit

Permalink
convert darc into methods to handle based rather than explict blockin…
Browse files Browse the repository at this point in the history
…g apis
  • Loading branch information
rdfriese committed Oct 16, 2024
1 parent c55d5fe commit fa0c64f
Show file tree
Hide file tree
Showing 34 changed files with 433 additions and 576 deletions.
8 changes: 4 additions & 4 deletions examples/active_message_examples/am_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,26 @@ fn main() {
println!("-----------------------------------");
// println!("---------------------------------------------------------------");
// println!("Testing local am no return");
// let res = world.exec_am_pe(my_pe, am.clone()).blocking_wait();
// let res = world.exec_am_pe(my_pe, am.clone()).block();
// assert_eq!(res, None);
// println!("no return result: {:?}", res);
// println!("-----------------------------------");
// println!("Testing remote am no return");
// let res = world.exec_am_pe(num_pes - 1, am.clone()).blocking_wait();
// let res = world.exec_am_pe(num_pes - 1, am.clone()).block();
// assert_eq!(res, None);
// println!("no return result: {:?}", res);
// println!("-----------------------------------");
// println!("Testing all am no return");
// println!("[{:?}] exec on all", my_pe);
// let res = world.exec_am_all(am.clone()).blocking_wait();
// let res = world.exec_am_all(am.clone()).block();
// assert!(res.iter().all(|x| x.is_none()));
// println!("no return result: {:?}", res);
// println!("---------------------------------------------------------------");
}

// println!("---------------------------------------------------------------");
// println!("Testing ring pattern am no return");
// let res = world.exec_am_pe((my_pe + 1) % num_pes, am.clone()).blocking_wait();
// let res = world.exec_am_pe((my_pe + 1) % num_pes, am.clone()).block();
// assert_eq!(res, None);
// println!("no return result: {:?}", res);
// println!("-----------------------------------");
Expand Down
2 changes: 1 addition & 1 deletion examples/active_message_examples/recursive_am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl LamellarAM for RecursiveAM {
orig: self.orig,
},
);
// let mut res = next.blocking_wait().expect("error returning from am"); // this will cause deadlock
// let mut res = next.block().expect("error returning from am"); // this will cause deadlock
let mut res = next.await;
res.push(hostname::get().unwrap().into_string().unwrap()); //append my host name to list returned from previous call
res
Expand Down
9 changes: 5 additions & 4 deletions examples/array_examples/global_lock_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() {
let array = GlobalLockArray::<usize>::new(&world, 100, Distribution::Block);

let s = Instant::now();
let local_data = array.blocking_read_local_data();
let local_data = array.read_local_data().block();
println!(
"PE{my_pe} time: {:?} {:?}",
s.elapsed().as_secs_f64(),
Expand All @@ -19,7 +19,7 @@ fn main() {
drop(local_data); //release the lock

world.barrier();
let mut local_data = array.blocking_write_local_data();
let mut local_data = array.write_local_data().block();
println!(
"PE{my_pe} time: {:?} got write lock",
s.elapsed().as_secs_f64()
Expand All @@ -31,7 +31,7 @@ fn main() {
array.print();
println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());

let mut local_data = array.blocking_collective_write_local_data();
let mut local_data = array.collective_write_local_data().block();
println!(
"PE{my_pe} time: {:?} got collective write lock",
s.elapsed().as_secs_f64()
Expand All @@ -48,7 +48,8 @@ fn main() {
println!("PE{my_pe} time: {:?} done", s.elapsed().as_secs_f64());

array
.blocking_read_lock()
.read_lock()
.block()
.dist_iter()
.enumerate()
.for_each(move |(i, elem)| {
Expand Down
2 changes: 1 addition & 1 deletion examples/bandwidths/global_lock_atomic_array_put_bw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn main() {
let cur_t = timer.elapsed().as_secs_f64();
if my_pe == 0 {
for j in (0..2_u64.pow(exp) as usize).step_by(num_bytes as usize) {
let local_data = array.blocking_read_local_data();
let local_data = array.read_local_data().block();
while *(&local_data[(j + num_bytes as usize) - 1]) == 255 as u8 {
println!(
"this should not happen {:?}",
Expand Down
2 changes: 1 addition & 1 deletion examples/bandwidths/local_lock_atomic_array_put_bw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn main() {
let cur_t = timer.elapsed().as_secs_f64();
if my_pe == num_pes - 1 {
for j in (0..2_u64.pow(exp) as usize).step_by(num_bytes as usize) {
let local_data = array.blocking_read_local_data();
let local_data = array.read_local_data().block();
while *(&local_data[(j + num_bytes as usize) - 1]) == 255 as u8 {
println!(
"this should not happen {:?}",
Expand Down
6 changes: 3 additions & 3 deletions examples/darc_examples/darc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ fn main() {
));

let global_darc = GlobalRwDarc::new(world.team(), 0).unwrap();
let read_lock = global_darc.blocking_read();
let read_lock = global_darc.read().block();
println!("I have the read lock!!!! {:?}", my_pe);
drop(read_lock);
let write_lock = global_darc.blocking_write();
let write_lock = global_darc.write().block();
println!("I have the write lock!!!! {:?}", my_pe);
std::thread::sleep(std::time::Duration::from_secs(1));
drop(write_lock);
Expand Down Expand Up @@ -100,7 +100,7 @@ fn main() {
tg.add_am_all(darc_am);
team.block_on(tg.exec());
} else {
*local_darc.blocking_write() += 1;
*local_darc.write().block() += 1;
}
}
// --------
Expand Down
2 changes: 1 addition & 1 deletion examples/kernels/am_flops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn main() {
// let cur_t = timer.elapsed().as_secs_f64();
// let tot_flop: usize = reqs
// .iter()
// .map(|r| r.blocking_wait().iter().map(|r| r.unwrap()).sum::<usize>())
// .map(|r| r.block().iter().map(|r| r.unwrap()).sum::<usize>())
// .sum();
// let task_granularity = ((cur_t * 24f64) / num_tasks as f64) * 1000.0f64;
// if my_pe == 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/kernels/cached_am_gemm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ fn main() {
tasks += 1;
}
// for req in reqs {
// req.blocking_wait();
// req.block();
// }
}

Expand Down
6 changes: 3 additions & 3 deletions examples/kernels/dft_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ fn main() {
// println!(
// "{:?} array sum: {:?} time: {:?}",
// my_pe,
// full_spectrum_array.sum().blocking_wait(),
// full_spectrum_array.sum().block(),
// time
// );
// }
Expand All @@ -807,7 +807,7 @@ fn main() {
// println!(
// "{:?} array sum: {:?} time: {:?}",
// my_pe,
// full_spectrum_array.sum().blocking_wait(),
// full_spectrum_array.sum().block(),
// time
// );
// }
Expand Down Expand Up @@ -857,7 +857,7 @@ fn main() {
// println!(
// "{:?} array sum: {:?} time: {:?}",
// my_pe,
// full_spectrum_array.sum().blocking_wait(),
// full_spectrum_array.sum().block(),
// time
// );
// }
Expand Down
2 changes: 1 addition & 1 deletion examples/misc/dist_hashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ fn main() {
world.barrier();
println!(
"[{my_pe}] local data: {:?}",
distributed_map.data.blocking_read()
distributed_map.data.read().block()
);
}
7 changes: 4 additions & 3 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ crate::inventory::collect!(ReduceKey);
lamellar_impl::generate_reductions_for_type_rt!(true, u8, usize, isize);
lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, usize, isize);

lamellar_impl::generate_reductions_for_type_rt!(false, f32);
lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32);
// lamellar_impl::generate_reductions_for_type_rt!(false, f32);
// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32);

// lamellar_impl::generate_reductions_for_type_rt!(false, u128);
// lamellar_impl::generate_ops_for_type_rt!(true, false, true, u128);

Expand All @@ -212,7 +213,7 @@ lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32);
// lamellar_impl::generate_reductions_for_type_rt!(false, f32, f64);
// lamellar_impl::generate_ops_for_type_rt!(false, false, false, f32, f64);

lamellar_impl::generate_ops_for_bool_rt!();
// lamellar_impl::generate_ops_for_bool_rt!();

impl<T: Dist + ArrayOps> Dist for Option<T> {}
impl<T: Dist + ArrayOps> ArrayOps for Option<T> {}
Expand Down
Loading

0 comments on commit fa0c64f

Please sign in to comment.