Skip to content

Commit

Permalink
Add an account_id to errors which may be reported
Browse files Browse the repository at this point in the history
This is going to be the `offender` account ID which gets submitted on-chain.
  • Loading branch information
HCastano committed Dec 19, 2024
1 parent 1c1bc75 commit b785279
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 23 deletions.
1 change: 1 addition & 0 deletions crates/protocol/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn open_protocol_connections(
// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection.recv().await?;
let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;

if let Err(error_message) = subscribe_response {
return Err(anyhow!(error_message));
}
Expand Down
15 changes: 13 additions & 2 deletions crates/threshold-signature-server/src/helpers/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,25 @@ pub async fn do_signing(
.map_err(|_| ProtocolErr::SessionError("Error getting lock".to_string()))?
.insert(session_id.clone(), listener);

open_protocol_connections(
let result = open_protocol_connections(
&sign_context.sign_init.validators_info,
&session_id,
signer,
state,
&x25519_secret_key,
)
.await?;
.await;

match result {
Ok(_) => (),
Err(ProtocolErr::ConnectionError { source: _, account_id: _ }) => {
todo!()
},
// Err(ProtocolErr::EncryptedConnection(_)) => todo!(),
// Err(ProtocolErr::BadSubscribeMessage(_)) => todo!(),
// Err(ProtocolErr::Subscribe(_)) => todo!(),
_ => todo!(),
}

let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
Expand Down
1 change: 1 addition & 0 deletions crates/threshold-signature-server/src/helpers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub async fn do_dkg(
x25519_secret_key,
)
.await?;

let channels = {
let ready = timeout(Duration::from_secs(SETUP_TIMEOUT_SECONDS), rx_ready).await?;
let broadcast_out = ready??;
Expand Down
2 changes: 2 additions & 0 deletions crates/threshold-signature-server/src/signing_client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ async fn handle_socket_result(socket: WebSocket, app_state: AppState) {
if let Err(err) = handle_socket(socket, app_state).await {
tracing::warn!("Websocket connection closed unexpectedly {:?}", err);
// TODO here we should inform the chain that signing failed
//
// TODO (Nando): Report error up here?
};
}

Expand Down
20 changes: 12 additions & 8 deletions crates/threshold-signature-server/src/signing_client/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use axum::{
};
use entropy_kvdb::kv_manager::error::InnerKvError;
use entropy_protocol::errors::ProtocolExecutionErr;
use subxt::utils::AccountId32;
use thiserror::Error;
use tokio::sync::oneshot::error::RecvError;

Expand All @@ -46,8 +47,8 @@ pub enum ProtocolErr {
Deserialization(String),
#[error("Oneshot timeout error: {0}")]
OneshotTimeout(#[from] RecvError),
#[error("Subscribe API error: {0}")]
Subscribe(#[from] SubscribeErr),
#[error("Subscribe API error: {source} by TSS Account `{account_id:?}`")]
Subscribe { source: SubscribeErr, account_id: AccountId32 },
#[error("reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("Utf8Error: {0:?}")]
Expand All @@ -70,18 +71,21 @@ pub enum ProtocolErr {
UserError(String),
#[error("Validation Error: {0}")]
ValidationErr(#[from] crate::validation::errors::ValidationErr),
#[error("Subscribe message rejected: {0}")]
BadSubscribeMessage(String),
#[error("Subscribe message rejected: {message} by TSS Account `{account_id:?}`")]
BadSubscribeMessage { message: String, account_id: AccountId32 },
#[error("From Hex Error: {0}")]
FromHex(#[from] hex::FromHexError),
#[error("Conversion Error: {0}")]
Conversion(&'static str),
#[error("Could not open ws connection: {0}")]
ConnectionError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("Could not open ws connection: {source} with the TSS Account `{account_id:?}`")]
ConnectionError { source: tokio_tungstenite::tungstenite::Error, account_id: AccountId32 },
#[error("Timed out waiting for remote party")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Encrypted connection error {0}")]
EncryptedConnection(String),
#[error("Encrypted connection error {source:?} with the TSS Account `{account_id:?}`")]
EncryptedConnection {
source: entropy_protocol::protocol_transport::errors::EncryptedConnectionErr,
account_id: AccountId32,
},
#[error("Program error: {0}")]
ProgramError(#[from] crate::user::errors::ProgramError),
#[error("Invalid length for converting address")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ pub async fn open_protocol_connections(
.map(|validator_info| async move {
// Open a ws connection
let ws_endpoint = format!("ws://{}/ws", validator_info.ip_address);
let (ws_stream, _response) = connect_async(ws_endpoint).await?;
let (ws_stream, _response) =
connect_async(ws_endpoint).await.map_err(|e| ProtocolErr::ConnectionError {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Send a SubscribeMessage in the payload of the final handshake message
let subscribe_message_vec =
Expand All @@ -75,32 +79,50 @@ pub async fn open_protocol_connections(
subscribe_message_vec,
)
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
.map_err(|e| ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

// Check the response as to whether they accepted our SubscribeMessage
let response_message = encrypted_connection
.recv()
.await
.map_err(|e| ProtocolErr::EncryptedConnection(e.to_string()))?;
let response_message = encrypted_connection.recv().await.map_err(|e| {
ProtocolErr::EncryptedConnection {
source: e,
account_id: validator_info.tss_account.clone(),
}
})?;

let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?;
if let Err(error_message) = subscribe_response {
// In future versions, we can check here if the error is VersionTooNew(version)
// and if possible the downgrade protocol messages used to be backward compatible
return Err(ProtocolErr::BadSubscribeMessage(error_message));
return Err(ProtocolErr::BadSubscribeMessage {
message: error_message,
account_id: validator_info.tss_account.clone(),
});
}

// Setup channels
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)?;
let ws_channels = get_ws_channels(state, session_id, &validator_info.tss_account)
.map_err(|e| ProtocolErr::Subscribe {
source: e,
account_id: validator_info.tss_account.clone(),
})?;

let remote_party_id = PartyId::new(validator_info.tss_account.clone());
let account_id = validator_info.tss_account.clone();

// Handle protocol messages
tokio::spawn(async move {
if let Err(err) =
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await
{
tracing::warn!("{:?}", err);
};
ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await.map_err(
|err| {
tracing::warn!("{:?}", err);
Err::<(), ProtocolErr>(ProtocolErr::EncryptedConnection {
source: err.into(),
account_id,
})
},
)
});

Ok::<_, ProtocolErr>(())
Expand Down

0 comments on commit b785279

Please sign in to comment.