Skip to content

Commit

Permalink
fix(core): Fix stream events bug when errors are thrown too quickly d…
Browse files Browse the repository at this point in the history
…uring iteration (#7617)
  • Loading branch information
jacoblee93 authored Jan 29, 2025
1 parent c1cda51 commit 535c9d5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
32 changes: 31 additions & 1 deletion langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,19 +914,48 @@ export abstract class Runnable<
// eslint-disable-next-line no-param-reassign
config.callbacks = copiedCallbacks;
}
const abortController = new AbortController();
// Call the runnable in streaming mode,
// add each chunk to the output stream
const outerThis = this;
async function consumeRunnableStream() {
try {
const runnableStream = await outerThis.stream(input, config);
let signal;
if (options?.signal) {
if ("any" in AbortSignal) {
// Use native AbortSignal.any() if available (Node 19+)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
signal = (AbortSignal as any).any([
abortController.signal,
options.signal,
]);
} else {
// Fallback for Node 18 and below - just use the provided signal
signal = options.signal;
// Ensure we still abort our controller when the parent signal aborts
options.signal.addEventListener(
"abort",
() => {
abortController.abort();
},
{ once: true }
);
}
} else {
signal = abortController.signal;
}
const runnableStream = await outerThis.stream(input, {
...config,
signal,
});
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
if (abortController.signal.aborted) break;
}
} finally {
await eventStreamer.finish();
Expand Down Expand Up @@ -959,6 +988,7 @@ export abstract class Runnable<
yield event;
}
} finally {
abortController.abort();
await runnableStreamConsumePromise;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,3 +2263,23 @@ test("Runnable streamEvents method should respect passed signal", async () => {
}
}).rejects.toThrowError();
});

test("streamEvents method handles errors", async () => {
let caughtError: unknown;
const model = new FakeListChatModel({
responses: ["abc"],
});

try {
// eslint-disable-next-line no-unreachable-loop
for await (const _ of model.streamEvents("Hello! Tell me about yourself.", {
version: "v2",
})) {
throw new Error("should catch this error");
}
} catch (e) {
caughtError = e;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((caughtError as any)?.message).toEqual("should catch this error");
});

0 comments on commit 535c9d5

Please sign in to comment.