diff --git a/Executable.ts b/Executable.ts index 93c4ce2..b1205ef 100644 --- a/Executable.ts +++ b/Executable.ts @@ -1,13 +1,9 @@ -import { type Promisable } from "./deps.ts"; +import { comlink, type Promisable } from "./deps.ts"; /** * The Runner interface. */ -export interface Executable< - TPayload = unknown, - TResult = unknown, - TError extends Error = Error -> { +export interface Executable { execute: (payload: TPayload) => Promisable; onSuccess?: (result: TResult) => Promisable; @@ -26,3 +22,16 @@ export interface Executable< */ dispose?: () => Promisable; } + +export const initializeWorker = < + T extends // deno-lint-ignore no-explicit-any + Executable, +>( + callbacks: T, +) => { + if (!(self instanceof WorkerGlobalScope)) { + throw new Error("This module is only intended to be used in a worker."); + } + + comlink.expose(callbacks); +}; diff --git a/ExecutableWorker.ts b/ExecutableWorker.ts index 2376354..1927354 100644 --- a/ExecutableWorker.ts +++ b/ExecutableWorker.ts @@ -16,9 +16,8 @@ import { type Executable } from "./Executable.ts"; export class ExecutableWorker< TPayload = unknown, TResult = unknown, - TError extends Error = Error -> implements Executable -{ + TError extends Error = Error, +> implements Executable { #worker: Worker; #linked: Remote>; diff --git a/README.md b/README.md index d822561..741c14a 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,27 @@ # Workerpool -An unopinionated small scale worker pool abstraction which serves as a base interface for more advanced worker managers. +An unopinionated small scale worker pool abstraction which serves as a base +interface for more advanced worker managers. ## Terminology 1. **Workerpool** - A manager that creates workers on the fly, executing tasks up to defined concurrency. + A manager that creates workers on the fly, executing tasks up to defined + concurrency. 2. **Runner** - Runners are internal wrappers for user provided runner classes, they maintain internal states such as active/busy and the retry counter. + Runners are internal wrappers for user provided runner classes, they maintain + internal states such as active/busy and the retry counter. 3. **Executable** - User implementation of task executors, where they all implements the `Executable` interface. + User implementation of task executors, where they all implements the + `Executable` interface. - Also called _workers_ for the sake of naming convension, but the usage kept minimal to avoid confusion with Web Workers. + Also called _workers_ for the sake of naming convension, but the usage kept + minimal to avoid confusion with Web Workers. 4. **Task** @@ -94,15 +99,26 @@ const pool = new Workerpool({ ### Web Workers -Deno has built-in support for workers, our `ExecutableWorker` class serves as a simple proxy class via `comlink`. +Deno has built-in support for workers, our `ExecutableWorker` class serves as a +simple proxy class via `comlink`. -You'll need a separated script file for the worker. +```ts +import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts"; + +class MyRunner extends ExecutableWorker { + constructor() { + super(new URL("./worker.ts", import.meta.url).href); + } +} +``` + +You'll also need a separated script file for the worker itself. ```ts // worker.ts -import { expose } from "https://deno.land/x/comlink/mod.ts"; +import { initializeWorker } from "https://deno.land/x/workerpool/mod.ts"; -expose({ +initializeWorker({ execute: async (payload: string) => { // Simulate async actions await new Promise((resolve) => setTimeout(resolve, 1000)); @@ -112,18 +128,10 @@ expose({ }); ``` -Now register the runners into the workerpool: +Now register the runner into the workerpool: ```ts -import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts"; - -class MyRunner extends ExecutableWorker { - constructor() { - super(new URL("./worker.ts", import.meta.url).href); - } -} - -const pool = new Workerpool({ +const pool = new Workerpool({ concurrency: 1, workers: [MyRunner], }); @@ -136,4 +144,5 @@ pool ## Sponsorship -If you appreciate my work, or want to see specific features to happen, [a coffee would do](https://www.github.com/sponsors/vicary). +If you appreciate my work, or want to see specific features to happen, +[a coffee would do](https://www.github.com/sponsors/vicary). diff --git a/Runner.ts b/Runner.ts index e6bded7..134987f 100644 --- a/Runner.ts +++ b/Runner.ts @@ -4,7 +4,7 @@ export class RunnerExecutionError extends Error { constructor( message: string, readonly name: string, - readonly retryable = false + readonly retryable = false, ) { super(message); } @@ -22,7 +22,7 @@ export class Runner { constructor( readonly runner: Executable, - readonly name: string + readonly name: string, ) {} get busy() { @@ -68,7 +68,7 @@ export class Runner { throw new RunnerExecutionError( error.message, error.name, - retryable ?? true + retryable ?? true, ); } finally { this.#executionnCount++; diff --git a/Workerpool.test.ts b/Workerpool.test.ts index 6d919b2..f5108a5 100644 --- a/Workerpool.test.ts +++ b/Workerpool.test.ts @@ -8,7 +8,7 @@ import { Executable } from "./Executable.ts"; import { ExecutableWorker } from "./ExecutableWorker.ts"; import { Task } from "./Task.ts"; import { Workerpool } from "./Workerpool.ts"; -import { comlink, type Class, type SetOptional } from "./deps.ts"; +import { type Class, comlink, type SetOptional } from "./deps.ts"; export type ArrowFunction = (...args: unknown[]) => unknown; type MemoryMutexTask = Task & { active?: boolean }; diff --git a/Workerpool.ts b/Workerpool.ts index e3c998b..b22359c 100644 --- a/Workerpool.ts +++ b/Workerpool.ts @@ -57,12 +57,12 @@ export type WorkerpoolOptions = { ( error: Error, result: null, - context: CallbackContext + context: CallbackContext, ): Promisable; ( error: null, result: TResult, - context: CallbackContext + context: CallbackContext, ): Promisable; }; @@ -241,7 +241,7 @@ export class Workerpool { } else { throw error; } - } + }, ) .finally(() => { if (runner.executionCount >= this.#maximumTaskPerRunner) { @@ -272,7 +272,7 @@ export class Workerpool { #getRunner(name: string): Runner | undefined { const idleRunners = [...this.#runners].filter((runner) => !runner.busy); const runner = idleRunners.find( - ({ name: runnerName }) => runnerName === name + ({ name: runnerName }) => runnerName === name, ); if (runner) { return runner; @@ -286,7 +286,7 @@ export class Workerpool { const runnerInstance = new Runner( new executableClass(), - executableClass.name + executableClass.name, ); this.#runners.add(runnerInstance); @@ -295,7 +295,7 @@ export class Workerpool { } else { // Discard idle runners of other types, if available. const idleRunner = idleRunners.find( - ({ name: runnerName }) => runnerName !== name + ({ name: runnerName }) => runnerName !== name, ); if (idleRunner) { diff --git a/__test__/example-worker.ts b/__test__/example-worker.ts index 9ae8b55..876f10b 100644 --- a/__test__/example-worker.ts +++ b/__test__/example-worker.ts @@ -5,7 +5,7 @@ import { comlink } from "../deps.ts"; import { type Executable } from "../Executable.ts"; import { type ArrowFunction } from "../Workerpool.test.ts"; -const exposedObject: Executable = { +const exposedObject: Executable = { async execute(payload) { // Mimic async action. await new Promise((resolve) => setTimeout(resolve, 100)); diff --git a/mod.ts b/mod.ts index e00f48d..34382dc 100644 --- a/mod.ts +++ b/mod.ts @@ -1,6 +1,5 @@ -export type { Executable } from "./Executable.ts"; +export { type Executable, initializeWorker } from "./Executable.ts"; export { ExecutableWorker } from "./ExecutableWorker.ts"; export { Runner, RunnerExecutionError } from "./Runner.ts"; -export type { Task } from "./Task.ts"; -export { Workerpool } from "./Workerpool.ts"; -export type { WorkerpoolOptions } from "./Workerpool.ts"; +export { type Task } from "./Task.ts"; +export { Workerpool, type WorkerpoolOptions } from "./Workerpool.ts";