From 535c9d58ce18790fe158d2f8be3ab91222a7649e Mon Sep 17 00:00:00 2001 From: Jacob Lee Date: Wed, 29 Jan 2025 08:21:03 -0800 Subject: [PATCH] fix(core): Fix stream events bug when errors are thrown too quickly during iteration (#7617) --- langchain-core/src/runnables/base.ts | 32 ++++++++++++++++++- .../tests/runnable_stream_events_v2.test.ts | 20 ++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 09c8c7162ee9..70c57faef041 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -914,12 +914,40 @@ export abstract class Runnable< // eslint-disable-next-line no-param-reassign config.callbacks = copiedCallbacks; } + const abortController = new AbortController(); // Call the runnable in streaming mode, // add each chunk to the output stream const outerThis = this; async function consumeRunnableStream() { try { - const runnableStream = await outerThis.stream(input, config); + let signal; + if (options?.signal) { + if ("any" in AbortSignal) { + // Use native AbortSignal.any() if available (Node 19+) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + signal = (AbortSignal as any).any([ + abortController.signal, + options.signal, + ]); + } else { + // Fallback for Node 18 and below - just use the provided signal + signal = options.signal; + // Ensure we still abort our controller when the parent signal aborts + options.signal.addEventListener( + "abort", + () => { + abortController.abort(); + }, + { once: true } + ); + } + } else { + signal = abortController.signal; + } + const runnableStream = await outerThis.stream(input, { + ...config, + signal, + }); const tappedStream = eventStreamer.tapOutputIterable( runId, runnableStream @@ -927,6 +955,7 @@ export abstract class Runnable< // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _ of tappedStream) { // Just iterate so that the callback handler picks up events + if (abortController.signal.aborted) break; } } finally { await eventStreamer.finish(); @@ -959,6 +988,7 @@ export abstract class Runnable< yield event; } } finally { + abortController.abort(); await runnableStreamConsumePromise; } } diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index 10c0e035c955..39537d16a6df 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -2263,3 +2263,23 @@ test("Runnable streamEvents method should respect passed signal", async () => { } }).rejects.toThrowError(); }); + +test("streamEvents method handles errors", async () => { + let caughtError: unknown; + const model = new FakeListChatModel({ + responses: ["abc"], + }); + + try { + // eslint-disable-next-line no-unreachable-loop + for await (const _ of model.streamEvents("Hello! Tell me about yourself.", { + version: "v2", + })) { + throw new Error("should catch this error"); + } + } catch (e) { + caughtError = e; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((caughtError as any)?.message).toEqual("should catch this error"); +});