Skip to content

Commit

Permalink
offer: support concurrent requests; don't exit after serving one
Browse files Browse the repository at this point in the history
  • Loading branch information
eras committed Jan 3, 2023
1 parent 0555a53 commit 281dd64
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions src/offer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
config, matrix_common, matrix_signaling::MatrixSignalingRouter, protocol,
signaling::SignalingRouter, transport,
config, level_event::LevelEvent, matrix_common, matrix_signaling::MatrixSignalingRouter,
protocol, signaling::SignalingRouter, transport,
};
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt};
use matrix_sdk::config::SyncSettings;
use matrix_sdk::Client;
use std::fs::{self, File};
Expand Down Expand Up @@ -64,18 +64,36 @@ pub async fn transfer(files: Vec<PathBuf>, mut cn: transport::DataStream) -> Res
Ok(())
}

// https://github.com/rust-lang/rust/issues/78649#issuecomment-1264353351
pub fn accepter_recurse(
exit_signal: LevelEvent,
files: Vec<PathBuf>,
signaling_router: MatrixSignalingRouter,
) -> BoxFuture<'static, ()> {
Box::pin(async move {
accepter(exit_signal, files, signaling_router)
.await
.unwrap()
}) as BoxFuture<()>
}

pub async fn accepter(
exit_signal: LevelEvent,
files: Vec<PathBuf>,
mut signaling_router: MatrixSignalingRouter,
) -> Result<(), Error> {
info!("Waiting for new signaling peer");
let signaling = signaling_router.accept().await.unwrap();
tokio::spawn({
let exit_signal = exit_signal.clone();
let files = files.clone();
async move { accepter_recurse(exit_signal, files, signaling_router).await }
});
let mut transport = transport::Transport::new(signaling)?;

debug!("Accepting!");
let cn = transport.accept().await?;
transfer(files, cn).await?;
debug!("Accepted!");

// debug!("Received ack, stopping");
// transport.stop().await?;
// info!("Transfer stopped");
Expand Down Expand Up @@ -143,7 +161,16 @@ pub async fn offer(config: config::Config, room: &str, files: Vec<&str>) -> Resu
"Offer for {} started; press ctrl-c to redact",
uri.matrix_uri_string()
);
let ctrl_c = tokio::signal::ctrl_c();
let exit_signal = LevelEvent::new();
tokio::spawn({
let exit_signal = exit_signal.clone();
async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for ctrl-c");
exit_signal.issue().await;
}
});

let sync_settings = SyncSettings::default()
.filter(filter)
Expand All @@ -153,20 +180,20 @@ pub async fn offer(config: config::Config, room: &str, files: Vec<&str>) -> Resu
let signaling_router =
MatrixSignalingRouter::new(client.clone(), device_id, event_id.clone()).await;

let task = tokio::spawn({
tokio::spawn({
let files: Vec<PathBuf> = files
.into_iter()
.map(|file| Path::new(file).to_path_buf())
.collect();
async move { accepter(files, signaling_router).await }
let exit_signal = exit_signal.clone();
async move { accepter(exit_signal, files, signaling_router).await }
});

select! {
_done = ctrl_c => (),
_exit = exit_signal.wait() => (),
done = client.sync(sync_settings) => {
done?;
}
_exit = task => (),
}

info!("Redacting offer");
Expand Down

0 comments on commit 281dd64

Please sign in to comment.