diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..bfc3f1c --- /dev/null +++ b/.gitattributes @@ -0,0 +1,14 @@ +# Set the default behavior, in case people don't have core.autocrlf set. +* text=auto + +# Explicitly declare text files you want to always be normalized and converted +# to native line endings on checkout. +*.rs text +*.txt text +*.md text +*.toml text + +# Denote all files that are truly binary and should not be modified. +*.png binary +*.jpg binary +*.ods binary diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c26015e --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/target +**/*.rs.bk +.idea/* +.vscode/* +thincollections.iml +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0c41eac --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "widerwlock" +version = "0.5.0" +authors = ["Mohammad Rezaei "] +license = "MIT/Apache-2.0" +keywords = ["lock", "read-write", "rwlock"] +readme = "README.md" +repository = "https://github.com/mohrezaei/widerwlock" +homepage = "https://github.com/mohrezaei/widerwlock" +documentation = "https://docs.rs/widerwlock" +description = """ +A partitioned read-write lock optimized for many parallel readers. +""" +exclude = [ + ".travis.yml", + "benchmarks/*", + "benches/*", + ".gitignore", + ".gitattributes", + "tests/*" +] + +[dependencies] +parking_lot_core = "0.3" + +[dev-dependencies] +rand = "0.5.5" +xoshiro = "0.0.4" +parking_lot = "0.6" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 0000000..3c0c677 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,9 @@
+Wide Read-Write Lock for Rust
+Copyright 2018 Mohammad Rezaei
+Licensed under the Apache 2.0 license
+
+This product depends on the following software:
+
+parking_lot_core
+Copyright Amanieu d'Antras
+Licensed under the MIT license
diff --git a/README.md b/README.md new file mode 100644 index 0000000..8e39fe1 --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@
+# Wide (Partitioned) Read-Write Lock
+
+[![Latest version](https://img.shields.io/crates/v/widerwlock.svg)](https://crates.io/crates/widerwlock)
+[![Documentation](https://docs.rs/widerwlock/badge.svg)](https://docs.rs/widerwlock)
+![Minimum rustc version](https://img.shields.io/badge/rustc-1.28+-yellow.svg)
+
+This crate implements an enterprise-grade read-write lock, typically used for large data structures.
+The lock is internally 8x partitioned, which allows it to scale much better with a large number of readers,
+as compared to `std::sync::RwLock`.
+
+Even though this is a large lock, it has better performance characteristics for an uncontended single reader
+or single writer than `std::sync::RwLock`.
+
+The lock uses a contiguous 576-byte heap area to store its state, so it's not a lightweight lock.
+If you have a complex data structure that holds a GB of data, this would be an appropriate lock.
+
+An interesting feature of this lock, besides its performance, is its Read->Write upgrade mechanics. The `ReadGuard` allows an
+upgrade to a write-lock and informs the user whether the upgrade succeeded atomically or not. This enables
+the following pattern:
+- Read to see if the data structure is in a particular state (e.g. contains a value).
+  - if not, upgrade to a write lock
+  - if the upgrade was not atomic, perform the (potentially expensive) read again
+  - write to the structure if needed
+
+- [Documentation](https://docs.rs/widerwlock)
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+widerwlock = "0.5"
+```
+
+and this to your crate root:
+
+```rust
+extern crate widerwlock;
+```
+
+## Rust Version Support
+
+The minimum supported Rust version is 1.28, due to the use of the allocator API.
\ No newline at end of file
diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..e69de29
diff --git a/benches/bench.rs b/benches/bench.rs new file mode 100644 index 0000000..01c3253 --- /dev/null +++ b/benches/bench.rs @@ -0,0 +1,551 @@
+// Copyright 2018 Mohammad Rezaei.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+// + +#![feature(test)] +extern crate test; +extern crate widerwlock; +extern crate parking_lot; +extern crate rand; +extern crate xoshiro; + +use test::Bencher; +use test::stats::Summary; +use test::black_box; + +use widerwlock::*; +use parking_lot::*; +use std::sync::mpsc::*; +use std::sync::Mutex; +use std::ops::Deref; +use parking_lot::RwLock; +use std::thread; +use std::cell::RefCell; +use std::sync::Arc; +use xoshiro::Xoshiro512StarStar; +use rand::*; + + +thread_local!(static FOO: RefCell = RefCell::new(widerwlock::thread_id_as_u64())); + +#[bench] +fn bench_uncontended_read_grwl(b: &mut Bencher) { + let lock = WideRwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.read(); + } + }) +} + +#[bench] +fn bench_uncontended_write_grwl(b: &mut Bencher) { + let lock = WideRwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.write(); + } + }) +} + +#[bench] +fn bench_uncontended_read_std(b: &mut Bencher) { + let lock = std::sync::RwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.read(); + } + }) +} + +#[bench] +fn bench_uncontended_write_std(b: &mut Bencher) { + let lock = std::sync::RwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.write(); + } + }) +} + +#[bench] +fn bench_uncontended_mutex(b: &mut Bencher) { + let mutex = Mutex::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + mutex.lock(); + } + }) +} + +#[bench] +fn bench_uncontended_read_pk(b: &mut Bencher) { + let lock = RwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.read(); + } + }) +} + +#[bench] +fn bench_uncontended_write_pk(b: &mut Bencher) { + let lock = RwLock::new(()); + b.iter(|| { + for _i in 0..1_000_000 { + lock.write(); + } + }) +} + +#[bench] +fn bench_2_reader_grwl(b: &mut Bencher) { + run_multi_reader_grwl(b, 2); +} + +#[bench] +fn bench_4_reader_grwl(b: &mut Bencher) { + run_multi_reader_grwl(b, 4); +} + +#[bench] +fn bench_8_reader_grwl(b: &mut Bencher) { + run_multi_reader_grwl(b, 8); +} + +#[bench] +fn bench_2_reader_pk(b: &mut Bencher) { + run_multi_reader_pk(b, 2); +} + +#[bench] +fn bench_4_reader_pk(b: &mut Bencher) { + run_multi_reader_pk(b, 4); +} + +#[bench] +fn bench_8_reader_pk(b: &mut Bencher) { + run_multi_reader_pk(b, 8); +} + +#[bench] +fn bench_2_reader_std(b: &mut Bencher) { + run_multi_reader_std(b, 2); +} + +#[bench] +fn bench_4_reader_std(b: &mut Bencher) { + run_multi_reader_std(b, 4); +} + +#[bench] +fn bench_8_reader_std(b: &mut Bencher) { + run_multi_reader_std(b, 8); +} + +fn run_multi_reader_grwl(b: &mut Bencher, threads: u32) { + b.iter(|| { + let r = Arc::new(WideRwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + for _ in 0..1_000_000 { + drop(r.read()); + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + }); +} + +fn run_multi_reader_pk(b: &mut Bencher, threads: u32) { + b.iter(|| { + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + for _ in 0..1_000_000 { + drop(r.read()); + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + }); +} + +fn run_multi_reader_std(b: &mut Bencher, threads: u32) { + b.iter(|| { + let r = Arc::new(std::sync::RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + for _ in 0..1_000_000 { + drop(r.read()); + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + 
}); +} + +//#[bench] +fn bench_thread_id(b: &mut Bencher) { + b.iter(|| { + for _i in 0..1_000_000 { + widerwlock::thread_id_as_u64(); + } + }) +} + +//#[bench] +fn bench_thread_current(b: &mut Bencher) { + b.iter(|| { + for _i in 0..1_000_000 { + thread::current(); + } + }) +} + +//#[bench] +fn bench_thread_local_borrow(b: &mut Bencher) { + b.iter(|| { + FOO.with(|f| black_box(*f.borrow())); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_1_grwl(b: &mut Bencher) { + b.iter(|| { + read_write_work10_grwl(1, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_1_grwl(b: &mut Bencher) { + b.iter(|| { + read_write_work10_grwl(1, 0.01); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_4_grwl(b: &mut Bencher) { +// read_write_no_work_grwl(4, 0.1); + + b.iter(|| { + read_write_work10_grwl(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_4_grwl(b: &mut Bencher) { + b.iter(|| { + read_write_work10_grwl(4, 0.01); + }) +} + +#[bench] +fn bench_mixed_tenth_work100_4_grwl(b: &mut Bencher) { + b.iter(|| { + read_write_work100_grwl(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_1_pk(b: &mut Bencher) { + b.iter(|| { + read_write_work10_pk(1, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_1_pk(b: &mut Bencher) { + b.iter(|| { + read_write_work10_pk(1, 0.01); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_4_pk(b: &mut Bencher) { + b.iter(|| { + read_write_work10_pk(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_tenth_work100_4_pk(b: &mut Bencher) { + b.iter(|| { + read_write_work100_pk(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_4_pk(b: &mut Bencher) { + b.iter(|| { + read_write_work10_pk(4, 0.01); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_1_std(b: &mut Bencher) { + b.iter(|| { + read_write_work10_std(1, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_1_std(b: &mut Bencher) { + b.iter(|| { + read_write_work10_std(1, 0.01); + }) +} + +#[bench] +fn bench_mixed_tenth_work10_4_std(b: &mut Bencher) { + b.iter(|| { + read_write_work10_std(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_tenth_work100_4_std(b: &mut Bencher) { + b.iter(|| { + read_write_work100_std(4, 0.1); + }) +} + +#[bench] +fn bench_mixed_hundreth_work10_4_std(b: &mut Bencher) { + b.iter(|| { + read_write_work10_std(4, 0.01); + }) +} + +fn read_write_work10_grwl(threads: u64, writer_fraction: f64) { + const M: u32 = 1_000_000; + + let r = Arc::new(WideRwLock::new(0u128)); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work10(&mut rng); + } else { + r.read(); + sum += work10(&mut rng); + } + } + assert!(sum != 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +fn read_write_work100_grwl(threads: u64, writer_fraction: f64) { + const M: u32 = 1_000_00; + + let r = Arc::new(WideRwLock::new(())); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work100(&mut rng); + } else { + r.read(); + sum += work100(&mut rng); + } + } + assert!(sum > 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +fn read_write_work10_pk(threads: u64, 
writer_fraction: f64) { + const M: u32 = 1_000_000; + + let r = Arc::new(RwLock::new(0u128)); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work10(&mut rng); + } else { + r.read(); + sum += work10(&mut rng); + } + } + assert!(sum != 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +fn read_write_work100_pk(threads: u64, writer_fraction: f64) { + const M: u32 = 1_000_00; + + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work100(&mut rng); + } else { + r.read(); + sum += work100(&mut rng); + } + } + assert!(sum > 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +fn read_write_work10_std(threads: u64, writer_fraction: f64) { + const M: u32 = 1_000_000; + + let r = Arc::new(std::sync::RwLock::new(0u128)); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work10(&mut rng); + } else { + r.read(); + sum += work10(&mut rng); + } + } + assert!(sum != 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +fn read_write_work100_std(threads: u64, writer_fraction: f64) { + const M: u32 = 1_000_00; + + let r = Arc::new(std::sync::RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for t in 0..threads { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut sum: f64 = 0.0; + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF ^ t); + for _ in 0..M { + if rng.gen::() < writer_fraction { + r.write(); + sum += work100(&mut rng); + } else { + r.read(); + sum += work100(&mut rng); + } + } + assert!(sum != 0.0); + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +#[bench] +fn bench_work100(b: &mut Bencher) { + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF); + b.iter(|| { + work100(&mut rng); + }) +} + +#[bench] +fn bench_work10(b: &mut Bencher) { + let mut rng = Xoshiro512StarStar::from_seed_u64(0xCAFE_BABE_DEAD_BEEF); + b.iter(|| { + work10(&mut rng); + }) +} + +#[cold] +fn work100(rng: &mut Xoshiro512StarStar) -> f64 { + let max: i32 = (2.0 + rng.gen::() * 1.5) as i32; + let mut sum: f64 = 0.0; + for _i in 0..max { + sum += rng.gen::().cbrt(); + } + sum +} + +#[cold] +fn work10(rng: &mut Xoshiro512StarStar) -> f64 { + let max: i32 = (2.0 + rng.gen::() * 1.5) as i32; + let mut sum: f64 = 0.0; + for _i in 0..max { + sum += rng.gen::().sqrt(); + } + sum +} diff --git a/benchmarks/results.md b/benchmarks/results.md new file mode 100644 index 0000000..3548162 --- /dev/null +++ b/benchmarks/results.md @@ -0,0 +1,39 @@ + + +intel i7-7700K, Windows 10, nightly-x86_64-pc-windows-msvc +``` +test bench_2_reader_grwl ... bench: 12,035,099 ns/iter (+/- 1,851,445) +test bench_2_reader_pk ... bench: 28,803,382 ns/iter (+/- 423,200) +test bench_2_reader_std ... 
bench: 28,386,442 ns/iter (+/- 1,316,308) +test bench_4_reader_grwl ... bench: 14,991,882 ns/iter (+/- 5,051,984) +test bench_4_reader_pk ... bench: 58,448,153 ns/iter (+/- 837,085) +test bench_4_reader_std ... bench: 56,011,028 ns/iter (+/- 1,106,178) +test bench_8_reader_grwl ... bench: 21,986,720 ns/iter (+/- 4,930,849) +test bench_8_reader_pk ... bench: 128,697,642 ns/iter (+/- 4,848,127) +test bench_8_reader_std ... bench: 108,676,567 ns/iter (+/- 2,381,390) +test bench_mixed_hundreth_work10_1_grwl ... bench: 26,082,803 ns/iter (+/- 1,373,453) +test bench_mixed_hundreth_work10_1_pk ... bench: 20,867,510 ns/iter (+/- 1,788,476) +test bench_mixed_hundreth_work10_1_std ... bench: 23,517,335 ns/iter (+/- 1,168,930) +test bench_mixed_hundreth_work10_4_grwl ... bench: 70,482,765 ns/iter (+/- 5,733,574) +test bench_mixed_hundreth_work10_4_pk ... bench: 94,416,212 ns/iter (+/- 5,646,173) +test bench_mixed_hundreth_work10_4_std ... bench: 101,117,698 ns/iter (+/- 3,081,084) +test bench_mixed_tenth_work100_4_grwl ... bench: 29,531,374 ns/iter (+/- 1,187,823) +test bench_mixed_tenth_work100_4_pk ... bench: 32,086,846 ns/iter (+/- 1,075,212) +test bench_mixed_tenth_work100_4_std ... bench: 40,456,188 ns/iter (+/- 704,694) +test bench_mixed_tenth_work10_1_grwl ... bench: 27,482,565 ns/iter (+/- 2,266,590) +test bench_mixed_tenth_work10_1_pk ... bench: 20,712,666 ns/iter (+/- 184,322) +test bench_mixed_tenth_work10_1_std ... bench: 23,800,789 ns/iter (+/- 584,405) +test bench_mixed_tenth_work10_4_grwl ... bench: 220,569,891 ns/iter (+/- 6,972,154) +test bench_mixed_tenth_work10_4_pk ... bench: 94,027,579 ns/iter (+/- 4,944,653) +test bench_mixed_tenth_work10_4_std ... bench: 102,399,700 ns/iter (+/- 4,858,920) +test bench_uncontended_mutex ... bench: 15,171,497 ns/iter (+/- 118,918) +test bench_uncontended_read_grwl ... bench: 9,596,682 ns/iter (+/- 126,181) +test bench_uncontended_read_pk ... bench: 14,058,431 ns/iter (+/- 126,530) +test bench_uncontended_read_std ... bench: 13,382,540 ns/iter (+/- 146,634) +test bench_uncontended_write_grwl ... bench: 10,114,534 ns/iter (+/- 531,415) +test bench_uncontended_write_pk ... bench: 8,028,946 ns/iter (+/- 38,759) +test bench_uncontended_write_std ... bench: 14,276,519 ns/iter (+/- 163,786) +test bench_work10 ... bench: 10 ns/iter (+/- 0) +test bench_work100 ... bench: 102 ns/iter (+/- 3) +``` + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..4a49e74 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,879 @@ +// Copyright 2018 Mohammad Rezaei. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. +// + +//! # Wide (Partitioned) Read Write Lock +//! This crate implements an enterprise-grade read-write lock, typically used for large data structures. +//! The lock is internally 8x partitioned, which allows it to scale much better with a large number of readers, +//! as compared to `std::sync::RwLock`. +//! Even though this is a large lock, it has better performance characteristic for uncontended single reader +//! or single writer lock than `std::sync::RwLock`. +//! The lock uses a contiguous 576 byte heap area to store its state, so it's not a light-weight lock. +//! If you have a complex data structure that holds a GB of data, this would be an appropriate lock. +//! +//! An interesting feature of this lock, beside its performance, is its Read->Write upgrade mechanics. The `ReadGuard` allows an +//! 
upgrade to a write-lock and informs the user whether the upgrade succeeded atomically or not. This enables
+//! the following pattern:
+//! - Read to see if the data structure is in a particular state (e.g. contains a value).
+//!   - if not, upgrade to a write lock
+//!   - if the upgrade was not atomic, perform the (potentially expensive) read again
+//!   - write to the structure if needed
+//!
+//! Here is an example:
+//! ```
+//! # extern crate widerwlock;
+//! use widerwlock::*;
+//! use std::collections::HashMap;
+//! use std::hash::Hash;
+//!
+//! struct RwMap<K: Hash + Eq, V> {
+//!     map: WideRwLock<HashMap<K, V>>,
+//! }
+//!
+//! impl<K: Hash + Eq, V> RwMap<K, V> {
+//!     pub fn insert_if_absent<F>(&self, key: K, v_factory: F)
+//!         where F: FnOnce(&K) -> V {
+//!         let guard: ReadGuard<_> = self.map.read();
+//!         if !guard.contains_key(&key) { // perform the read.
+//!
+//!             let result: UpgradeResult<_> = guard.upgrade_to_write_lock();
+//!             let atomic = result.atomic();
+//!             // the atomic return ensures the (potentially) expensive double read
+//!             // can be avoided when atomic
+//!             let mut write_guard: WriteGuard<_> = result.into_guard();
+//!             if !atomic {
+//!                 // we have to check again, because another writer may have come in
+//!                 if write_guard.contains_key(&key) { return; }
+//!             }
+//!             let v = v_factory(&key);
+//!             write_guard.insert(key, v);
+//!         }
+//!     }
+//! }
+//! ```
+//!
+
+extern crate parking_lot_core;
+
+use std::{
+    alloc::{self, Layout},
+    mem, ptr,
+};
+use std::sync::atomic::*;
+use std::thread;
+use std::cell::RefCell;
+
+use parking_lot_core::*;
+use std::sync::atomic::AtomicBool;
+use std::cell::UnsafeCell;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use std::thread::Thread;
+
+/// A multi-reader, single-writer lock.
+///
+/// The implementation has the following properties:
+/// - Non-reentrant. Do not attempt to take this lock multiple times in the same thread. It will deadlock.
+/// - Writer-preferring: writers get preference and hold back incoming readers while they wait.
+/// - 8x partitioned: allows very fast multi-reader locking.
+/// - Uncontended single-threaded performance better than `std::sync::Mutex` and `std::sync::RwLock`.
+/// - No poisoning. It should be possible to wrap this lock with poison support if needed.
+/// +/// This struct uses a single pointer to point into a 576 byte (64-byte aligned) heap area +/// +pub struct WideRwLock { + ptr: *mut u64, + data: UnsafeCell, +} + +/// returned from `ReadGuard::upgrade_to_write_lock()` +pub enum UpgradeResult<'a, T: ?Sized + 'a> { + AtomicUpgrade(WriteGuard<'a, T>), NonAtomicUpgrade(WriteGuard<'a, T>) +} + +impl<'a, T: ?Sized + 'a> UpgradeResult<'a, T> { + pub fn atomic(&self) -> bool { + match self { + UpgradeResult::AtomicUpgrade(_) => { return true; } + UpgradeResult::NonAtomicUpgrade(_) => { return false; } + } + } + + pub fn into_guard(self) -> WriteGuard<'a, T> { + match self { + UpgradeResult::AtomicUpgrade(w) => { return w; } + UpgradeResult::NonAtomicUpgrade(w) => { return w; } + } + } +} + +const READER_BITS: u32 = 25; +const READER_MASK: u32 = (1 << READER_BITS) - 1; + +const WRITER_MASK: u32 = 1 << (READER_BITS + 1); +const PREPARING_TO_GO_LOCAL: u32 = 1 << (READER_BITS+2); + +const GLOBAL_MASK: u32 = 1 << (READER_BITS+4); + +const LOCK_POWER: u32 = 3; + +const LOCK_COUNT: u32 = 1 << LOCK_POWER; + +const LOCK_MASK: u32 = (1 << LOCK_POWER) - 1; + +const WRITE_PARK_TOKEN: ParkToken = ParkToken(1); +const READ_PARK_TOKEN: ParkToken = ParkToken(2); + +thread_local!(static THREAD_ID: RefCell = RefCell::new(thread_id_as_u64_init())); + +impl WideRwLock { + /// creates a new lock + pub fn new(data: T) -> WideRwLock { + unsafe { + if mem::size_of::() > 64 { + panic!("ThreadId has gotten too fat!"); + } + let layout = Layout::from_size_align((1+LOCK_COUNT as usize) * 64, 64).unwrap(); //aligned to cache line + let ptr = alloc::alloc(layout) as *mut u64; + RawGlobalLock::from_ptr(ptr).init(); + for i in 0..LOCK_COUNT { + RawLocalLock::new(ptr, i as usize).init(); + } + WideRwLock { ptr, data: UnsafeCell::new(data) } + } + } +} + +unsafe impl Send for WideRwLock {} +unsafe impl Sync for WideRwLock {} + +impl WideRwLock { + + /// Obtain a read lock. Multiple simultaneous read locks will be given if there are no writers waiting. + /// The lock remains locked until the returned RAII guard is dropped. + #[inline] + pub fn read(&self) -> ReadGuard { + let reader_id = reader_index(); + let local_lock = RawLocalLock::new(self.ptr, reader_id); + if local_lock.try_read_lock_local() { + return ReadGuard { lock: & self, reader_id}; + } + let global_sync = RawGlobalLock::from_ptr(self.ptr); + if !global_sync.try_fast_read_lock() { + if self.read_lock_slow(reader_id, &local_lock) { + return ReadGuard {lock: & self, reader_id } + } + } + ReadGuard {lock: & self, reader_id: LOCK_COUNT as usize } + } + + /// Obtain a write lock. If a writer is waiting, no more read locks will be given until the writer + /// has been granted access and finished. + /// The lock remains locked until the returned RAII guard is dropped. 
+ #[inline] + pub fn write(&self) -> WriteGuard { + let global_sync = RawGlobalLock::from_ptr(self.ptr); + let (must_prep, must_recheck_global) = global_sync.write_lock(); + if must_prep { + self.prepare_local_write_locks_and_lock_global(); + } + if must_recheck_global { + global_sync.wait_for_reader(); + } + WriteGuard { lock : &self } + } + + #[inline] + fn write_unlock(&self) { + let global_lock = RawGlobalLock::from_ptr(self.ptr); + global_lock.write_unlock(); + } + + #[inline] + fn read_unlock(&self) { + let global_lock = RawGlobalLock::from_ptr(self.ptr); + global_lock.read_unlock_zero(); + } + + #[inline] + fn fast_local_read_unlock(&self, reader_id: usize) { + let lock = RawLocalLock::new(self.ptr, reader_id); + if lock.read_unlock() { + lock.unpark_thread(); + } + } + + fn upgrade_to_write_lock(&self) -> UpgradeResult { + if RawGlobalLock::from_ptr(self.ptr).try_single_reader_upgrade() { + return UpgradeResult::AtomicUpgrade(WriteGuard { lock: &self }); + } + let global_lock = RawGlobalLock::from_ptr(self.ptr); + global_lock.read_unlock_zero(); + UpgradeResult::NonAtomicUpgrade(self.write()) + } + + fn slow_upgrade(&self, reader_id: usize) -> UpgradeResult { + self.fast_local_read_unlock(reader_id); + UpgradeResult::NonAtomicUpgrade(self.write()) + } + + fn read_lock_slow(&self, reader_id: usize, local_lock: &RawLocalLock) -> bool { // always locks, returns true if the lock was locally taken + let global_sync = RawGlobalLock::from_ptr(self.ptr); + let (global_locked_needs_prep, local_locked) = global_sync.read_lock(local_lock); + if global_locked_needs_prep { + local_lock.read_lock_deglobalize(); + RawLocalLock::new(self.ptr, (reader_id + 1) & LOCK_MASK as usize).try_deglobalize(); + RawLocalLock::new(self.ptr, (reader_id + 2) & LOCK_MASK as usize).try_deglobalize(); + global_sync.read_unlock_prep_local(); + return true; + } + local_locked + } + + fn prepare_local_write_locks_and_lock_global(&self) { + let mut remaining_pos = -1; + for i in 0..LOCK_COUNT { + if !RawLocalLock::new(self.ptr, i as usize).make_global() { + remaining_pos = i as i32; + } + } + while remaining_pos >= 0 { + RawLocalLock::new(self.ptr, remaining_pos as usize).wait_for_reader(); + remaining_pos -= 1; + } + } +} + +/// Returned from the `write()` method and grants read-write access to the enclosed data +pub struct WriteGuard<'a, T: ?Sized + 'a> { + lock: &'a WideRwLock +} + +/// Returned from the `read()` method and grants read-only access to the enclosed data +pub struct ReadGuard<'a, T: ?Sized + 'a> { + lock: &'a WideRwLock, + reader_id: usize +} + +impl<'a, T: ?Sized + 'a> WriteGuard<'a, T> { + /// Downgrade the lock to a read lock. Other writers may take this lock in between, that is, + /// this is not an atomic downgrade. + pub fn downgrade_to_read_lock(self) -> ReadGuard<'a, T> { + let lock = self.lock; + mem::forget(self); + lock.write_unlock(); + lock.read() + } +} + +impl<'a, T: ?Sized + 'a> ReadGuard<'a, T> { + /// Upgrade a read lock to a write lock. If the upgrade happened atomically (no other writers + /// took the lock in the meantime), the return type indicates that. 
+ pub fn upgrade_to_write_lock(self) -> UpgradeResult<'a, T> { + let lock = self.lock; + let reader_id = self.reader_id; + mem::forget(self); + if reader_id < LOCK_COUNT as usize { + lock.slow_upgrade(reader_id) + } else { + lock.upgrade_to_write_lock() + } + } +} + +impl Drop for WideRwLock { + #[inline] + fn drop(&mut self) { + if !self.ptr.is_null() { + unsafe { + let layout = Layout::from_size_align(9 * 64, 64).unwrap(); //aligned to cache line + alloc::dealloc(self.ptr as *mut u8, layout); + } + self.ptr = ptr::null_mut(); + } + } +} + +impl<'a, T: ?Sized + 'a> Drop for WriteGuard<'a, T> { + #[inline] + fn drop(&mut self) { + self.lock.write_unlock(); + } +} + +impl<'a, T: ?Sized + 'a> Drop for ReadGuard<'a, T> { + #[inline] + fn drop(&mut self) { + if self.reader_id < LOCK_COUNT as usize { + self.lock.fast_local_read_unlock(self.reader_id); + } + else { + self.lock.read_unlock(); + } + } +} + +impl<'a, T: ?Sized> Deref for ReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl<'a, T: ?Sized> Deref for WriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl<'a, T: ?Sized> DerefMut for WriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +struct RawLocalLock { + state: AtomicUsize, + spin_lock: AtomicUsize, + parked_thread: Option, +} + +impl RawLocalLock { + #[inline] + fn new<'a>(ptr: *mut u64, reader_id: usize) -> &'a mut RawLocalLock { + unsafe { + let p = ptr.add(8 + (reader_id << 3)); + &mut *(p as * const _ as *mut RawLocalLock) + } + } + + fn init(&mut self) { + self.state = AtomicUsize::new(GLOBAL_MASK as usize); + self.spin_lock = AtomicUsize::new(0); + unsafe { + ptr::write(&mut self.parked_thread as *mut _, None); + } + } + + fn park_thread(&mut self) -> bool { + let current_thread = thread::current(); + let mut spinwait = SpinWait::new(); + loop { + if self.spin_lock.compare_exchange(0, 1, Ordering::Release, Ordering::Relaxed).is_ok() { + let state = self.load_state(); + let valid = state & READER_MASK != 0; + if !valid { + self.spin_lock.store(0, Ordering::Release); + return false; + } + self.parked_thread = Some(current_thread); + self.spin_lock.store(0, Ordering::Release); + thread::park(); + break; + } + spinwait.spin(); + } + true + } + + fn unpark_thread(&mut self) { + let mut spinwait = SpinWait::new(); + loop { + if self.spin_lock.compare_exchange(0, 1, Ordering::Release, Ordering::Relaxed).is_ok() { + if self.parked_thread.is_some() { + let mut t = None; + mem::swap(&mut self.parked_thread, & mut t); + t.unwrap().unpark(); + } + self.spin_lock.store(0, Ordering::Release); + break; + } + spinwait.spin(); + } + } + + #[inline] + fn cas_state(&self, expect: u32, new_val: u32) -> bool { + self.state.compare_exchange(expect as usize, new_val as usize, Ordering::Release, Ordering::Relaxed).is_ok() + } + + #[inline] + fn load_state(&self) -> u32 { + self.state.load(Ordering::Relaxed) as u32 + } + + #[inline] + fn try_read_lock_local(&self) -> bool { + self.cas_state(0, 1) || self.try_read_lock_local_slow() + } + + fn try_read_lock_local_slow(&self) -> bool { + loop { + let c = self.load_state(); + if (c & GLOBAL_MASK) != 0 { return false; } + if self.cas_state(c, c + 1) { + return true; + } + } + } + + #[inline] + fn read_lock_deglobalize(&self) { + if self.cas_state(GLOBAL_MASK, 1) { return; } + self.read_lock_deglobalize_slow(); + } + + fn read_lock_deglobalize_slow(&self) { + loop { + let c = 
self.load_state(); + if self.cas_state(c, (c & !GLOBAL_MASK) + 1) { return; } + } + } + + #[inline] + fn try_deglobalize(&self) { + self.cas_state(GLOBAL_MASK, 0); + } + + fn make_global(&self) -> bool { + loop { + let c = self.load_state(); + if self.cas_state(c, c | GLOBAL_MASK) { + return c & READER_MASK == 0; + } + } + } + + fn wait_for_reader(&mut self) { + if self.load_state() & READER_MASK != 0 { + self.slow_wait_for_reader(); + } + } + + fn slow_wait_for_reader(&mut self) { + let mut parked = false; + loop { + if !parked { + spin_loop_hint(); + } + if self.load_state() & READER_MASK == 0 { + return ; + } + parked = self.park_thread(); + } + } + + #[inline(always)] + fn read_unlock(&self) -> bool { + if self.cas_state(1, 0) { + return false; + } + self.try_slow_release_shared() + } + + fn try_slow_release_shared(&self) -> bool { + loop { + let c = self.load_state(); + if self.cas_state(c, c - 1) { + return c == GLOBAL_MASK | 1; + } + } + } + +} + +struct RawGlobalLock { + state: AtomicUsize, + parked: AtomicBool, + spin_lock: AtomicBool, + parked_priv_writer: Option, + local_bias: u32, +} + +impl RawGlobalLock { + + #[inline] + fn from_ptr<'a>(ptr: *mut u64) -> &'a mut RawGlobalLock { + unsafe { + &mut *(ptr as * const _ as *mut RawGlobalLock) + } + } + + #[inline] + fn init(&mut self) { + self.state = AtomicUsize::new(GLOBAL_MASK as usize); + self.local_bias = 0; + self.parked = AtomicBool::new(false); + self.spin_lock = AtomicBool::new(false); + unsafe { ptr::write(&mut self.parked_priv_writer, None); } + } + + fn park_priv_writer_thread(&mut self) -> bool { + let current_thread = thread::current(); + let mut spinwait = SpinWait::new(); + loop { + if self.spin_lock.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed).is_ok() { + let state = self.load_state(); + let valid = state & READER_MASK != 0; + if !valid { + self.spin_lock.store(false, Ordering::Release); + return false; + } + self.parked_priv_writer = Some(current_thread); + self.spin_lock.store(false, Ordering::Release); + thread::park(); + break; + } + spinwait.spin(); + } + true + } + + fn unpark_priv_writer_thread(&mut self) { + let mut spinwait = SpinWait::new(); + loop { + if self.spin_lock.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed).is_ok() { + if self.parked_priv_writer.is_some() { + let mut t = None; + mem::swap(&mut self.parked_priv_writer, & mut t); + t.unwrap().unpark(); + } + self.spin_lock.store(false, Ordering::Release); + break; + } + spinwait.spin(); + } + } + + #[inline] + fn cas_state(&self, expect: u32, new_val: u32) -> bool { + self.state.compare_exchange(expect as usize, new_val as usize, Ordering::Release, Ordering::Relaxed).is_ok() + } + + #[inline] + fn load_state(&self) -> u32 { + self.state.load(Ordering::Relaxed) as u32 + } + + #[inline] + fn raise_park_flag(&self) { + self.parked.store(true, Ordering::Release); + } + + #[inline] + fn write_lock(&mut self) -> (bool, bool) { + let (locked, write_prep, must_relock_global) = self.try_write_lock(); + if !locked { + return self.slow_write_lock(); + } + (write_prep, must_relock_global) + } + + #[inline] + fn try_fast_read_lock(&mut self) -> bool { + self.local_bias = self.local_bias.saturating_add(1); + if self.local_bias < 100 { + let c = self.load_state(); + if self.cas_state(c & !(WRITER_MASK), (c & !(WRITER_MASK)) + 1) { + return true; + } + } + false + } + + #[inline] + fn read_lock(&mut self, local_lock: &RawLocalLock) -> (bool, bool) { + let (locked, read_prep, local_locked) = 
self.try_read_lock(local_lock); + if !locked { + return self.slow_read_lock(local_lock); + } + (read_prep, local_locked) + } + + #[inline] + fn try_read_lock(&mut self, local_lock: &RawLocalLock) -> (bool,bool,bool) { + let c = self.load_state(); + if (c & (WRITER_MASK | PREPARING_TO_GO_LOCAL)) == 0 { + if self.cas_state(c, (c & !GLOBAL_MASK) | PREPARING_TO_GO_LOCAL) { + return (true, true, false); + } + } + if c & GLOBAL_MASK == 0 && local_lock.try_read_lock_local() { + return (true, false, true); + } + return (false, false, false); + } + + #[inline] + fn write_unlock(&mut self) { + self.cas_state(GLOBAL_MASK | WRITER_MASK, GLOBAL_MASK); + self.unpark_all(); + } + + #[inline] + fn read_unlock_zero(&mut self) { + if self.try_read_unlock_zero() { + self.unpark_priv_writer_thread(); + } + else { + self.unpark_all(); + } + } + + #[inline] + fn wait_for_reader(&mut self) { + let c = self.load_state(); + if c & READER_MASK != 0 { + self.slow_wait_for_reader(); + } + return; + } + + fn slow_wait_for_reader(&mut self) { + let mut parked = false; + loop { + if !parked { + spin_loop_hint(); + } + let state = self.load_state(); + if state & READER_MASK == 0 { + return; + } + parked = self.park_priv_writer_thread(); + } + } + + fn read_unlock_prep_local(&mut self) { + loop { + let c = self.load_state(); + if self.cas_state(c, c & !PREPARING_TO_GO_LOCAL) { + self.unpark_all(); + return ; + } + } + } + + #[inline] + fn try_read_unlock_zero(&mut self) -> bool { + if !self.cas_state(GLOBAL_MASK | 1, GLOBAL_MASK) { + return self.read_unlock_slow(); + } + false + } + + #[inline] + fn read_unlock_slow(&self) -> bool { + loop { + let c = self.load_state(); + if self.cas_state(c, c - 1) { return c & WRITER_MASK != 0; } + } + } + + fn try_single_reader_upgrade(&mut self) -> bool { + if self.cas_state(GLOBAL_MASK | 1, GLOBAL_MASK | WRITER_MASK ) { + self.local_bias = 0; + return true; + } + false + } + + //p is expected to be either GLOBAL_MASK, or GLOBAL_MASK | PREPARING_TO_WRITE_MASK + #[inline] + fn try_write_lock(&mut self) -> (bool, bool, bool) { // (global state changed, write_prep, must_wait_for_reader) + self.local_bias = 0; + let locked = self.cas_state(GLOBAL_MASK, WRITER_MASK | GLOBAL_MASK); + if locked { return (true, false, false); } + self.try_write_lock_slow() + } + + fn try_write_lock_slow(&mut self) -> (bool, bool, bool) { + loop { + let c = self.load_state(); + if c & (PREPARING_TO_GO_LOCAL | WRITER_MASK) == 0 { + if self.cas_state(c, c | WRITER_MASK | GLOBAL_MASK) { + return (true, c & GLOBAL_MASK == 0, c & READER_MASK != 0); + } + } + return (false, false, false); + } + } + + fn slow_write_lock(&mut self) -> (bool, bool) { + let mut parked = false; + loop { + if !parked { + spin_loop_hint(); + } + parked = false; + let x = self.try_write_lock(); + if x.0 { + return (x.1, x.2); + } + self.raise_park_flag(); + + // Park our thread until we are woken up by an unlock + unsafe { + let addr = &(self.state) as *const _ as usize; + let validate = || { + let state = self.load_state(); + let valid = state != 0 && state != GLOBAL_MASK && state != 1;// && state != GLOBAL_MASK | PREPARING_TO_WRITE_MASK; + if valid { self.raise_park_flag(); } + return valid; + }; + let before_sleep = || {}; + let timed_out = |_, _| {}; + match parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + WRITE_PARK_TOKEN, + None, + ) { + // We were unparked normally, try acquiring the lock again + ParkResult::Unparked(_) => { + parked = true; + }, + + // The validation function failed, try locking again + 
ParkResult::Invalid => (), + + // Timeout expired + ParkResult::TimedOut => panic!(""), + } + } + } + } + + fn slow_read_lock(&mut self, local_lock: &RawLocalLock) -> (bool, bool) { + let mut parked = false; + loop { + if !parked { + spin_loop_hint(); + } + parked = false; + let x = self.try_read_lock(local_lock); + if x.0 { + return (x.1, x.2); + } + self.raise_park_flag(); + + // Park our thread until we are woken up by an unlock + unsafe { + let addr = &(self.state) as *const _ as usize; + let validate = || { + let state = self.load_state(); + let valid = (state & (WRITER_MASK | PREPARING_TO_GO_LOCAL)) != 0; + if valid { + self.raise_park_flag(); + } + return valid; + }; + let before_sleep = || {}; + let timed_out = |_, _| {}; + match parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + READ_PARK_TOKEN, + None, + ) { + // We were unparked normally, try acquiring the lock again + ParkResult::Unparked(_) => { + parked = true; + } + + // The validation function failed, try locking again + ParkResult::Invalid => (), + + // Timeout expired + ParkResult::TimedOut => panic!(""), + } + } + } + } + + #[inline] + fn unpark_all(&self) { + let has_parked = self.parked.load(Ordering::Acquire); + if has_parked { + self.parked.store(false, Ordering::Release); + self.force_unpark_all(); + } + } + + #[inline] + fn force_unpark_all(&self) { + unsafe { + let addr = &(self.state) as *const _ as usize; + let mut first_writer = false; + let mut first = true; + parking_lot_core::unpark_filter(addr, |token| { + if first { + first = false; + if WRITE_PARK_TOKEN == token { + first_writer = true; + } + return FilterOp::Unpark; + } else { + if first_writer { + return FilterOp::Stop; + } + if READ_PARK_TOKEN == token { + return FilterOp::Unpark; + } + return FilterOp::Skip; + } + }, |res| { + if res.have_more_threads { + self.parked.store(true, Ordering::Release); + } + DEFAULT_UNPARK_TOKEN + }); + } + } +} + +#[inline] +pub fn thread_id_as_u64() -> u64 { + THREAD_ID.with(|id_cell| *id_cell.borrow() ) +} + +static ID_COUNT: AtomicUsize = AtomicUsize::new(0); + +#[inline] +pub fn thread_id_as_u64_init() -> u64 { + ID_COUNT.fetch_add(1, Ordering::Release) as u64 +} + +#[inline] +fn reader_index() -> usize { + (thread_id_as_u64() & LOCK_MASK as u64) as usize +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn write_lock_read_lock() { + let lock = WideRwLock::new(()); + { + lock.write(); + } + { + lock.read(); + } + } +} + diff --git a/tests/stable_tests.rs b/tests/stable_tests.rs new file mode 100644 index 0000000..ea4da4f --- /dev/null +++ b/tests/stable_tests.rs @@ -0,0 +1,370 @@ +// Copyright 2018 Mohammad Rezaei. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. 
+// + +extern crate widerwlock; +extern crate rand; +extern crate xoshiro; + +use widerwlock::*; +use std::sync::Arc; +use std::sync::mpsc::*; +use std::thread; +use rand::*; +use std::sync::Mutex; +use std::sync::atomic::AtomicBool; +use std::borrow::*; +use std::sync::atomic::Ordering; +use std::time::{SystemTime, UNIX_EPOCH}; +use xoshiro::Xoshiro512StarStar; +use std::collections::HashMap; +use std::hash::Hash; + +#[test] +fn test_upgrade() { + let lock = WideRwLock::new(()); + { + let guard = lock.read(); + assert!(guard.upgrade_to_write_lock().atomic()); + } +} + +#[test] +fn write_lock_read_lock() { + let lock = WideRwLock::new(()); + { + lock.write(); + } + { + lock.read(); + } +} + +#[test] +fn test_upgrade2() { + let lock = WideRwLock::new(()); + { + lock.read(); + lock.read();// this is not a test of reentrancy. This only works because the lock is uncontended + lock.read(); + } + // lock is now in local mode + { + lock.write(); + } + //lock is back in global mode + let guard = lock.read(); + assert!(guard.upgrade_to_write_lock().atomic()); +} + +#[test] +fn ten_threads() { + const N: u32 = 10; + const M: u32 = 100_000; + + let r = Arc::new(WideRwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / N as f64) { + drop(r.write()); + } else { + drop(r.read()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +#[test] +fn test_multi_threads() +{ + const THREADS: u64 = 50; + + let r = Arc::new(WideRwLock::new(())); + + let (tx, rx) = channel::(); + + for t in 0..THREADS { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let seed = now() ^ t; + let mut runner = TestRunner::new(r, seed); + let sum = runner.run(); + assert!(!sum.is_nan()); + tx.send(runner.is_bad()); + drop(tx); + }); + } + drop(tx); + for _x in 0..THREADS { + assert!(!rx.recv().unwrap()); + } +} + +struct LockTester { + mutex: Mutex, + writer: AtomicBool, +} + +unsafe impl Send for LockTester {} +unsafe impl Sync for LockTester {} + + +struct LockTesterData { + readers: i32, + max_readers: i32 +} + +impl LockTesterData { + fn new() -> LockTesterData { + LockTesterData { + readers: 0, + max_readers: 0 + } + } +} + +impl LockTester { + fn new() -> LockTester { + LockTester { mutex: Mutex::new(LockTesterData::new()), + writer: AtomicBool::new(false), + } + } + + fn get_max_readers(&self) -> i32 { + let guard = self.mutex.lock().unwrap(); + guard.borrow().max_readers + } + + fn get_readers(&self) -> i32 { + let guard = self.mutex.lock().unwrap(); + guard.borrow().readers + } + + fn is_writer(&self) -> bool { + self.writer.load(Ordering::Acquire) + } + + fn start_reader(&self) -> Result<(), &str> { + if self.is_writer() { + return Err("can't read while writing!"); + } + let mut guard = self.mutex.lock().unwrap(); + let data = guard.borrow_mut(); + data.readers += 1; + if data.readers > data.max_readers { + data.max_readers = data.readers; + } + Ok(()) + } + + fn end_reader(&self) -> Result<(), &str> { + let mut guard = self.mutex.lock().unwrap(); + let data = guard.borrow_mut(); + data.readers -= 1; + if data.readers < 0 { + return Err("readers are out of wack!"); + } + Ok(()) + } + + fn start_writer(&self) -> Result<(), &str> { + if self.is_writer() { + return Err("too many writers!"); + } + { + let guard = self.mutex.lock().unwrap(); + let data = guard.borrow(); + if data.readers > 0 { + return Err("readers are out of 
wack!"); + } + } + self.writer.store(true, Ordering::Release); + Ok(()) + } + + fn end_writer(&self) -> Result<(), &str> { + if !self.is_writer() { + return Err("not writing!"); + } + { + let guard = self.mutex.lock().unwrap(); + let data = guard.borrow(); + if data.readers > 0 { + return Err("readers are out of wack!"); + } + } + self.writer.store(false, Ordering::Release); + Ok(()) + } +} + +struct TestRunner { + bad: AtomicBool, + rng: Xoshiro512StarStar, + lock: Arc>, + tester: LockTester, + last_type: f64, + current_type: f64 +} + +impl TestRunner { + + fn new(lock: Arc>, seed: u64) -> TestRunner { + TestRunner { + bad: AtomicBool::new(false), + rng: Xoshiro512StarStar::from_seed_u64(seed), + lock, + tester: LockTester::new(), + last_type: 0.0, + current_type: 0.0 + } + } + + fn is_bad(&self) -> bool { + self.bad.load(Ordering::Acquire) + } + + fn next_type(&mut self) -> f64 { + self.rng.gen() + } + + #[cold] + fn work(rng: &mut Xoshiro512StarStar) -> f64 { + let max: i32 = (500.0 + rng.gen::() * 500.0) as i32; + let mut sum: f64 = 0.0; + for _i in 0..max { + sum += rng.gen::().cbrt(); + } + sum + } + + fn run(&mut self) -> f64 { + let mut sum: f64 = 0.0; + for _i in 0..1000 { + let typ = self.next_type(); + self.last_type = self.current_type; + self.current_type = typ; + if typ < 0.1 { + self.lock.write(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + } + else if typ < 0.2 { + let guard = self.lock.read(); + self.tester.start_reader(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_reader(). + or_else(|err| self.handle_error(err)).unwrap(); + guard.upgrade_to_write_lock(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + } + else if typ < 0.3 { + self.lock.write(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + } + else if typ < 0.4 { + let guard = self.lock.read(); + self.tester.start_reader(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_reader(). + or_else(|err| self.handle_error(err)).unwrap(); + guard.upgrade_to_write_lock(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + self.tester.start_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_writer(). + or_else(|err| self.handle_error(err)).unwrap(); + } + else { + self.lock.read(); + self.tester.start_reader(). + or_else(|err| self.handle_error(err)).unwrap(); + sum += ::work(&mut self.rng); + self.tester.end_reader(). 
+                    or_else(|err| self.handle_error(err)).unwrap();
+            }
+        }
+
+        sum
+    }
+
+    fn handle_error(&self, err: &str) -> Result<(), ()> {
+        println!("{}", err);
+        self.bad.store(true, Ordering::Release);
+        Ok(())
+    }
+}
+
+fn now() -> u64 {
+    let now = SystemTime::now();
+    let dur = now.duration_since(UNIX_EPOCH).unwrap();
+    dur.as_secs() * 1000 + dur.subsec_nanos() as u64 / 1_000_000
+}
+
+struct RwMap<K: Hash + Eq, V> {
+    map: WideRwLock<HashMap<K, V>>,
+}
+
+impl<K: Hash + Eq, V> RwMap<K, V> {
+    // the atomic return allows two important things:
+    // 1) ensures that the factory function runs only if there is no value in the map
+    // 2) the (potentially) expensive double read can be avoided when atomic
+    pub fn insert_if_absent<F>(&self, key: K, v_factory: F)
+        where F: FnOnce(&K) -> V {
+        let guard: ReadGuard<_> = self.map.read();
+        let found;
+        {
+            found = guard.get(&key).is_some();
+        }
+        if !found {
+            let result: UpgradeResult<_> = guard.upgrade_to_write_lock();
+            let atomic = result.atomic();
+            let mut write_guard: WriteGuard<_> = result.into_guard();
+            if !atomic {
+                // we have to check again, because another writer may have come in
+                if write_guard.get(&key).is_some() { return; }
+            }
+            let v = v_factory(&key);
+            write_guard.insert(key, v);
+        }
+    }
+}
\ No newline at end of file
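For reference, here is a minimal end-to-end sketch of the public API exercised by this patch (a plain read, an atomic-or-not upgrade, and a downgrade back to a read lock). It is an illustration, not part of the patch itself; the `Vec<u32>` payload, the `main` wrapper and the assertions are assumptions made for the example.

```rust
extern crate widerwlock;

use widerwlock::{UpgradeResult, WideRwLock};

fn main() {
    // Illustrative payload; in practice this would be a large data structure.
    let lock = WideRwLock::new(vec![1u32, 2, 3]);

    // Read access goes through the RAII guard; the lock is released on drop.
    {
        let guard = lock.read();
        assert_eq!(guard.len(), 3);
    }

    // Read first, then upgrade to a write lock only if a change is needed.
    let guard = lock.read();
    if !guard.contains(&4) {
        let result: UpgradeResult<_> = guard.upgrade_to_write_lock();
        let atomic = result.atomic();
        let mut write_guard = result.into_guard();
        // If the upgrade was not atomic, another writer may have run in between,
        // so the cheap check is repeated before mutating.
        if atomic || !write_guard.contains(&4) {
            write_guard.push(4);
        }
        // Downgrade (non-atomically) back to a read lock and keep reading.
        let read_guard = write_guard.downgrade_to_read_lock();
        assert!(read_guard.contains(&4));
    }
}
```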