Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup(logger): Replace last log mutex with watch channel #9262

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use indexmap::IndexMap;
use jsonrpsee::core::{async_trait, RpcResult as Result};
use jsonrpsee_proc_macros::rpc;
use jsonrpsee_types::{ErrorCode, ErrorObject};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::{
sync::{broadcast, watch},
task::JoinHandle,
};
use tower::{Service, ServiceExt};
use tracing::Instrument;

Expand Down Expand Up @@ -421,12 +424,11 @@ where
address_book: AddressBook,

/// The last warning or error event logged by the server.
last_event: LoggedLastEvent,
last_warn_error_log_rx: LoggedLastEvent,
}

/// A type alias for the last event logged by the server.
pub type LoggedLastEvent =
Arc<std::sync::Mutex<Option<(String, tracing::Level, chrono::DateTime<Utc>)>>>;
pub type LoggedLastEvent = watch::Receiver<Option<(String, tracing::Level, chrono::DateTime<Utc>)>>;

impl<Mempool, State, Tip, AddressBook> Debug for RpcImpl<Mempool, State, Tip, AddressBook>
where
Expand Down Expand Up @@ -501,7 +503,7 @@ where
state: State,
latest_chain_tip: Tip,
address_book: AddressBook,
last_event: LoggedLastEvent,
last_warn_error_log_rx: LoggedLastEvent,
) -> (Self, JoinHandle<()>)
where
VersionString: ToString + Clone + Send + 'static,
Expand All @@ -528,7 +530,7 @@ where
latest_chain_tip: latest_chain_tip.clone(),
queue_sender,
address_book,
last_event,
last_warn_error_log_rx,
};

