diff --git a/crates/mysten-common/src/sync/notify_read.rs b/crates/mysten-common/src/sync/notify_read.rs index be95f3fab865d..fef024b8d4f10 100644 --- a/crates/mysten-common/src/sync/notify_read.rs +++ b/crates/mysten-common/src/sync/notify_read.rs @@ -1,10 +1,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use futures::future::{join_all, Either}; use parking_lot::Mutex; use parking_lot::MutexGuard; use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; +use std::error::Error; use std::future::Future; use std::hash::{Hash, Hasher}; use std::mem; @@ -115,6 +117,29 @@ impl NotifyRead { } } +impl NotifyRead { + pub async fn read( + &self, + keys: &[K], + fetch: impl FnOnce(&[K]) -> Result>, E>, + ) -> Result, E> { + let registrations = self.register_all(keys); + + let results = fetch(keys)?; + + let results = results + .into_iter() + .zip(registrations) + .map(|(a, r)| match a { + // Note that Some() clause also drops registration that is already fulfilled + Some(ready) => Either::Left(futures::future::ready(ready)), + None => Either::Right(r), + }); + + Ok(join_all(results).await) + } +} + /// Registration resolves to the value but also provides safe cancellation /// When Registration is dropped before it is resolved, we de-register from the pending list pub struct Registration<'a, K: Eq + Hash + Clone, V: Clone> { diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index b26748a786d75..01ab4666f7d45 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -1325,27 +1325,14 @@ impl AuthorityPerEpochStore { &self, checkpoints: Vec, ) -> SuiResult> { - // We need to register waiters _before_ reading from the database to avoid - // race conditions - let registrations = self.checkpoint_state_notify_read.register_all(&checkpoints); - let accumulators = self - .tables()? - .state_hash_by_checkpoint - .multi_get(checkpoints)?; - - // Zipping together registrations and accumulators ensures returned order is - // the same as order of digests - let results = - accumulators - .into_iter() - .zip(registrations.into_iter()) - .map(|(a, r)| match a { - // Note that Some() clause also drops registration that is already fulfilled - Some(ready) => Either::Left(futures::future::ready(ready)), - None => Either::Right(r), - }); - - Ok(join_all(results).await) + self.checkpoint_state_notify_read + .read(&checkpoints, |checkpoints| -> SuiResult<_> { + Ok(self + .tables()? + .state_hash_by_checkpoint + .multi_get(checkpoints)?) + }) + .await } pub async fn notify_read_running_root( diff --git a/crates/sui-core/src/execution_cache/passthrough_cache.rs b/crates/sui-core/src/execution_cache/passthrough_cache.rs index 08c4c1ff3fd0f..c60abb4a98f00 100644 --- a/crates/sui-core/src/execution_cache/passthrough_cache.rs +++ b/crates/sui-core/src/execution_cache/passthrough_cache.rs @@ -9,11 +9,7 @@ use crate::authority::AuthorityStore; use crate::state_accumulator::AccumulatorStore; use crate::transaction_outputs::TransactionOutputs; -use either::Either; -use futures::{ - future::{join_all, BoxFuture}, - FutureExt, -}; +use futures::{future::BoxFuture, FutureExt}; use mysten_common::sync::notify_read::NotifyRead; use prometheus::Registry; use std::sync::Arc; @@ -219,25 +215,11 @@ impl TransactionCacheRead for PassthroughCache { &'a self, digests: &'a [TransactionDigest], ) -> BoxFuture<'a, SuiResult>> { - async move { - let registrations = self - .executed_effects_digests_notify_read - .register_all(digests); - - let executed_effects_digests = self.multi_get_executed_effects_digests(digests)?; - - let results = executed_effects_digests - .into_iter() - .zip(registrations) - .map(|(a, r)| match a { - // Note that Some() clause also drops registration that is already fulfilled - Some(ready) => Either::Left(futures::future::ready(ready)), - None => Either::Right(r), - }); - - Ok(join_all(results).await) - } - .boxed() + self.executed_effects_digests_notify_read + .read(digests, |digests| { + self.multi_get_executed_effects_digests(digests) + }) + .boxed() } fn multi_get_events( diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index da219403788e3..13e1078cb9f12 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -49,11 +49,7 @@ use crate::transaction_outputs::TransactionOutputs; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; -use either::Either; -use futures::{ - future::{join_all, BoxFuture}, - FutureExt, -}; +use futures::{future::BoxFuture, FutureExt}; use moka::sync::Cache as MokaCache; use mysten_common::sync::notify_read::NotifyRead; use parking_lot::Mutex; @@ -1735,25 +1731,11 @@ impl TransactionCacheRead for WritebackCache { &'a self, digests: &'a [TransactionDigest], ) -> BoxFuture<'a, SuiResult>> { - async move { - let registrations = self - .executed_effects_digests_notify_read - .register_all(digests); - - let executed_effects_digests = self.multi_get_executed_effects_digests(digests)?; - - let results = executed_effects_digests - .into_iter() - .zip(registrations) - .map(|(a, r)| match a { - // Note that Some() clause also drops registration that is already fulfilled - Some(ready) => Either::Left(futures::future::ready(ready)), - None => Either::Right(r), - }); - - Ok(join_all(results).await) - } - .boxed() + self.executed_effects_digests_notify_read + .read(digests, |digests| { + self.multi_get_executed_effects_digests(digests) + }) + .boxed() } fn multi_get_events(