Skip to content

Commit

Permalink
Replaces mutex with watch channel (#9262)
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 authored Feb 18, 2025
1 parent 5cf5178 commit 9591c00
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 43 deletions.
29 changes: 13 additions & 16 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use indexmap::IndexMap;
use jsonrpsee::core::{async_trait, RpcResult as Result};
use jsonrpsee_proc_macros::rpc;
use jsonrpsee_types::{ErrorCode, ErrorObject};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::{
sync::{broadcast, watch},
task::JoinHandle,
};
use tower::{Service, ServiceExt};
use tracing::Instrument;

Expand Down Expand Up @@ -421,12 +424,11 @@ where
address_book: AddressBook,

/// The last warning or error event logged by the server.
last_event: LoggedLastEvent,
last_warn_error_log_rx: LoggedLastEvent,
}

/// A type alias for the last event logged by the server.
pub type LoggedLastEvent =
Arc<std::sync::Mutex<Option<(String, tracing::Level, chrono::DateTime<Utc>)>>>;
pub type LoggedLastEvent = watch::Receiver<Option<(String, tracing::Level, chrono::DateTime<Utc>)>>;

impl<Mempool, State, Tip, AddressBook> Debug for RpcImpl<Mempool, State, Tip, AddressBook>
where
Expand Down Expand Up @@ -501,7 +503,7 @@ where
state: State,
latest_chain_tip: Tip,
address_book: AddressBook,
last_event: LoggedLastEvent,
last_warn_error_log_rx: LoggedLastEvent,
) -> (Self, JoinHandle<()>)
where
VersionString: ToString + Clone + Send + 'static,
Expand All @@ -528,7 +530,7 @@ where
latest_chain_tip: latest_chain_tip.clone(),
queue_sender,
address_book,
last_event,
last_warn_error_log_rx,
};

