Skip to content

Commit

Permalink
fix(ext/node): do not exit worker thread when there is pending async …
Browse files Browse the repository at this point in the history
…op (#27378)

This change fixes the premature exit of worker threads when there are still
remaining pending ops.

This change reuses the idea of #22647 (unref'ing `op_worker_recv_message` in
worker threads if closeOnIdle specified) and uses
`web_worker.has_message_event_listener` check in the opposite way as
#22944. (Now we continue the worker when `has_message_event_listener` is
true instead of stopping it when `has_message_event_listener` is false.

closes #23061
closes #26154
  • Loading branch information
kt3k authored Dec 19, 2024
1 parent 55d345b commit 350d9dc
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 38 deletions.
2 changes: 2 additions & 0 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ impl CliMainWorkerFactory {
serve_port: shared.options.serve_port,
serve_host: shared.options.serve_host.clone(),
otel_config: shared.otel_config.clone(),
close_on_idle: true,
},
extensions: custom_extensions,
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down Expand Up @@ -812,6 +813,7 @@ fn create_web_worker_callback(
serve_port: shared.options.serve_port,
serve_host: shared.options.serve_host.clone(),
otel_config: shared.otel_config.clone(),
close_on_idle: args.close_on_idle,
},
extensions: vec![],
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down
6 changes: 3 additions & 3 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
nodeWorkerThreadCloseCb,
refMessagePort,
serializeJsMessageData,
unrefPollForMessages,
unrefParentPort,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
Expand Down Expand Up @@ -451,10 +451,10 @@ internals.__initWorkerThreads = (
parentPort.emit("close");
});
parentPort.unref = () => {
parentPort[unrefPollForMessages] = true;
parentPort[unrefParentPort] = true;
};
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
parentPort[unrefParentPort] = false;
};

