Skip to content

Commit

Permalink
Add docs to dispatcher.mli
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin committed Jun 17, 2023
1 parent b092a58 commit b13e7d5
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/cli/strings.ml
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ let main env options = function
let counts = { vue = ref 0; pug = ref 0; html = ref 0; js = ref 0; ts = ref 0 } in
Switch.run (fun sw ->
let dispatcher =
Utils.Dispatcher.create ~sw ~num_workers:Utils.Io.num_processors
~worker_limit:Utils.Io.processor_async env#domain_mgr
Utils.Dispatcher.create ~sw ~num_domains:Utils.Io.num_processors
~domain_concurrency:Utils.Io.processor_async env#domain_mgr
in
options.targets
|> Fiber.List.iter (fun directory ->
Expand Down
14 changes: 7 additions & 7 deletions src/utils/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type action =

type t = {
stream: action Eio.Stream.t;
num_workers: int;
num_domains: int;
terminating: action Promise.t * action Promise.u;
terminated: unit Promise.t * unit Promise.u;
}
Expand Down Expand Up @@ -62,14 +62,14 @@ let start_domain ~sw ~domain_mgr ~limit ~terminating ~transient stream =
go ();
`Stop_daemon )

let create ~sw ~num_workers ~worker_limit ?(capacity = 0) ?(transient = true) domain_mgr =
let create ~sw ~num_domains ~domain_concurrency ?(capacity = 0) ?(transient = true) domain_mgr =
let stream = Eio.Stream.create capacity in
let instance =
{ stream; num_workers; terminating = Promise.create (); terminated = Promise.create () }
{ stream; num_domains; terminating = Promise.create (); terminated = Promise.create () }
in
let terminating = fst instance.terminating in
for _ = 1 to num_workers do
start_domain ~sw ~domain_mgr ~limit:worker_limit ~terminating ~transient stream
for _ = 1 to num_domains do
start_domain ~sw ~domain_mgr ~limit:domain_concurrency ~terminating ~transient stream
done;
instance

Expand All @@ -85,8 +85,8 @@ let run_exn instance ~f =

let terminate = function
| { terminating = p1, _; terminated = p2, _; _ } when Promise.is_resolved p1 -> Promise.await p2
| { num_workers; terminating = _, w1; terminated = p2, w2; _ } ->
Promise.resolve w1 (Quit { atomic = Atomic.make 1; target = num_workers; all_done = w2 });
| { num_domains; terminating = _, w1; terminated = p2, w2; _ } ->
Promise.resolve w1 (Quit { atomic = Atomic.make 1; target = num_domains; all_done = w2 });
Promise.await p2

let is_terminating { terminating = p, _; _ } = Promise.is_resolved p
Expand Down
16 changes: 14 additions & 2 deletions src/utils/dispatcher.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,33 @@ open! Core

type t

(** Creates a new workpool with [num_domains].
[domain_concurrency] is the maximum number of jobs that each domain can run at a time.
[capacity] (default: 0) is identical to the [Eio.Stream.create] capacity parameter.
[transient] (default: true). When true, the workpool will not block the Switch from completing.
When false, [terminate] must be called to resolve the [sw] Switch. *)
val create :
sw:Eio.Switch.t ->
num_workers:int ->
worker_limit:int ->
num_domains:int ->
domain_concurrency:int ->
?capacity:int ->
?transient:bool ->
#Eio.Domain_manager.t ->
t

(** Run a job on this workpool. It is placed at the end of the queue. *)
val run : t -> f:(unit -> 'a) -> ('a, exn) result

val run_exn : t -> f:(unit -> 'a) -> 'a

(** Waits for all running jobs to complete, then returns.
No new jobs are started, even if they were already enqueued.
To abort all running jobs instead of waiting for them, call [Switch.fail] on the Switch used to create this workpool *)
val terminate : t -> unit

(** Returns true if the [terminate] function has been called on this workpool.
Also returns true if the workpool has fully terminated. *)
val is_terminating : t -> bool

(** Returns true if the the workpool has fully terminated (all running jobs have finished). *)
val is_terminated : t -> bool
127 changes: 127 additions & 0 deletions src/utils/pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
open Eio

type 'a check = 'a -> bool

type 'a dispose = 'a -> unit

type 'a handlers = {
check: 'a check;
dispose: 'a dispose;
}

let noop_handlers : 'a handlers = { check = (fun _ -> true); dispose = (fun _ -> ()) }

type 'a ready = 'a * 'a handlers

type 'a alloc = unit -> 'a ready

type 'a t = {
max_size: int;
alloc_budget: Semaphore.t;
alloc: 'a alloc;
waiting: 'a ready Promise.u option Stream.t;
ready: 'a ready Stream.t;
(* Each runner is given a copy of the clear signal
to be checked after its run
The copy to use is replaced after each invocation
of [clear], so we can distinguish between
clearing of different batch of runs, e.g.
1. use (clear signal version 0)
2. use (clear signal version 0)
3. clear
4. use (clear signal version 1)
clear at 3 should apply to use at 1 and 2, but not use at 4
*)
mutable clear_signal: bool Atomic.t;
shutdown: bool Atomic.t;
}

let start_monitor ~sw (t : 'a t) : unit =
let rec aux () =
match Stream.take t.waiting with
| None -> ()
| Some resolver -> (
Semaphore.acquire t.alloc_budget;
let exn =
match Stream.take_nonblocking t.ready with
| None -> (
match t.alloc () with
| x ->
Promise.resolve resolver x;
None
| exception exn -> Some exn )
| Some x ->
Promise.resolve resolver x;
None
in
match exn with
| Some exn -> Switch.fail sw exn
| None -> aux () )
in
Fiber.fork ~sw aux

let create ~sw ~(alloc : 'a alloc) max_size : 'a t =
if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0";
let t =
{
max_size;
alloc_budget = Semaphore.make max_size;
alloc;
waiting = Stream.create max_size;
ready = Stream.create max_size;
clear_signal = Atomic.make false;
shutdown = Atomic.make false;
}
in
start_monitor ~sw t;
t

let async ~sw (t : 'a t) (f : 'a -> unit) : unit =
if Atomic.get t.shutdown then invalid_arg "Pool.async: Pool already shutdown";
let (promise, resolver) : 'a ready Promise.t * 'a ready Promise.u = Promise.create () in
(* Obtain a copy of clear signal for this runner *)
let clear_signal = t.clear_signal in
Fiber.fork ~sw (fun () ->
Stream.add t.waiting (Some resolver);
let elem, handlers = Promise.await promise in
let exn =
match f elem with
| () -> None
| exception exn -> Some exn
in
let do_not_clear = not (Atomic.get clear_signal) in
let ready_has_space = Stream.length t.ready < t.max_size in
if do_not_clear && handlers.check elem && ready_has_space
then Stream.add t.ready (elem, handlers)
else handlers.dispose elem;
Semaphore.release t.alloc_budget;
match exn with
| None -> ()
| Some exn -> Switch.fail sw exn )

let async_promise ~sw (t : 'a t) (f : 'a -> 'b) : 'b Promise.or_exn =
let promise, resolver = Promise.create () in
async ~sw t (fun x ->
match f x with
| res -> Promise.resolve_ok resolver res
| exception exn -> Promise.resolve_error resolver exn );
promise

let use t f =
Switch.run (fun sw ->
match Promise.await (async_promise ~sw t f) with
| Ok x -> x
| Error exn -> raise exn )

let clear (t : 'a t) =
let old_signal = t.clear_signal in
Atomic.set old_signal true;
t.clear_signal <- Atomic.make false

let shutdown (t : 'a t) =
Atomic.set t.clear_signal true;
Atomic.set t.shutdown true;
Stream.add t.waiting None
57 changes: 57 additions & 0 deletions src/utils/workpool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
open Eio

type 'a task = unit -> 'a

type runner = unit task -> unit

type t = {
sw: Switch.t;
runners: runner Pool.t;
}

let create ~sw ~max_domains domain_mgr : t =
let alloc () : runner * runner Pool.handlers =
let s : (unit task * unit Promise.u) option Stream.t = Stream.create 0 in
let is_okay = Atomic.make true in
Fiber.fork ~sw (fun () ->
Domain_manager.run domain_mgr (fun () ->
let rec aux () =
match Stream.take s with
| None -> ()
| Some (f, r) -> (
match f () with
| () ->
Promise.resolve r ();
aux ()
| exception exn ->
Atomic.set is_okay false;
raise exn )
in
aux () ) );
let runner (f : unit task) : unit =
let promise, resolver = Promise.create () in
Stream.add s (Some (f, resolver));
Promise.await promise
in
let check _ = Atomic.get is_okay in
let dispose _ = Stream.add s None in
runner, { check; dispose }
in
{ sw; runners : runner Pool.t = Pool.create ~sw ~alloc max_domains }

let async (t : t) (f : unit task) = Pool.async ~sw:t.sw t.runners (fun runner -> runner f)

let async_promise (t : t) (f : 'a task) : 'a Promise.or_exn =
let promise, resolver = Promise.create () in
async t (fun () ->
match f () with
| x -> Promise.resolve_ok resolver x
| exception exn -> Promise.resolve_error resolver exn );
promise

let run t (f : 'a task) : 'a =
match Promise.await (async_promise t f) with
| Ok x -> x
| Error exn -> raise exn

let shutdown t = Pool.shutdown t.runners

0 comments on commit b13e7d5

Please sign in to comment.