Report unstable peers from TSS #1228

Merged on Jan 30, 2025 (37 commits)
Showing changes from 32 commits
Commits
51ebc94
Add an `account_id` to errors which may be reported
HCastano Dec 13, 2024
8541cb0
Add cursed match statement for protocol errors
HCastano Dec 13, 2024
5be281c
Move cursed match up to the caller
HCastano Dec 13, 2024
ae0640e
Remove outdated comment in socket handler
HCastano Jan 20, 2025
71ffda7
Try hacking together a passing subscribe test
HCastano Jan 22, 2025
031a741
Get test working for basic reporting flow
HCastano Jan 23, 2025
257b29a
Wrap TSS spawn helper so that we can access the task handles
HCastano Jan 23, 2025
0c88996
Manually stop Bob's TSS from spinning up again
HCastano Jan 23, 2025
a675856
Add a timeout when waiting for `NoteReport` events
HCastano Jan 23, 2025
2f6b7b5
Add Bob back during spinup
HCastano Jan 24, 2025
8a9c9de
Have a reporting test which consistently passes
HCastano Jan 27, 2025
e024127
Add a bit more logging
HCastano Jan 27, 2025
299df21
Add `Charlie` and `Dave` stash address constants
HCastano Jan 27, 2025
94973c7
Add test for reporting peers that don't connect to us
HCastano Jan 27, 2025
604d613
Temporarily ignore test
HCastano Jan 27, 2025
c004741
RustFmt
HCastano Jan 27, 2025
fdd9a23
Update some comments
HCastano Jan 28, 2025
0914a1e
Remove helper that gives out TSS handles
HCastano Jan 28, 2025
9287f7c
Add TODO comment about timeout errors
HCastano Jan 28, 2025
a65103b
Fix typo
HCastano Jan 28, 2025
1abd6dd
Clean up reportable error
HCastano Jan 28, 2025
06e2115
Add functionality to report peers if we time out while waiting to con…
HCastano Jan 29, 2025
2d82e01
Clean up how we get unsubscribed peers
HCastano Jan 29, 2025
e175082
RustFmt
HCastano Jan 29, 2025
4c5483a
Remove test ignore
HCastano Jan 29, 2025
058ede6
Make naming for a bit more consistent
HCastano Jan 29, 2025
367e846
Remove `Option` from `Timeout` error
HCastano Jan 29, 2025
607570b
RustFmt
HCastano Jan 29, 2025
c5d6053
Remove TODO comment
HCastano Jan 29, 2025
3dfa5db
Remove `dbg!` statements
HCastano Jan 29, 2025
42935bc
Add `CHANGELOG` entry
HCastano Jan 29, 2025
70a76db
Merge branch 'master' into hc/off-chain-peer-reporting
HCastano Jan 29, 2025
8362290
Fix typo in debug message
HCastano Jan 29, 2025
251e422
Improve test name
HCastano Jan 29, 2025
62060dc
Avoid using full import path
HCastano Jan 29, 2025
b2c1852
Update lockfile
HCastano Jan 30, 2025
7853f62
Merge branch 'master' into hc/off-chain-peer-reporting
HCastano Jan 30, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -46,6 +46,7 @@ runtime
- Add TDX test network chainspec ([#1204](https://github.com/entropyxyz/entropy-core/pull/1204))
- Test CLI command to retrieve quote and change endpoint / TSS account in one command ([#1198](https://github.com/entropyxyz/entropy-core/pull/1198))
- On-chain unresponsiveness reporting [(#1215)](https://github.com/entropyxyz/entropy-core/pull/1215)
- Report unstable peers from TSS [(#1228)](https://github.com/entropyxyz/entropy-core/pull/1228)
- Add cli options for adding validator [(#1242)](https://github.com/entropyxyz/entropy-core/pull/1242)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion crates/protocol/src/listener.rs
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::{broadcast, mpsc, oneshot};
pub type ListenerResult = Result<Broadcaster, ListenerErr>;

/// Tracks which validators we are connected to for a particular protocol execution
/// and sets up channels for exchaning protocol messages
/// and sets up channels for exchanging protocol messages
#[derive(Debug)]
pub struct Listener {
/// Endpoint to create subscriptions
Expand Down
1 change: 1 addition & 0 deletions crates/protocol/tests/helpers/mod.rs
Expand Up @@ -255,6 +255,7 @@ async fn open_protocol_connections(
// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection.recv().await?;
let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;

if let Err(error_message) = subscribe_response {
return Err(anyhow!(error_message));
}
Expand Down
12 changes: 9 additions & 3 deletions crates/testing-utils/src/constants.rs
Expand Up @@ -17,12 +17,18 @@ use hex_literal::hex;
use subxt::utils::AccountId32;

lazy_static! {
pub static ref ALICE_STASH_ADDRESS: AccountId32 = hex!["be5ddb1579b72e84524fc29e78609e3caf42e85aa118ebfe0b0ad404b5bdd25f"].into();
pub static ref ALICE_STASH_ADDRESS: AccountId32 =
hex!["be5ddb1579b72e84524fc29e78609e3caf42e85aa118ebfe0b0ad404b5bdd25f"].into(); // subkey inspect //Alice//stash
pub static ref BOB_STASH_ADDRESS: AccountId32 =
hex!["fe65717dad0447d715f660a0a58411de509b42e6efb8375f562f58a554d5860e"].into(); // subkey inspect //Bob//stash
pub static ref CHARLIE_STASH_ADDRESS: AccountId32 =
hex!["1e07379407fecc4b89eb7dbd287c2c781cfb1907a96947a3eb18e4f8e7198625"].into(); // subkey inspect //Charlie//stash
pub static ref DAVE_STASH_ADDRESS: AccountId32 =
hex!["e860f1b1c7227f7c22602f53f15af80747814dffd839719731ee3bba6edc126c"].into(); // subkey inspect //Dave//stash

pub static ref RANDOM_ACCOUNT: AccountId32 = hex!["8676839ca1e196624106d17c56b1efbb90508a86d8053f7d4fcd21127a9f7565"].into();
pub static ref VALIDATOR_1_STASH_ID: AccountId32 =
hex!["be5ddb1579b72e84524fc29e78609e3caf42e85aa118ebfe0b0ad404b5bdd25f"].into(); // alice stash;
pub static ref BOB_STASH_ADDRESS: AccountId32 =
hex!["fe65717dad0447d715f660a0a58411de509b42e6efb8375f562f58a554d5860e"].into(); // subkey inspect //Bob//stash

// See entropy_tss::helpers::validator::get_signer_and_x25519_secret for how these are derived
pub static ref TSS_ACCOUNTS: Vec<AccountId32> = vec![
Expand Down
13 changes: 10 additions & 3 deletions crates/threshold-signature-server/src/helpers/signing.rs
Expand Up @@ -92,9 +92,16 @@ pub async fn do_signing(
.await?;

let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Channels(broadcast_out, rx_from_others)
match timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await {
Ok(ready) => {
let broadcast_out = ready??;
Channels(broadcast_out, rx_from_others)
},
Err(e) => {
let unsubscribed_peers = app_state.unsubscribed_peers(&session_id)?;
return Err(ProtocolErr::Timeout { source: e, inactive_peers: unsubscribed_peers });
},
}
};

let result = signing_service
Expand Down
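The `Err` arm in the hunk above attaches the peers that never subscribed to the timeout error. A minimal std-only sketch of that pattern, under stated assumptions: it swaps `tokio::time::timeout` for `std::sync::mpsc::Receiver::recv_timeout`, and `SetupErr`, `wait_until_ready`, and the string peer names are hypothetical stand-ins rather than types from this PR.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for `ProtocolErr`; not the real type.
#[derive(Debug, PartialEq)]
enum SetupErr {
    /// The ready signal did not arrive in time; carries the peers still missing.
    Timeout { inactive_peers: Vec<String> },
    /// The sending side was dropped before signalling.
    Disconnected,
}

/// Waits for the ready signal. On timeout, asks the caller-supplied closure
/// which peers are still unsubscribed and attaches them to the error.
fn wait_until_ready(
    rx_ready: &Receiver<()>,
    wait: Duration,
    unsubscribed_peers: impl FnOnce() -> Vec<String>,
) -> Result<(), SetupErr> {
    match rx_ready.recv_timeout(wait) {
        Ok(()) => Ok(()),
        Err(RecvTimeoutError::Timeout) => {
            Err(SetupErr::Timeout { inactive_peers: unsubscribed_peers() })
        }
        Err(RecvTimeoutError::Disconnected) => Err(SetupErr::Disconnected),
    }
}

fn main() {
    let (tx, rx) = channel::<()>();
    // Nobody signals readiness, so the wait times out and names the missing peer.
    let result = wait_until_ready(&rx, Duration::from_millis(10), || vec!["bob".to_string()]);
    println!("{:?}", result);
    // Keep the sender alive until here so the wait above is a timeout, not a disconnect.
    drop(tx);
}
```

The closure is only evaluated on the timeout path, which mirrors how the diff looks up unsubscribed peers only after the wait has elapsed.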
1 change: 1 addition & 0 deletions crates/threshold-signature-server/src/helpers/user.rs
Expand Up @@ -87,6 +87,7 @@ pub async fn do_dkg(
x25519_secret_key,
)
.await?;

let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Expand Down
13 changes: 13 additions & 0 deletions crates/threshold-signature-server/src/lib.rs
Expand Up @@ -208,6 +208,19 @@ impl AppState {
pub fn new(configuration: Configuration, kv_store: KvManager) -> Self {
Self { listener_state: ListenerState::default(), configuration, kv_store }
}

/// Gets the list of peers who haven't yet subscribed to us for this particular session.
pub fn unsubscribed_peers(
&self,
session_id: &entropy_protocol::SessionId,
) -> Result<Vec<subxt::utils::AccountId32>, crate::signing_client::ProtocolErr> {
self.listener_state.unsubscribed_peers(session_id).map_err(|_| {
crate::signing_client::ProtocolErr::SessionError(format!(
"Unable to get unsubscribed peers for `SessionId` {:?}",
session_id,
))
})
}
}

pub fn app(app_state: AppState) -> Router {
Expand Down
19 changes: 15 additions & 4 deletions crates/threshold-signature-server/src/signing_client/api.rs
Expand Up @@ -138,7 +138,6 @@ pub async fn ws_handler(
async fn handle_socket_result(socket: WebSocket, app_state: AppState) {
if let Err(err) = handle_socket(socket, app_state).await {
tracing::warn!("Websocket connection closed unexpectedly {:?}", err);
// TODO here we should inform the chain that signing failed
};
}

Expand Down Expand Up @@ -293,7 +292,19 @@ pub async fn get_channels(
)
.await?;

let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Ok(Channels(broadcast_out, rx_from_others))
match timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await {
Ok(ready) => {
let broadcast_out = ready??;
Ok(Channels(broadcast_out, rx_from_others))
},
Err(e) => {
let unsubscribed_peers = state.unsubscribed_peers(session_id).map_err(|_| {
Review comment (Contributor):

Why do we need to map the error here? Aren't we mapping it to what it already was? Or am I missing something?

Review comment (Collaborator Author):

There are two unsubscribed_peers() methods. The one on ListenerState (which is what's used here) returns a SubscribeErr, so we have to map it.

The one on AppState returns a ProtocolErr, but we don't have access to that here.

crate::signing_client::ProtocolErr::SessionError(format!(
"Unable to get unsubscribed peers for `SessionId` {:?}",
session_id,
))
})?;
Err(ProtocolErr::Timeout { source: e, inactive_peers: unsubscribed_peers })
},
}
}
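The review exchange above turns on two `unsubscribed_peers()` methods with different error types. A simplified sketch of that layering, assuming stripped-down stand-ins: the `u32` session IDs, `String` peers, the `pending` map, and the helpers `session_error` and `peers_via_listener_state` are all hypothetical and only illustrate why the `map_err` is needed.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum SubscribeErr {
    NoSessionId(u32),
}

#[derive(Debug, PartialEq)]
enum ProtocolErr {
    SessionError(String),
}

struct ListenerState {
    // Session ID -> peers we are still waiting on (hypothetical layout).
    pending: HashMap<u32, Vec<String>>,
}

impl ListenerState {
    /// Lower-level lookup: fails with the subscribe-layer error type.
    fn unsubscribed_peers(&self, session_id: u32) -> Result<Vec<String>, SubscribeErr> {
        self.pending.get(&session_id).cloned().ok_or(SubscribeErr::NoSessionId(session_id))
    }
}

struct AppState {
    listener_state: ListenerState,
}

fn session_error(session_id: u32) -> ProtocolErr {
    ProtocolErr::SessionError(format!(
        "Unable to get unsubscribed peers for `SessionId` {:?}",
        session_id
    ))
}

impl AppState {
    /// Wrapper: same lookup, but already converted to the protocol-layer error.
    fn unsubscribed_peers(&self, session_id: u32) -> Result<Vec<String>, ProtocolErr> {
        self.listener_state.unsubscribed_peers(session_id).map_err(|_| session_error(session_id))
    }
}

/// Code that only holds a `ListenerState` has to do the same mapping itself.
fn peers_via_listener_state(
    state: &ListenerState,
    session_id: u32,
) -> Result<Vec<String>, ProtocolErr> {
    state.unsubscribed_peers(session_id).map_err(|_| session_error(session_id))
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(1, vec!["bob".to_string()]);
    let app = AppState { listener_state: ListenerState { pending } };
    println!("{:?}", app.unsubscribed_peers(1));
    println!("{:?}", peers_via_listener_state(&app.listener_state, 2));
}
```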
26 changes: 16 additions & 10 deletions crates/threshold-signature-server/src/signing_client/errors.rs
Expand Up @@ -22,6 +22,7 @@ use axum::{
};
use entropy_kvdb::kv_manager::error::InnerKvError;
use entropy_protocol::errors::ProtocolExecutionErr;
use subxt::utils::AccountId32;
use thiserror::Error;
use tokio::sync::oneshot::error::RecvError;

Expand All @@ -46,8 +47,8 @@ pub enum ProtocolErr {
Deserialization(String),
#[error("Oneshot timeout error: {0}")]
OneshotTimeout(#[from] RecvError),
#[error("Subscribe API error: {0}")]
Subscribe(#[from] SubscribeErr),
#[error("Subscribe API error: {source} by TSS Account `{account_id:?}`")]
Subscribe { source: SubscribeErr, account_id: AccountId32 },
#[error("reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("Utf8Error: {0:?}")]
Expand All @@ -70,18 +71,21 @@ pub enum ProtocolErr {
UserError(String),
#[error("Validation Error: {0}")]
ValidationErr(#[from] crate::validation::errors::ValidationErr),
#[error("Subscribe message rejected: {0}")]
BadSubscribeMessage(String),
#[error("Subscribe message rejected: {message} by TSS Account `{account_id}`")]
BadSubscribeMessage { message: String, account_id: AccountId32 },
#[error("From Hex Error: {0}")]
FromHex(#[from] hex::FromHexError),
#[error("Conversion Error: {0}")]
Conversion(&'static str),
#[error("Could not open ws connection: {0}")]
ConnectionError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("Timed out waiting for remote party")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Encrypted connection error {0}")]
EncryptedConnection(String),
#[error("Could not open ws connection: {source} with the TSS Account `{account_id:?}`")]
ConnectionError { source: tokio_tungstenite::tungstenite::Error, account_id: AccountId32 },
#[error("Timed out while waiting for peer(s): {:?}", inactive_peers)]
Timeout { source: tokio::time::error::Elapsed, inactive_peers: Vec<AccountId32> },
#[error("Encrypted connection error {source:?} with the TSS Account `{account_id:?}`")]
EncryptedConnection {
source: entropy_protocol::protocol_transport::errors::EncryptedConnectionErr,
account_id: AccountId32,
},
#[error("Program error: {0}")]
ProgramError(#[from] crate::user::errors::ProgramError),
#[error("Invalid length for converting address")]
Expand Down Expand Up @@ -143,6 +147,8 @@ pub enum SubscribeErr {
VersionMismatch(
#[from] entropy_protocol::protocol_transport::errors::ProtocolVersionMismatchError,
),
#[error("Unable to find `Listener` with `SessionId`: {0:?}")]
NoSessionId(entropy_protocol::SessionId),
}

impl IntoResponse for SubscribeErr {
Expand Down
16 changes: 16 additions & 0 deletions crates/threshold-signature-server/src/signing_client/mod.rs
Expand Up @@ -48,4 +48,20 @@ impl ListenerState {
.map_err(|e| SubscribeErr::LockError(e.to_string()))?
.contains_key(session_id))
}

/// Gets the list of peers who haven't yet subscribed to us for this particular session.
pub fn unsubscribed_peers(
&self,
session_id: &SessionId,
) -> Result<Vec<subxt::utils::AccountId32>, SubscribeErr> {
let listeners =
self.listeners.lock().map_err(|e| SubscribeErr::LockError(e.to_string()))?;
let listener =
listeners.get(session_id).ok_or(SubscribeErr::NoSessionId(session_id.clone()))?;

let unsubscribed_peers =
listener.validators.keys().map(|id| subxt::utils::AccountId32(*id)).collect();

Ok(unsubscribed_peers)
}
}
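The method above reads the remaining keys of a listener's `validators` map from behind a mutex. A self-contained sketch of that bookkeeping, assuming (as the doc comment suggests, but this diff does not show) that entries are removed from `validators` as peers subscribe. The `u64` session ID and the `expect_peers` and `subscribe` helpers are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type SessionId = u64; // hypothetical simplification of the real `SessionId`
type AccountId = [u8; 32];

#[derive(Debug, PartialEq)]
enum SubscribeErr {
    LockError(String),
    NoSessionId(SessionId),
}

#[derive(Default)]
struct Listener {
    /// Peers we still expect; assumed to shrink as peers subscribe.
    validators: HashMap<AccountId, ()>,
}

#[derive(Default)]
struct ListenerState {
    listeners: Mutex<HashMap<SessionId, Listener>>,
}

impl ListenerState {
    /// Registers the peers expected for a session (hypothetical helper).
    fn expect_peers(&self, session_id: SessionId, peers: &[AccountId]) -> Result<(), SubscribeErr> {
        let mut listeners =
            self.listeners.lock().map_err(|e| SubscribeErr::LockError(e.to_string()))?;
        let listener = listeners.entry(session_id).or_default();
        for peer in peers {
            listener.validators.insert(*peer, ());
        }
        Ok(())
    }

    /// Marks a peer as subscribed by removing it from the pending set (hypothetical helper).
    fn subscribe(&self, session_id: SessionId, peer: &AccountId) -> Result<(), SubscribeErr> {
        let mut listeners =
            self.listeners.lock().map_err(|e| SubscribeErr::LockError(e.to_string()))?;
        let listener =
            listeners.get_mut(&session_id).ok_or(SubscribeErr::NoSessionId(session_id))?;
        listener.validators.remove(peer);
        Ok(())
    }

    /// Returns whoever is still in the pending set, sorted for stable output.
    fn unsubscribed_peers(&self, session_id: SessionId) -> Result<Vec<AccountId>, SubscribeErr> {
        let listeners =
            self.listeners.lock().map_err(|e| SubscribeErr::LockError(e.to_string()))?;
        let listener = listeners.get(&session_id).ok_or(SubscribeErr::NoSessionId(session_id))?;
        let mut peers: Vec<AccountId> = listener.validators.keys().copied().collect();
        peers.sort();
        Ok(peers)
    }
}

fn main() {
    let state = ListenerState::default();
    let (alice, bob) = ([1u8; 32], [2u8; 32]);
    state.expect_peers(7, &[alice, bob]).unwrap();
    state.subscribe(7, &alice).unwrap();
    println!("{} peer(s) still missing", state.unsubscribed_peers(7).unwrap().len());
}
```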
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,31 @@ pub async fn open_protocol_connections(
let connect_to_validators = validators_info
.iter()
.filter(|validators_info| {
// Decide whether to initiate a connection by comparing accound ids
// otherwise, we wait for them to connect to us
signer.public().0 > validators_info.tss_account.0
// Decide whether to initiate a connection by comparing account IDs, otherwise we wait
// for them to connect to us
let initiate_connection = signer.public().0 > validators_info.tss_account.0;
if !initiate_connection {
tracing::debug!(
"Waiting for {:?} to open a connection with us.",
validators_info.tss_account.0
);
}

initiate_connection
})
.map(|validator_info| async move {
tracing::debug!(
"Attempting to open protocol connections with {:?}",
validator_info.tss_account.0.clone()
);

// Open a ws connection
let ws_endpoint = format!("ws://{}/ws", validator_info.ip_address);
let (ws_stream, _response) = connect_async(ws_endpoint).await?;
let (ws_stream, _response) =
connect_async(ws_endpoint).await.map_err(|e| ProtocolErr::ConnectionError {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Send a SubscribeMessage in the payload of the final handshake message
let subscribe_message_vec =
Expand All @@ -75,32 +92,50 @@ pub async fn open_protocol_connections(
subscribe_message_vec,
)
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
.map_err(|e| ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection
.recv()
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
let response_message = encrypted_connection.recv().await.map_err(|e| {
ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
}
})?;

let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;
if let Err(error_message) = subscribe_response {
// In future versions, we can check here if the error is VersionTooNew(version)
// and, if possible, downgrade the protocol messages used to be backward compatible
return Err(ProtocolErr::BadSubscribeMessage(error_message));
return Err(ProtocolErr::BadSubscribeMessage {
message: error_message,
account_id: validator_info.tss_account.clone(),
});
}

// Setup channels
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)?;
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)
.map_err(|e| ProtocolErr::Subscribe {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

let remote_party_id = PartyId::new(validator_info.tss_account.clone());
let account_id = validator_info.tss_account.clone();

// Handle protocol messages
tokio::spawn(async move {
if let Err(err) =
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await
{
tracing::warn!("{:?}", err);
};
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await.map_err(
|err| {
tracing::warn!("{:?}", err);
Err::<(), ProtocolErr>(ProtocolErr::EncryptedConnection {
source: err.into(),
account_id,
})
},
)
});

Ok::<_, ProtocolErr>(())
Expand Down
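The filter in the hunk above decides which side opens the connection by comparing raw account IDs: the party with the larger ID dials, the other waits. A small sketch of that rule, assuming 32-byte IDs compared lexicographically as Rust arrays; `should_initiate` and `partition_peers` are hypothetical names.

```rust
type AccountId = [u8; 32];

/// Returns true if we should open the connection to `peer`.
/// Arrays compare lexicographically, so for two distinct IDs exactly one side dials.
fn should_initiate(ours: &AccountId, peer: &AccountId) -> bool {
    ours > peer
}

/// Splits peers into those we dial and those we wait to hear from.
fn partition_peers(ours: &AccountId, peers: &[AccountId]) -> (Vec<AccountId>, Vec<AccountId>) {
    peers.iter().copied().partition(|peer| should_initiate(ours, peer))
}

fn main() {
    let ours = [2u8; 32];
    let peers = [[1u8; 32], [3u8; 32]];
    let (dial, wait) = partition_peers(&ours, &peers);
    println!("dial {} peer(s), wait for {} peer(s)", dial.len(), wait.len());
}
```

Because the comparison is antisymmetric, the peers in the waiting list are exactly the ones that can fail to show up, which is the case the timeout reporting in this PR covers.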
64 changes: 58 additions & 6 deletions crates/threshold-signature-server/src/user/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use x25519_dalek::StaticSecret;

use super::UserErr;
use crate::chain_api::entropy::runtime_types::pallet_registry::pallet::RegisteredInfo;
use crate::signing_client::ProtocolErr;
use crate::{
chain_api::{entropy, get_api, get_rpc, EntropyConfig},
helpers::{
Expand Down Expand Up @@ -351,14 +352,15 @@ pub async fn sign_tx(
request_limit,
derivation_path,
)
.await
.map(|signature| {
(
.await;

let signing_protocol_output = match signing_protocol_output {
Ok(signature) => Ok((
BASE64_STANDARD.encode(signature.to_rsv_bytes()),
signer.signer().sign(&signature.to_rsv_bytes()),
)
})
.map_err(|error| error.to_string());
)),
Err(e) => Err(handle_protocol_errors(&api, &rpc, &signer, e).await.unwrap_err()),
};

// This response chunk is sent later with the result of the signing protocol
if response_tx.try_send(serde_json::to_string(&signing_protocol_output)).is_err() {
Expand All @@ -370,6 +372,56 @@ pub async fn sign_tx(
Ok((StatusCode::OK, Body::from_stream(response_rx)))
}

/// Helper for handling different protocol errors.
///
/// If the error is of the reportable type (e.g. an offence from another peer) it will be reported
/// on-chain by this helper.
async fn handle_protocol_errors(
api: &OnlineClient<EntropyConfig>,
rpc: &LegacyRpcMethods<EntropyConfig>,
signer: &PairSigner<EntropyConfig, sr25519::Pair>,
error: ProtocolErr,
) -> Result<(), String> {
let peers_to_report: Vec<SubxtAccountId32> = match &error {
ProtocolErr::ConnectionError { account_id, .. }
| ProtocolErr::EncryptedConnection { account_id, .. }
| ProtocolErr::BadSubscribeMessage { account_id, .. }
| ProtocolErr::Subscribe { account_id, .. } => vec![account_id.clone()],

ProtocolErr::Timeout { inactive_peers, .. } => inactive_peers.clone(),
_ => vec![],
};

// This is a non-reportable error, so we don't do any further processing with the error
if peers_to_report.is_empty() {
return Err(error.to_string());
Review comment (Member):

As for the response here, shouldn't this be Ok()? The function did not error out; everything progressed normally within the context of this function.

Review comment (Collaborator Author):

I do agree with you, but if we change this to an Ok<String> it'll mean that the caller will end up needing to treat the Ok case as a failure/error, because that's what it is. By only returning errors we can save ourselves this and just do an unwrap_err() at the call site.

We also don't do any extra processing with the Err case: we log it and move on. If we had some different behaviour at the call site, then yes, it could make sense.

}

tracing::debug!("Reporting `{:?}` for `{}`", peers_to_report.clone(), error.to_string());
Review comment (Collaborator Author):

In the logs these (and basically all the other AccountIds from this PR) are formatted as byte arrays. I can make them SS58 addresses, but this would potentially give us inconsistent logs with other events which use byte arrays. While they're harder to read I'd rather have more consistency across our logs - but lmk what you think

Review comment (Contributor):

+1 for being consistent everywhere. If I remember right, subxt doesn't have the SS58 codec but sp-core does. At one point this was an issue, as in some places we didn't have sp-core, but I'm pretty sure we sorted that and all our crates which would need to log something have it now.

Review comment (Collaborator Author):

Opened #1271. Could be a Friday activity for someone lol


let mut failed_reports = Vec::new();
for peer in peers_to_report {
let report_unstable_peer_tx =
entropy::tx().staking_extension().report_unstable_peer(peer.clone());

if let Err(tx_error) =
submit_transaction(api, rpc, signer, &report_unstable_peer_tx, None).await
{
failed_reports.push(format!("{}", tx_error));
}
}

if failed_reports.is_empty() {
Err(error.to_string())
Review comment (Member):

same here

} else {
Err(format!(
"Failed to report peers for `{}` due to `{}`",
error,
failed_reports.join(", ")
))
}
}
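The helper above does two things: it picks out which peers an error implicates, and it always returns `Err` so the call site can use `unwrap_err()`. A reduced sketch of both ideas, with assumptions labeled: the enum is a cut-down stand-in for `ProtocolErr`, peers are plain strings, and the `report` callback is a hypothetical replacement for submitting the on-chain `report_unstable_peer` transaction.

```rust
type AccountId = String;

/// Cut-down stand-in for `ProtocolErr`; only the shape of the variants matters here.
#[derive(Debug)]
enum ProtocolErr {
    ConnectionError { account_id: AccountId },
    BadSubscribeMessage { message: String, account_id: AccountId },
    Timeout { inactive_peers: Vec<AccountId> },
    Other(String),
}

/// Peers implicated by an error; empty means the error is not reportable.
fn peers_to_report(error: &ProtocolErr) -> Vec<AccountId> {
    match error {
        ProtocolErr::ConnectionError { account_id }
        | ProtocolErr::BadSubscribeMessage { account_id, .. } => vec![account_id.clone()],
        ProtocolErr::Timeout { inactive_peers } => inactive_peers.clone(),
        ProtocolErr::Other(_) => vec![],
    }
}

/// Reports every implicated peer through `report`, then returns the failure as `Err`.
/// The result is never `Ok`, so callers can take the message with `unwrap_err()`.
fn handle_protocol_error(
    error: ProtocolErr,
    mut report: impl FnMut(&AccountId) -> Result<(), String>,
) -> Result<(), String> {
    let failed_reports: Vec<String> =
        peers_to_report(&error).iter().filter_map(|peer| report(peer).err()).collect();

    let description = format!("{:?}", error);
    if failed_reports.is_empty() {
        Err(description)
    } else {
        Err(format!(
            "Failed to report peers for `{}` due to `{}`",
            description,
            failed_reports.join(", ")
        ))
    }
}

fn main() {
    let error = ProtocolErr::Timeout { inactive_peers: vec!["bob".to_string()] };
    let message = handle_protocol_error(error, |peer| {
        println!("reporting {}", peer);
        Ok(())
    })
    .unwrap_err();
    println!("{}", message);
}
```

Keeping the peer selection in its own function makes it easy to check which variants are treated as reportable without touching the chain.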

/// HTTP POST endpoint called by the off-chain worker (Propagation pallet) during the network
/// jumpstart.
///
Expand Down