Skip to content

Commit

Permalink
refactor(mpz-common): new thread future (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
sinui0 committed Jun 25, 2024
1 parent 5db60e5 commit 2196234
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions crates/mpz-common/src/executor/mt.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::pin::Pin;

use async_trait::async_trait;
use futures::{stream::FuturesOrdered, StreamExt};
use futures::{stream::FuturesOrdered, Future, StreamExt};
use scoped_futures::ScopedBoxFuture;
use serio::IoDuplex;
use uid_mux::FramedUidMux;
Expand All @@ -22,7 +24,7 @@ pub struct MTExecutor<M> {

impl<M> MTExecutor<M>
where
M: FramedUidMux<ThreadId> + Clone,
M: FramedUidMux<ThreadId> + Clone + Send + 'static,
M::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
/// Creates a new multi-threaded executor.
Expand All @@ -39,29 +41,49 @@ where
}
}

/// Creates a new thread.
pub async fn new_thread(
&mut self,
) -> Result<MTContext<M, <M as FramedUidMux<ThreadId>>::Framed>, ContextError> {
/// Returns a future that yields a new thread context.
pub fn new_thread(&mut self) -> NewThread<M, <M as FramedUidMux<ThreadId>>::Framed> {
let id = self.id.increment_in_place().ok_or_else(|| {
ContextError::new(
ErrorKind::Thread,
"exceeded maximum number of threads (255)",
)
})?;
});

let io = self
.mux
.open_framed(&id)
.await
.map_err(|e| ContextError::new(ErrorKind::Mux, e))?;
let mux = self.mux.clone();
let concurrency = self.max_concurrency;

Ok(MTContext::new(
id,
self.mux.clone(),
io,
self.max_concurrency,
))
NewThread {
fut: Box::pin(async move {
let id = id?;
let io = mux
.open_framed(&id)
.await
.map_err(|e| ContextError::new(ErrorKind::Mux, e))?;

Ok(MTContext::new(id, mux, io, concurrency))
}),
}
}
}

pin_project_lite::pin_project! {
/// A future that yields a new thread context.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NewThread<M, Io> {
#[pin]
fut: Pin<Box<dyn Future<Output = Result<MTContext<M, Io>, ContextError>> + Send>>,
}
}

impl<M, Io> Future for NewThread<M, Io> {
type Output = Result<MTContext<M, Io>, ContextError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.project().fut.poll(cx)
}
}

Expand Down

0 comments on commit 2196234

Please sign in to comment.