Skip to content

Commit

Permalink
feat(worker): wrap comlink with a worker init method
Browse files Browse the repository at this point in the history
  • Loading branch information
vicary committed Feb 22, 2024
1 parent 90ccee4 commit 964ee7a
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 44 deletions.
21 changes: 15 additions & 6 deletions Executable.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import { type Promisable } from "./deps.ts";
import { comlink, type Promisable } from "./deps.ts";

/**
* The Runner interface.
*/
export interface Executable<
TPayload = unknown,
TResult = unknown,
TError extends Error = Error
> {
export interface Executable<TPayload, TResult, TError extends Error = Error> {
execute: (payload: TPayload) => Promisable<TResult>;

onSuccess?: (result: TResult) => Promisable<void>;
Expand All @@ -26,3 +22,16 @@ export interface Executable<
*/
dispose?: () => Promisable<void>;
}

export const initializeWorker = <
T extends // deno-lint-ignore no-explicit-any
Executable<any, any>,
>(
callbacks: T,
) => {
if (!(self instanceof WorkerGlobalScope)) {
throw new Error("This module is only intended to be used in a worker.");
}

comlink.expose(callbacks);
};
5 changes: 2 additions & 3 deletions ExecutableWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ import { type Executable } from "./Executable.ts";
export class ExecutableWorker<
TPayload = unknown,
TResult = unknown,
TError extends Error = Error
> implements Executable<TPayload, TResult, TError>
{
TError extends Error = Error,
> implements Executable<TPayload, TResult, TError> {
#worker: Worker;
#linked: Remote<Executable<TPayload, TResult>>;

Expand Down
49 changes: 29 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
# Workerpool

An unopinionated small scale worker pool abstraction which serves as a base interface for more advanced worker managers.
An unopinionated small scale worker pool abstraction which serves as a base
interface for more advanced worker managers.

## Terminology

1. **Workerpool**

A manager that creates workers on the fly, executing tasks up to defined concurrency.
A manager that creates workers on the fly, executing tasks up to defined
concurrency.

2. **Runner**

Runners are internal wrappers for user provided runner classes, they maintain internal states such as active/busy and the retry counter.
Runners are internal wrappers for user provided runner classes, they maintain
internal states such as active/busy and the retry counter.

3. **Executable**

User implementation of task executors, where they all implements the `Executable` interface.
User implementation of task executors, where they all implements the
`Executable` interface.

Also called _workers_ for the sake of naming convension, but the usage kept minimal to avoid confusion with Web Workers.
Also called _workers_ for the sake of naming convension, but the usage kept
minimal to avoid confusion with Web Workers.

4. **Task**

Expand Down Expand Up @@ -94,15 +99,26 @@ const pool = new Workerpool<Payload>({

### Web Workers

Deno has built-in support for workers, our `ExecutableWorker` class serves as a simple proxy class via `comlink`.
Deno has built-in support for workers, our `ExecutableWorker` class serves as a
simple proxy class via `comlink`.

You'll need a separated script file for the worker.
```ts
import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts";

class MyRunner extends ExecutableWorker<string, void> {
constructor() {
super(new URL("./worker.ts", import.meta.url).href);
}
}
```

You'll also need a separated script file for the worker itself.

```ts
// worker.ts
import { expose } from "https://deno.land/x/comlink/mod.ts";
import { initializeWorker } from "https://deno.land/x/workerpool/mod.ts";

expose({
initializeWorker({
execute: async (payload: string) => {
// Simulate async actions
await new Promise((resolve) => setTimeout(resolve, 1000));
Expand All @@ -112,18 +128,10 @@ expose({
});
```

Now register the runners into the workerpool:
Now register the runner into the workerpool:

```ts
import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts";

class MyRunner extends ExecutableWorker<string, void> {
constructor() {
super(new URL("./worker.ts", import.meta.url).href);
}
}

const pool = new Workerpool({
const pool = new Workerpool<string, void>({
concurrency: 1,
workers: [MyRunner],
});
Expand All @@ -136,4 +144,5 @@ pool

## Sponsorship

If you appreciate my work, or want to see specific features to happen, [a coffee would do](https://www.github.com/sponsors/vicary).
If you appreciate my work, or want to see specific features to happen,
[a coffee would do](https://www.github.com/sponsors/vicary).
6 changes: 3 additions & 3 deletions Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export class RunnerExecutionError extends Error {
constructor(
message: string,
readonly name: string,
readonly retryable = false
readonly retryable = false,
) {
super(message);
}
Expand All @@ -22,7 +22,7 @@ export class Runner<TPayload = unknown, TResult = unknown> {

constructor(
readonly runner: Executable<TPayload, TResult>,
readonly name: string
readonly name: string,
) {}

get busy() {
Expand Down Expand Up @@ -68,7 +68,7 @@ export class Runner<TPayload = unknown, TResult = unknown> {
throw new RunnerExecutionError(
error.message,
error.name,
retryable ?? true
retryable ?? true,
);
} finally {
this.#executionnCount++;
Expand Down
2 changes: 1 addition & 1 deletion Workerpool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Executable } from "./Executable.ts";
import { ExecutableWorker } from "./ExecutableWorker.ts";
import { Task } from "./Task.ts";
import { Workerpool } from "./Workerpool.ts";
import { comlink, type Class, type SetOptional } from "./deps.ts";
import { type Class, comlink, type SetOptional } from "./deps.ts";

export type ArrowFunction = (...args: unknown[]) => unknown;
type MemoryMutexTask<TPayload> = Task<TPayload> & { active?: boolean };
Expand Down
12 changes: 6 additions & 6 deletions Workerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ export type WorkerpoolOptions<TPayload = JsonValue, TResult = unknown> = {
(
error: Error,
result: null,
context: CallbackContext<TPayload, TResult>
context: CallbackContext<TPayload, TResult>,
): Promisable<void>;
(
error: null,
result: TResult,
context: CallbackContext<TPayload, TResult>
context: CallbackContext<TPayload, TResult>,
): Promisable<void>;
};

Expand Down Expand Up @@ -241,7 +241,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
} else {
throw error;
}
}
},
)
.finally(() => {
if (runner.executionCount >= this.#maximumTaskPerRunner) {
Expand Down Expand Up @@ -272,7 +272,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
#getRunner(name: string): Runner<TPayload, TResult> | undefined {
const idleRunners = [...this.#runners].filter((runner) => !runner.busy);
const runner = idleRunners.find(
({ name: runnerName }) => runnerName === name
({ name: runnerName }) => runnerName === name,
);
if (runner) {
return runner;
Expand All @@ -286,7 +286,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {

const runnerInstance = new Runner<TPayload, TResult>(
new executableClass(),
executableClass.name
executableClass.name,
);

this.#runners.add(runnerInstance);
Expand All @@ -295,7 +295,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
} else {
// Discard idle runners of other types, if available.
const idleRunner = idleRunners.find(
({ name: runnerName }) => runnerName !== name
({ name: runnerName }) => runnerName !== name,
);

if (idleRunner) {
Expand Down
2 changes: 1 addition & 1 deletion __test__/example-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { comlink } from "../deps.ts";
import { type Executable } from "../Executable.ts";
import { type ArrowFunction } from "../Workerpool.test.ts";

const exposedObject: Executable<ArrowFunction> = {
const exposedObject: Executable<ArrowFunction, unknown> = {
async execute(payload) {
// Mimic async action.
await new Promise((resolve) => setTimeout(resolve, 100));
Expand Down
7 changes: 3 additions & 4 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export type { Executable } from "./Executable.ts";
export { type Executable, initializeWorker } from "./Executable.ts";
export { ExecutableWorker } from "./ExecutableWorker.ts";
export { Runner, RunnerExecutionError } from "./Runner.ts";
export type { Task } from "./Task.ts";
export { Workerpool } from "./Workerpool.ts";
export type { WorkerpoolOptions } from "./Workerpool.ts";
export { type Task } from "./Task.ts";
export { Workerpool, type WorkerpoolOptions } from "./Workerpool.ts";

0 comments on commit 964ee7a

Please sign in to comment.