if (isWorkerThread) {
Expand Down
4 changes: 2 additions & 2 deletions ext/web/13_message_port.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
/** It is used by 99_main.js and worker_threads to
* unref/ref on the global pollForMessages promise. */
export const unrefPollForMessages = Symbol("unrefPollForMessages");
* unref/ref on the global message event handler count. */
export const unrefParentPort = Symbol("unrefParentPort");

/**
* @param {number} id
Expand Down
11 changes: 9 additions & 2 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ function postMessage(message, transferOrOptions = { __proto__: null }) {

let isClosing = false;
let globalDispatchEvent;
let closeOnIdle;

function hasMessageEventListener() {
// the function name is kind of a misnomer, but we want to behave
// as if we have message event listeners if a node message port is explicitly
// refed (and the inverse as well)
return event.listenerCount(globalThis, "message") > 0 ||
return (event.listenerCount(globalThis, "message") > 0 &&
!globalThis[messagePort.unrefParentPort]) ||
messagePort.refedMessagePortsCount > 0;
}

Expand All @@ -188,7 +190,10 @@ async function pollForMessages() {
}
while (!isClosing) {
const recvMessage = op_worker_recv_message();
if (globalThis[messagePort.unrefPollForMessages] === true) {
// In a Node.js worker, unref() the op promise to prevent it from
// keeping the event loop alive. This avoids the need to explicitly
// call self.close() or worker.terminate().
if (closeOnIdle) {
core.unrefOpPromise(recvMessage);
}
const data = await recvMessage;
Expand Down Expand Up @@ -915,6 +920,7 @@ function bootstrapWorkerRuntime(
6: argv0,
7: nodeDebug,
13: otelConfig,
14: closeOnIdle_,
} = runtimeOptions;

performance.setTimeOrigin();
Expand Down Expand Up @@ -967,6 +973,7 @@ function bootstrapWorkerRuntime(

globalThis.pollForMessages = pollForMessages;
globalThis.hasMessageEventListener = hasMessageEventListener;
closeOnIdle = closeOnIdle_;

for (let i = 0; i <= unstableFeatures.length; i++) {
const id = unstableFeatures[i];
Expand Down
33 changes: 4 additions & 29 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use std::task::Poll;
use crate::inspector_server::InspectorServer;
use crate::ops;
use crate::ops::process::NpmProcessStateProviderRc;
use crate::ops::worker_host::WorkersTable;
use crate::shared::maybe_transpile_source;
use crate::shared::runtime;
use crate::tokio_util::create_and_run_current_thread;
Expand Down Expand Up @@ -385,7 +384,6 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
close_on_idle: bool,
has_executed_main_module: bool,
internal_handle: WebWorkerInternalHandle,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
Expand Down Expand Up @@ -658,7 +656,6 @@ impl WebWorker {
has_message_event_listener_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
close_on_idle: options.close_on_idle,
has_executed_main_module: false,
maybe_worker_metadata: options.maybe_worker_metadata,
},
external_handle,
Expand Down Expand Up @@ -799,7 +796,6 @@ impl WebWorker {

maybe_result = &mut receiver => {
debug!("received worker module evaluate {:#?}", maybe_result);
self.has_executed_main_module = true;
maybe_result
}

Expand Down Expand Up @@ -837,6 +833,9 @@ impl WebWorker {
}

if self.close_on_idle {
if self.has_message_event_listener() {
return Poll::Pending;
}
return Poll::Ready(Ok(()));
}

Expand All @@ -851,22 +850,7 @@ impl WebWorker {
Poll::Ready(Ok(()))
}
}
Poll::Pending => {
// This is special code path for workers created from `node:worker_threads`
// module that have different semantics than Web workers.
// We want the worker thread to terminate automatically if we've done executing
// Top-Level await, there are no child workers spawned by that workers
// and there's no "message" event listener.
if self.close_on_idle
&& self.has_executed_main_module
&& !self.has_child_workers()
&& !self.has_message_event_listener()
{
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}

Expand Down Expand Up @@ -904,15 +888,6 @@ impl WebWorker {
None => false,
}
}

fn has_child_workers(&mut self) -> bool {
!self
.js_runtime
.op_state()
.borrow()
.borrow::<WorkersTable>()
.is_empty()
}
}

fn print_worker_error(
Expand Down
5 changes: 5 additions & 0 deletions runtime/worker_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub struct BootstrapOptions {
pub serve_port: Option<u16>,
pub serve_host: Option<String>,
pub otel_config: OtelConfig,
pub close_on_idle: bool,
}

impl Default for BootstrapOptions {
Expand Down Expand Up @@ -155,6 +156,7 @@ impl Default for BootstrapOptions {
serve_port: Default::default(),
serve_host: Default::default(),
otel_config: Default::default(),
close_on_idle: false,
}
}
}
Expand Down Expand Up @@ -198,6 +200,8 @@ struct BootstrapV8<'a>(
Option<usize>,
// OTEL config
Box<[u8]>,
// close on idle
bool,
);

impl BootstrapOptions {
Expand Down Expand Up @@ -225,6 +229,7 @@ impl BootstrapOptions {
serve_is_main,
serve_worker_count,
self.otel_config.as_v8(),
self.close_on_idle,
);

bootstrap.serialize(ser).unwrap()
Expand Down
3 changes: 1 addition & 2 deletions tests/specs/permission/allow_import_worker/denied.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ await import(specifier);
^
at async file:///[WILDLINE]
error: Uncaught (in promise) Error: Unhandled error in child worker.
at [WILDLINE]
at [WILDLINE]
at [WILDCARD]
23 changes: 23 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -841,3 +841,26 @@ Deno.test({
assertEquals(result, true);
},
});

Deno.test("[node/worker_threads] Worker runs async ops correctly", async () => {
const recvMessage = Promise.withResolvers<void>();
const timer = setTimeout(() => recvMessage.reject(), 1000);
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
setTimeout(() => {
parentPort.postMessage("Hello from worker");
}, 10);
`,
{ eval: true },
);

worker.on("message", (msg) => {
assertEquals(msg, "Hello from worker");
worker.terminate();
recvMessage.resolve();
clearTimeout(timer);
});

await recvMessage.promise;
});

0 comments on commit 350d9dc

Please sign in to comment.