From 281dd644fb28cbe5c3c1dbdbc245b2c7f96f544e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erkki=20Sepp=C3=A4l=C3=A4?= Date: Tue, 3 Jan 2023 21:19:34 +0200 Subject: [PATCH] offer: support concurrent requests; don't exit after serving one --- src/offer.rs | 47 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/offer.rs b/src/offer.rs index b38efd4..9d668af 100644 --- a/src/offer.rs +++ b/src/offer.rs @@ -1,8 +1,8 @@ use crate::{ - config, matrix_common, matrix_signaling::MatrixSignalingRouter, protocol, - signaling::SignalingRouter, transport, + config, level_event::LevelEvent, matrix_common, matrix_signaling::MatrixSignalingRouter, + protocol, signaling::SignalingRouter, transport, }; -use futures::{AsyncReadExt, AsyncWriteExt}; +use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt}; use matrix_sdk::config::SyncSettings; use matrix_sdk::Client; use std::fs::{self, File}; @@ -64,18 +64,36 @@ pub async fn transfer(files: Vec, mut cn: transport::DataStream) -> Res Ok(()) } +// https://github.com/rust-lang/rust/issues/78649#issuecomment-1264353351 +pub fn accepter_recurse( + exit_signal: LevelEvent, + files: Vec, + signaling_router: MatrixSignalingRouter, +) -> BoxFuture<'static, ()> { + Box::pin(async move { + accepter(exit_signal, files, signaling_router) + .await + .unwrap() + }) as BoxFuture<()> +} + pub async fn accepter( + exit_signal: LevelEvent, files: Vec, mut signaling_router: MatrixSignalingRouter, ) -> Result<(), Error> { + info!("Waiting for new signaling peer"); let signaling = signaling_router.accept().await.unwrap(); + tokio::spawn({ + let exit_signal = exit_signal.clone(); + let files = files.clone(); + async move { accepter_recurse(exit_signal, files, signaling_router).await } + }); let mut transport = transport::Transport::new(signaling)?; - debug!("Accepting!"); let cn = transport.accept().await?; transfer(files, cn).await?; debug!("Accepted!"); - // debug!("Received ack, stopping"); // transport.stop().await?; // info!("Transfer stopped"); @@ -143,7 +161,16 @@ pub async fn offer(config: config::Config, room: &str, files: Vec<&str>) -> Resu "Offer for {} started; press ctrl-c to redact", uri.matrix_uri_string() ); - let ctrl_c = tokio::signal::ctrl_c(); + let exit_signal = LevelEvent::new(); + tokio::spawn({ + let exit_signal = exit_signal.clone(); + async move { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for ctrl-c"); + exit_signal.issue().await; + } + }); let sync_settings = SyncSettings::default() .filter(filter) @@ -153,20 +180,20 @@ pub async fn offer(config: config::Config, room: &str, files: Vec<&str>) -> Resu let signaling_router = MatrixSignalingRouter::new(client.clone(), device_id, event_id.clone()).await; - let task = tokio::spawn({ + tokio::spawn({ let files: Vec = files .into_iter() .map(|file| Path::new(file).to_path_buf()) .collect(); - async move { accepter(files, signaling_router).await } + let exit_signal = exit_signal.clone(); + async move { accepter(exit_signal, files, signaling_router).await } }); select! { - _done = ctrl_c => (), + _exit = exit_signal.wait() => (), done = client.sync(sync_settings) => { done?; } - _exit = task => (), } info!("Redacting offer");