// run the process queue
Expand Down Expand Up @@ -567,17 +569,12 @@ where
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
async fn get_info(&self) -> Result<GetInfo> {
let version = GetInfo::version(&self.build_version).ok_or(ErrorObject::owned(
server::error::LegacyCode::Misc.into(),
"invalid version string",
None::<()>,
))?;
let version = GetInfo::version(&self.build_version).expect("invalid version string");

// TODO: Change to use `currently_live_peers()` after #9214
let connections = self.address_book.recently_live_peers(Utc::now()).len();

let last_error_recorded = self.last_event.lock().expect("mutex poisoned").clone();
let (last_event, _last_event_level, last_event_time) = last_error_recorded.unwrap_or((
let last_error_recorded = self.last_warn_error_log_rx.borrow().clone();
let (last_error_log, _level, last_error_log_time) = last_error_recorded.unwrap_or((
GetInfo::default().errors,
tracing::Level::INFO,
Utc::now(),
Expand Down Expand Up @@ -612,8 +609,8 @@ where
testnet,
pay_tx_fee,
relay_fee,
errors: last_event,
errors_timestamp: last_event_time.to_string(),
errors: last_error_log,
errors_timestamp: last_error_log_time.to_string(),
};

Ok(response)
Expand Down
3 changes: 2 additions & 1 deletion zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ where
let mempool = MockService::build().for_prop_tests();
let state = MockService::build().for_prop_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, mempool_tx_queue) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -989,7 +990,7 @@ where
Buffer::new(state.clone(), 1),
chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

(mempool, state, rpc, mempool_tx_queue)
Expand Down
9 changes: 6 additions & 3 deletions zebra-rpc/src/methods/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async fn test_z_get_treestate() {

let (_, state, tip, _) = zebra_state::populated_state(blocks.clone(), &testnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _) = RpcImpl::new(
"",
"",
Expand All @@ -118,7 +119,7 @@ async fn test_z_get_treestate() {
state,
tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Request the treestate by a hash.
Expand Down Expand Up @@ -200,6 +201,7 @@ async fn test_rpc_response_data_for_network(network: &Network) {
.await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -210,7 +212,7 @@ async fn test_rpc_response_data_for_network(network: &Network) {
read_state,
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// We only want a snapshot of the `getblocksubsidy` and `getblockchaininfo` methods for the non-default Testnet (with an NU6 activation height).
Expand Down Expand Up @@ -529,6 +531,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) {
let mut state = MockService::build().for_unit_tests();
let mempool = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, _) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -539,7 +542,7 @@ async fn test_mocked_rpc_response_data_for_network(network: &Network) {
state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Test the response format from `z_getsubtreesbyindex` for Sapling.
Expand Down
33 changes: 22 additions & 11 deletions zebra-rpc/src/methods/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async fn rpc_getinfo() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"/Zebra:RPC test/",
Expand All @@ -39,7 +40,7 @@ async fn rpc_getinfo() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

let getinfo_future = tokio::spawn(async move { rpc.get_info().await });
Expand Down Expand Up @@ -143,6 +144,7 @@ async fn rpc_getblock() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -153,7 +155,7 @@ async fn rpc_getblock() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make height calls with verbosity=0 and check response
Expand Down Expand Up @@ -486,6 +488,7 @@ async fn rpc_getblock_parse_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -496,7 +499,7 @@ async fn rpc_getblock_parse_error() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make sure we get an error if Zebra can't parse the block height.
Expand Down Expand Up @@ -531,6 +534,7 @@ async fn rpc_getblock_missing_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -541,7 +545,7 @@ async fn rpc_getblock_missing_error() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make sure Zebra returns the correct error code `-8` for missing blocks
Expand Down Expand Up @@ -595,6 +599,7 @@ async fn rpc_getblockheader() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -605,7 +610,7 @@ async fn rpc_getblockheader() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Make height calls with verbose=false and check response
Expand Down Expand Up @@ -708,6 +713,7 @@ async fn rpc_getbestblockhash() {
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -718,7 +724,7 @@ async fn rpc_getbestblockhash() {
read_state,
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Get the tip hash using RPC method `get_best_block_hash`
Expand Down Expand Up @@ -756,6 +762,7 @@ async fn rpc_getrawtransaction() {
latest_chain_tip_sender.send_best_tip_height(Height(10));

// Init RPC
let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -766,7 +773,7 @@ async fn rpc_getrawtransaction() {
read_state.clone(),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// Test case where transaction is in mempool.
Expand Down Expand Up @@ -934,6 +941,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
let (_state, read_state, latest_chain_tip, _chain_tip_change) =
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -944,7 +952,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
Buffer::new(read_state.clone(), 1),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with an invalid address string
Expand Down Expand Up @@ -1085,6 +1093,7 @@ async fn rpc_getaddresstxids_response_with(
) {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1095,7 +1104,7 @@ async fn rpc_getaddresstxids_response_with(
Buffer::new(read_state.clone(), 1),
latest_chain_tip.clone(),
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with valid arguments
Expand Down Expand Up @@ -1139,6 +1148,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let (_tx, rx) = tokio::sync::watch::channel(None);
let rpc = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1149,7 +1159,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
Buffer::new(state.clone(), 1),
NoChainTip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with an invalid address string
Expand Down Expand Up @@ -1186,6 +1196,7 @@ async fn rpc_getaddressutxos_response() {
let (_state, read_state, latest_chain_tip, _chain_tip_change) =
zebra_state::populated_state(blocks.clone(), &Mainnet).await;

let (_tx, rx) = tokio::sync::watch::channel(None);
let rpc = RpcImpl::new(
"0.0.1",
"RPC test",
Expand All @@ -1196,7 +1207,7 @@ async fn rpc_getaddressutxos_response() {
Buffer::new(read_state.clone(), 1),
latest_chain_tip,
MockAddressBookPeers::new(vec![]),
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

// call the method with a valid address
Expand Down
11 changes: 7 additions & 4 deletions zebra-rpc/src/server/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn rpc_server_spawn() {

info!("spawning RPC server...");

let (_tx, rx) = watch::channel(None);
let _rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
Expand All @@ -57,7 +58,7 @@ async fn rpc_server_spawn() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
);

info!("spawned RPC server, checking services...");
Expand Down Expand Up @@ -105,6 +106,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) {

info!("spawning RPC server...");

let (_tx, rx) = watch::channel(None);
let rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
Expand All @@ -118,7 +120,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
)
.await
.expect("");
Expand Down Expand Up @@ -162,6 +164,7 @@ async fn rpc_server_spawn_port_conflict() {

info!("spawning RPC server 1...");

let (_tx, rx) = watch::channel(None);
let _rpc_server_1_task_handle = RpcServer::spawn(
config.clone(),
Default::default(),
Expand All @@ -175,7 +178,7 @@ async fn rpc_server_spawn_port_conflict() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx.clone(),
)
.await;

Expand All @@ -196,7 +199,7 @@ async fn rpc_server_spawn_port_conflict() {
NoChainTip,
Mainnet,
None,
crate::methods::LoggedLastEvent::new(None.into()),
rx,
)
.await;

Expand Down
Loading
Loading