// run the process queue
Expand Down Expand Up @@ -567,17 +569,12 @@ where
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
async fn get_info(&self) -> Result<GetInfo> {
let version = GetInfo::version(&self.build_version).ok_or(ErrorObject::owned(
server::error::LegacyCode::Misc.into(),
"invalid version string",
None::<()>,
))?;
let version = GetInfo::version(&self.build_version).expect("invalid version string");

// TODO: Change to use `currently_live_peers()` after #9214
let connections = self.address_book.recently_live_peers(Utc::now()).len();

let last_error_recorded = self.last_event.lock().expect("mutex poisoned").clone();
let (last_event, _last_event_level, last_event_time) = last_error_recorded.unwrap_or((
let last_error_recorded = self.last_warn_error_log_rx.borrow().clone();
let (last_error_log, _level, last_error_log_time) = last_error_recorded.unwrap_or((
GetInfo::default().errors,
tracing::Level::INFO,
Utc::now(),
Expand Down Expand Up @@ -612,8 +609,8 @@ where
testnet,
pay_tx_fee,
relay_fee,
errors: last_event,
errors_timestamp: last_event_time.to_string(),
errors: last_error_log,
errors_timestamp: last_error_log_time.to_string(),
};

Ok(response)
Expand Down
3 changes: 2 additions & 1 deletion zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ where
let mempool = MockService::build().for_prop_tests();
let state = MockService::build().for_prop_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, mempool_tx_queue) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -989,7 +990,7 @@ where
Buffer::new(state.clone(), 1),
chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

(mempool, state, rpc, mempool_tx_queue)
Expand Down
9 changes: 6 additions & 3 deletions zebra-rpc/src/methods/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async fn test_z_get_treestate() {

let (_, state, tip, _) = zebra_state::populated_state(blocks.clone(), &testnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _) = RpcImpl::new(
"",
"",
Expand All @@ -118,7 +119,7 @@ async fn test_z_get_treestate() {
state,
tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Request the treestate by a hash.
Expand Down Expand Up @@ -200,6 +201,7 @@ async fn test_rpc_response_data_for_network(network: &Network) {
.await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -210,7 +212,7 @@ async fn test_rpc_response_data_for_network(network: &Network) {
read_state,
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// We only want a snapshot of the `getblocksubsidy` and `getblockchaininfo` methods for the non-default Testnet (with an NU6 activation height).
Expand Down Expand Up @@ -529,6 +531,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) {
let mut state = MockService::build().for_unit_tests();
let mempool = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -539,7 +542,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) {
state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Test the response format from `z_getsubtreesbyindex` for Sapling.
Expand Down
33 changes: 22 additions & 11 deletions zebra-rpc/src/methods/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async fn rpc_getinfo() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -39,7 +40,7 @@ async fn rpc_getinfo() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

let getinfo_future = tokio::spawn(async move { rpc.get_info().await });
Expand Down Expand Up @@ -143,6 +144,7 @@ async fn rpc_getblock() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -153,7 +155,7 @@ async fn rpc_getblock() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make height calls with verbosity=0 and check response
Expand Down Expand Up @@ -486,6 +488,7 @@ async fn rpc_getblock_parse_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -496,7 +499,7 @@ async fn rpc_getblock_parse_error() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make sure we get an error if Zebra can't parse the block height.
Expand Down Expand Up @@ -531,6 +534,7 @@ async fn rpc_getblock_missing_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -541,7 +545,7 @@ async fn rpc_getblock_missing_error() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make sure Zebra returns the correct error code `-8` for missing blocks
Expand Down Expand Up @@ -595,6 +599,7 @@ async fn rpc_getblockheader() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -605,7 +610,7 @@ async fn rpc_getblockheader() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make height calls with verbose=false and check response
Expand Down Expand Up @@ -708,6 +713,7 @@ async fn rpc_getbestblockhash() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -718,7 +724,7 @@ async fn rpc_getbestblockhash() {
read_state,
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Get the tip hash using RPC method `get_best_block_hash`
Expand Down Expand Up @@ -756,6 +762,7 @@ async fn rpc_getrawtransaction() {
latest_chain_tip_sender.send_best_tip_height(Height(10));

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -766,7 +773,7 @@ async fn rpc_getrawtransaction() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Test case where transaction is in mempool.
Expand Down Expand Up @@ -934,6 +941,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
let (_state, read_state, latest_chain_tip, _chain_tip_change) =
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -944,7 +952,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
Buffer::new(read_state.clone(), 1),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with an invalid address string
Expand Down Expand Up @@ -1085,6 +1093,7 @@ async fn rpc_getaddresstxids_response_with(
) {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1095,7 +1104,7 @@ async fn rpc_getaddresstxids_response_with(
Buffer::new(read_state.clone(), 1),
latest_chain_tip.clone(),
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with valid arguments
Expand Down Expand Up @@ -1139,6 +1148,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let rpc = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1149,7 +1159,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with an invalid address string
Expand Down Expand Up @@ -1186,6 +1196,7 @@ async fn rpc_getaddressutxos_response() {
let (_state, read_state, latest_chain_tip, _chain_tip_change) =
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let rpc = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1196,7 +1207,7 @@ async fn rpc_getaddressutxos_response() {
Buffer::new(read_state.clone(), 1),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with a valid address
Expand Down
11 changes: 7 additions & 4 deletions zebra-rpc/src/server/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn rpc_server_spawn() {

info!("spawning RPC server...");

let (_tx, rx) = watch::channel(None);
let _rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
Expand All @@ -57,7 +58,7 @@ async fn rpc_server_spawn() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

info!("spawned RPC server, checking services...");
Expand Down Expand Up @@ -105,6 +106,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) {

info!("spawning RPC server...");

let (_tx, rx) = watch::channel(None);
let rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
Expand All @@ -118,7 +120,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
)
.await
.expect("");
Expand Down Expand Up @@ -162,6 +164,7 @@ async fn rpc_server_spawn_port_conflict() {

info!("spawning RPC server 1...");

let (_tx, rx) = watch::channel(None);
let _rpc_server_1_task_handle = RpcServer::spawn(
config.clone(),
Default::default(),
Expand All @@ -175,7 +178,7 @@ async fn rpc_server_spawn_port_conflict() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx.clone(),
)
.await;

Expand All @@ -196,7 +199,7 @@ async fn rpc_server_spawn_port_conflict() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
)
.await;

Expand Down
Loading

0 comments on commit 9591c00

Please sign in to comment.