Skip to content

Commit

Permalink
use spawn_accept_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Nov 14, 2024
1 parent bf58139 commit 8cfd2f7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 47 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" }
iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" }
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
50 changes: 5 additions & 45 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use quic_rpc::{
transport::flume::FlumeConnector,
RpcClient, RpcServer,
};
use tokio::task::JoinSet;
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, warn};

use crate::engine::Engine;

Expand All @@ -35,13 +33,12 @@ impl<D: iroh_blobs::store::Store> Engine<D> {

/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
self,
msg: crate::rpc::proto::Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use crate::rpc::proto::Request::*;

let this = self.clone();
let this = self;
match msg {
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
Expand Down Expand Up @@ -101,45 +98,8 @@ impl RpcHandler {
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector));
let task = tokio::spawn(async move {
let mut tasks = JoinSet::new();
loop {
tokio::select! {
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
if let Err(e) = res {
if e.is_panic() {
error!("Panic handling RPC request: {e}");
}
}
}
req = listener.accept() => {
let req = match req {
Ok(req) => req,
Err(e) => {
warn!("Error accepting RPC request: {e}");
continue;
}
};
let engine = engine.clone();
tasks.spawn(async move {
let (req, client) = match req.read_first().await {
Ok((req, client)) => (req, client),
Err(e) => {
warn!("Error reading first message: {e}");
return;
}
};
if let Err(cause) = engine.handle_rpc_request(req, client).await {
warn!("Error handling RPC request: {:?}", cause);
}
});
}
}
}
});
Self {
client,
_handler: AbortOnDropHandle::new(task),
}
let _handler = listener
.spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan));
Self { client, _handler }
}
}

0 comments on commit 8cfd2f7

Please sign in to comment.