Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Show close reason after disconnect & close gracefully #16

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 121 additions & 90 deletions src/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,27 +569,27 @@ Ipv6:

/// Sends, receives and echoes data in a connection.
async fn active_side(
connection: Connection,
connection: &Connection,
config: &TestConfig,
gui: Option<&Gui>,
) -> anyhow::Result<()> {
let n = config.iterations.unwrap_or(u64::MAX);
if let Some(gui) = gui {
let pb = Some(&gui.pb);
for _ in 0..n {
let d = send_test(&connection, config, pb).await?;
let d = send_test(connection, config, pb).await?;
gui.set_send(config.size, d);
let d = recv_test(&connection, config, pb).await?;
let d = recv_test(connection, config, pb).await?;
gui.set_recv(config.size, d);
let d = echo_test(&connection, config, pb).await?;
let d = echo_test(connection, config, pb).await?;
gui.set_echo(config.size, d);
}
} else {
let pb = None;
for _ in 0..n {
let _d = send_test(&connection, config, pb).await?;
let _d = recv_test(&connection, config, pb).await?;
let _d = echo_test(&connection, config, pb).await?;
let _d = send_test(connection, config, pb).await?;
let _d = recv_test(connection, config, pb).await?;
let _d = echo_test(connection, config, pb).await?;
}
}

Expand Down Expand Up @@ -689,7 +689,7 @@ async fn recv_test(
}

/// Accepts connections and answers requests (echo, drain or send) as passive side.
async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> {
async fn passive_side(gui: Gui, connection: &Connection) -> anyhow::Result<()> {
let conn = connection.clone();
let accept_loop = async move {
let result = loop {
Expand Down Expand Up @@ -783,29 +783,46 @@ async fn connect(
) -> anyhow::Result<()> {
let endpoint = make_endpoint(secret_key, relay_map, discovery).await?;

tracing::info!("dialing {:?}", node_id);
let node_addr = NodeAddr::from_parts(node_id, relay_url, direct_addresses);
let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await;
match conn {
Ok(connection) => {
let maybe_conn_type = endpoint.conn_type(node_id);
let gui = Gui::new(endpoint, node_id);
if let Ok(conn_type) = maybe_conn_type {
log_connection_changes(gui.mp.clone(), node_id, conn_type);
}
futures_lite::future::race(close_endpoint_on_ctrl_c(endpoint.clone()), async move {
tracing::info!("dialing {:?}", node_id);
let node_addr = NodeAddr::from_parts(node_id, relay_url, direct_addresses);
let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await;
match conn {
Ok(connection) => {
let maybe_conn_type = endpoint.conn_type(node_id);
let gui = Gui::new(endpoint, node_id);
if let Ok(conn_type) = maybe_conn_type {
log_connection_changes(gui.mp.clone(), node_id, conn_type);
}

if let Err(cause) = passive_side(gui, connection).await {
eprintln!("error handling connection: {cause}");
let close_reason = connection
.close_reason()
.map(|e| format!(" (reason: {e})"))
.unwrap_or_default();

if let Err(cause) = passive_side(gui, &connection).await {
eprintln!("error handling connection: {cause}{close_reason}");
} else {
eprintln!("Connection closed{close_reason}");
}
}
Err(cause) => {
eprintln!("unable to connect to {node_id}: {cause}");
}
}
Err(cause) => {
eprintln!("unable to connect to {node_id}: {cause}");
}
}
})
.await;

Ok(())
}

async fn close_endpoint_on_ctrl_c(endpoint: Endpoint) {
tokio::signal::ctrl_c()
.await
.expect("failed listening to SIGINT");
endpoint.close().await;
}

/// Formats a [`SocketAddr`] so that console doesn't escape it.
fn format_addr(addr: SocketAddr) -> String {
if addr.is_ipv6() {
Expand All @@ -823,78 +840,92 @@ async fn accept(
discovery: Option<Box<dyn Discovery>>,
) -> anyhow::Result<()> {
let endpoint = make_endpoint(secret_key.clone(), relay_map, discovery).await?;
let endpoints = endpoint.direct_addresses().initialized().await?;
let remote_addrs = endpoints
.iter()
.map(|endpoint| format!("--remote-endpoint {}", format_addr(endpoint.addr)))
.collect::<Vec<_>>()
.join(" ");
println!("Connect to this node using one of the following commands:\n");
println!(
"\tUsing the relay url and direct connections:\niroh-doctor connect {} {}\n",
secret_key.public(),
remote_addrs,
);
if let Some(relay_url) = endpoint.home_relay().get()? {
println!(
"\tUsing just the relay url:\niroh-doctor connect {} --relay-url {}\n",
secret_key.public(),
relay_url,
);
}
if endpoint.discovery().is_some() {

futures_lite::future::race(close_endpoint_on_ctrl_c(endpoint.clone()), async move {
let endpoints = endpoint
.direct_addresses()
.initialized()
.await
.expect("endpoint alive");

let remote_addrs = endpoints
.iter()
.map(|endpoint| format!("--remote-endpoint {}", format_addr(endpoint.addr)))
.collect::<Vec<_>>()
.join(" ");
println!("Connect to this node using one of the following commands:\n");
println!(
"\tUsing just the node id:\niroh-doctor connect {}\n",
"\tUsing the relay url and direct connections:\niroh-doctor connect {} {}\n",
secret_key.public(),
remote_addrs,
);
}
let connections = Arc::new(AtomicU64::default());
while let Some(incoming) = endpoint.accept().await {
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let connections = connections.clone();
let endpoint = endpoint.clone();
tokio::task::spawn(async move {
let n = connections.fetch_add(1, portable_atomic::Ordering::SeqCst);
match connecting.await {
Ok(connection) => {
if n == 0 {
let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection) else {
return;
};
println!("Accepted connection from {}", remote_peer_id);
let t0 = Instant::now();
let gui = Gui::new(endpoint.clone(), remote_peer_id);
if let Ok(conn_type) = endpoint.conn_type(remote_peer_id) {
log_connection_changes(gui.mp.clone(), remote_peer_id, conn_type);
}
let res = active_side(connection, &config, Some(&gui)).await;
gui.clear();
let dt = t0.elapsed().as_secs_f64();
if let Err(cause) = res {
eprintln!("Test finished after {dt}s: {cause}",);
if let Some(relay_url) = endpoint.home_relay().get().expect("endpoint alive") {
println!(
"\tUsing just the relay url:\niroh-doctor connect {} --relay-url {}\n",
secret_key.public(),
relay_url,
);
}
if endpoint.discovery().is_some() {
println!(
"\tUsing just the node id:\niroh-doctor connect {}\n",
secret_key.public(),
);
}
let connections = Arc::new(AtomicU64::default());
while let Some(incoming) = endpoint.accept().await {
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let connections = connections.clone();
let endpoint = endpoint.clone();
tokio::task::spawn(async move {
let n = connections.fetch_add(1, portable_atomic::Ordering::SeqCst);
match connecting.await {
Ok(connection) => {
if n == 0 {
let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection)
else {
return;
};
println!("Accepted connection from {}", remote_peer_id);
let t0 = Instant::now();
let gui = Gui::new(endpoint.clone(), remote_peer_id);
if let Ok(conn_type) = endpoint.conn_type(remote_peer_id) {
log_connection_changes(gui.mp.clone(), remote_peer_id, conn_type);
}
let res = active_side(&connection, &config, Some(&gui)).await;
gui.clear();
let dt = t0.elapsed().as_secs_f64();
if let Err(cause) = res {
let close_reason = connection
.close_reason()
.map(|e| format!(" (reason: {e})"))
.unwrap_or_default();
eprintln!("Test finished after {dt}s: {cause}{close_reason}",);
} else {
eprintln!("Test finished after {dt}s",);
}
} else {
eprintln!("Test finished after {dt}s",);
// silent
active_side(&connection, &config, None).await.ok();
}
} else {
// silent
active_side(connection, &config, None).await.ok();
}
}
Err(cause) => {
eprintln!("error accepting connection {cause}");
}
};
connections.sub(1, portable_atomic::Ordering::SeqCst);
});
}
Err(cause) => {
eprintln!("error accepting connection {cause}");
}
};
connections.sub(1, portable_atomic::Ordering::SeqCst);
});
}
})
.await;

Ok(())
}
Expand Down