diff --git a/src/doctor.rs b/src/doctor.rs index 0def757..7acec95 100644 --- a/src/doctor.rs +++ b/src/doctor.rs @@ -569,7 +569,7 @@ Ipv6: /// Sends, receives and echoes data in a connection. async fn active_side( - connection: Connection, + connection: &Connection, config: &TestConfig, gui: Option<&Gui>, ) -> anyhow::Result<()> { @@ -577,19 +577,19 @@ async fn active_side( if let Some(gui) = gui { let pb = Some(&gui.pb); for _ in 0..n { - let d = send_test(&connection, config, pb).await?; + let d = send_test(connection, config, pb).await?; gui.set_send(config.size, d); - let d = recv_test(&connection, config, pb).await?; + let d = recv_test(connection, config, pb).await?; gui.set_recv(config.size, d); - let d = echo_test(&connection, config, pb).await?; + let d = echo_test(connection, config, pb).await?; gui.set_echo(config.size, d); } } else { let pb = None; for _ in 0..n { - let _d = send_test(&connection, config, pb).await?; - let _d = recv_test(&connection, config, pb).await?; - let _d = echo_test(&connection, config, pb).await?; + let _d = send_test(connection, config, pb).await?; + let _d = recv_test(connection, config, pb).await?; + let _d = echo_test(connection, config, pb).await?; } } @@ -689,7 +689,7 @@ async fn recv_test( } /// Accepts connections and answers requests (echo, drain or send) as passive side. -async fn passive_side(gui: Gui, connection: Connection) -> anyhow::Result<()> { +async fn passive_side(gui: Gui, connection: &Connection) -> anyhow::Result<()> { let conn = connection.clone(); let accept_loop = async move { let result = loop { @@ -783,29 +783,46 @@ async fn connect( ) -> anyhow::Result<()> { let endpoint = make_endpoint(secret_key, relay_map, discovery).await?; - tracing::info!("dialing {:?}", node_id); - let node_addr = NodeAddr::from_parts(node_id, relay_url, direct_addresses); - let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await; - match conn { - Ok(connection) => { - let maybe_conn_type = endpoint.conn_type(node_id); - let gui = Gui::new(endpoint, node_id); - if let Ok(conn_type) = maybe_conn_type { - log_connection_changes(gui.mp.clone(), node_id, conn_type); - } + futures_lite::future::race(close_endpoint_on_ctrl_c(endpoint.clone()), async move { + tracing::info!("dialing {:?}", node_id); + let node_addr = NodeAddr::from_parts(node_id, relay_url, direct_addresses); + let conn = endpoint.connect(node_addr, &DR_RELAY_ALPN).await; + match conn { + Ok(connection) => { + let maybe_conn_type = endpoint.conn_type(node_id); + let gui = Gui::new(endpoint, node_id); + if let Ok(conn_type) = maybe_conn_type { + log_connection_changes(gui.mp.clone(), node_id, conn_type); + } - if let Err(cause) = passive_side(gui, connection).await { - eprintln!("error handling connection: {cause}"); + let close_reason = connection + .close_reason() + .map(|e| format!(" (reason: {e})")) + .unwrap_or_default(); + + if let Err(cause) = passive_side(gui, &connection).await { + eprintln!("error handling connection: {cause}{close_reason}"); + } else { + eprintln!("Connection closed{close_reason}"); + } + } + Err(cause) => { + eprintln!("unable to connect to {node_id}: {cause}"); } } - Err(cause) => { - eprintln!("unable to connect to {node_id}: {cause}"); - } - } + }) + .await; Ok(()) } +async fn close_endpoint_on_ctrl_c(endpoint: Endpoint) { + tokio::signal::ctrl_c() + .await + .expect("failed listening to SIGINT"); + endpoint.close().await; +} + /// Formats a [`SocketAddr`] so that console doesn't escape it. fn format_addr(addr: SocketAddr) -> String { if addr.is_ipv6() { @@ -823,78 +840,92 @@ async fn accept( discovery: Option>, ) -> anyhow::Result<()> { let endpoint = make_endpoint(secret_key.clone(), relay_map, discovery).await?; - let endpoints = endpoint.direct_addresses().initialized().await?; - let remote_addrs = endpoints - .iter() - .map(|endpoint| format!("--remote-endpoint {}", format_addr(endpoint.addr))) - .collect::>() - .join(" "); - println!("Connect to this node using one of the following commands:\n"); - println!( - "\tUsing the relay url and direct connections:\niroh-doctor connect {} {}\n", - secret_key.public(), - remote_addrs, - ); - if let Some(relay_url) = endpoint.home_relay().get()? { - println!( - "\tUsing just the relay url:\niroh-doctor connect {} --relay-url {}\n", - secret_key.public(), - relay_url, - ); - } - if endpoint.discovery().is_some() { + + futures_lite::future::race(close_endpoint_on_ctrl_c(endpoint.clone()), async move { + let endpoints = endpoint + .direct_addresses() + .initialized() + .await + .expect("endpoint alive"); + + let remote_addrs = endpoints + .iter() + .map(|endpoint| format!("--remote-endpoint {}", format_addr(endpoint.addr))) + .collect::>() + .join(" "); + println!("Connect to this node using one of the following commands:\n"); println!( - "\tUsing just the node id:\niroh-doctor connect {}\n", + "\tUsing the relay url and direct connections:\niroh-doctor connect {} {}\n", secret_key.public(), + remote_addrs, ); - } - let connections = Arc::new(AtomicU64::default()); - while let Some(incoming) = endpoint.accept().await { - let connecting = match incoming.accept() { - Ok(connecting) => connecting, - Err(err) => { - warn!("incoming connection failed: {err:#}"); - // we can carry on in these cases: - // this can be caused by retransmitted datagrams - continue; - } - }; - let connections = connections.clone(); - let endpoint = endpoint.clone(); - tokio::task::spawn(async move { - let n = connections.fetch_add(1, portable_atomic::Ordering::SeqCst); - match connecting.await { - Ok(connection) => { - if n == 0 { - let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection) else { - return; - }; - println!("Accepted connection from {}", remote_peer_id); - let t0 = Instant::now(); - let gui = Gui::new(endpoint.clone(), remote_peer_id); - if let Ok(conn_type) = endpoint.conn_type(remote_peer_id) { - log_connection_changes(gui.mp.clone(), remote_peer_id, conn_type); - } - let res = active_side(connection, &config, Some(&gui)).await; - gui.clear(); - let dt = t0.elapsed().as_secs_f64(); - if let Err(cause) = res { - eprintln!("Test finished after {dt}s: {cause}",); + if let Some(relay_url) = endpoint.home_relay().get().expect("endpoint alive") { + println!( + "\tUsing just the relay url:\niroh-doctor connect {} --relay-url {}\n", + secret_key.public(), + relay_url, + ); + } + if endpoint.discovery().is_some() { + println!( + "\tUsing just the node id:\niroh-doctor connect {}\n", + secret_key.public(), + ); + } + let connections = Arc::new(AtomicU64::default()); + while let Some(incoming) = endpoint.accept().await { + let connecting = match incoming.accept() { + Ok(connecting) => connecting, + Err(err) => { + warn!("incoming connection failed: {err:#}"); + // we can carry on in these cases: + // this can be caused by retransmitted datagrams + continue; + } + }; + let connections = connections.clone(); + let endpoint = endpoint.clone(); + tokio::task::spawn(async move { + let n = connections.fetch_add(1, portable_atomic::Ordering::SeqCst); + match connecting.await { + Ok(connection) => { + if n == 0 { + let Ok(remote_peer_id) = endpoint::get_remote_node_id(&connection) + else { + return; + }; + println!("Accepted connection from {}", remote_peer_id); + let t0 = Instant::now(); + let gui = Gui::new(endpoint.clone(), remote_peer_id); + if let Ok(conn_type) = endpoint.conn_type(remote_peer_id) { + log_connection_changes(gui.mp.clone(), remote_peer_id, conn_type); + } + let res = active_side(&connection, &config, Some(&gui)).await; + gui.clear(); + let dt = t0.elapsed().as_secs_f64(); + if let Err(cause) = res { + let close_reason = connection + .close_reason() + .map(|e| format!(" (reason: {e})")) + .unwrap_or_default(); + eprintln!("Test finished after {dt}s: {cause}{close_reason}",); + } else { + eprintln!("Test finished after {dt}s",); + } } else { - eprintln!("Test finished after {dt}s",); + // silent + active_side(&connection, &config, None).await.ok(); } - } else { - // silent - active_side(connection, &config, None).await.ok(); } - } - Err(cause) => { - eprintln!("error accepting connection {cause}"); - } - }; - connections.sub(1, portable_atomic::Ordering::SeqCst); - }); - } + Err(cause) => { + eprintln!("error accepting connection {cause}"); + } + }; + connections.sub(1, portable_atomic::Ordering::SeqCst); + }); + } + }) + .await; Ok(()) }