From c17a695623fbbf01a48894579dd10bbab1731ca7 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 8 Jan 2025 19:13:16 +0300 Subject: [PATCH] Some improvements on cancellation --- .../execution/__tests__/abort-signal.test.ts | 42 +++++---- .../src/execution/__tests__/stream-test.ts | 10 ++- packages/executor/src/execution/execute.ts | 33 ++++--- .../src/execution/normalizedExecutor.ts | 16 ++-- .../src/execution/promiseForObject.ts | 35 +++++--- .../utils/src/registerAbortSignalListener.ts | 87 +++++++++++++------ 6 files changed, 143 insertions(+), 80 deletions(-) diff --git a/packages/executor/src/execution/__tests__/abort-signal.test.ts b/packages/executor/src/execution/__tests__/abort-signal.test.ts index 4d7c242fa0b..af98c4bc729 100644 --- a/packages/executor/src/execution/__tests__/abort-signal.test.ts +++ b/packages/executor/src/execution/__tests__/abort-signal.test.ts @@ -18,7 +18,7 @@ describe('Abort Signal', () => { expect(spy.mock.calls.length).toBeLessThanOrEqual(1); }); it('should stop the subscription', async () => { - expect.assertions(3); + expect.assertions(4); let stopped = false; const schema = makeExecutableSchema({ typeDefs: /* GraphQL */ ` @@ -63,11 +63,15 @@ describe('Abort Signal', () => { }); assertAsyncIterable(result); const results: any[] = []; - for await (const value of result) { - results.push(value.data?.counter); - if (value.data?.counter === 4) { - controller.abort(); + try { + for await (const value of result) { + results.push(value.data?.counter); + if (value.data?.counter === 4) { + controller.abort(); + } } + } catch (e: any) { + expect(e.name).toBe('AbortError'); } expect(stopped).toBe(true); expect(results).toEqual([0, 1, 2, 3, 4]); @@ -163,18 +167,22 @@ describe('Abort Signal', () => { }, }, }); - const result$ = normalizedExecutor({ - schema, - document: parse(/* GraphQL */ ` - mutation { - first - second - third - } - `), - signal: controller.signal, - }); - expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`); + try { + await normalizedExecutor({ + schema, + document: parse(/* GraphQL */ ` + mutation { + first + second + third + } + `), + signal: controller.signal, + }); + expect(false).toBe(true); + } catch (e: any) { + expect(e.name).toBe('AbortError'); + } expect(didInvokeFirstFn).toBe(true); expect(didInvokeSecondFn).toBe(true); expect(didInvokeThirdFn).toBe(false); diff --git a/packages/executor/src/execution/__tests__/stream-test.ts b/packages/executor/src/execution/__tests__/stream-test.ts index 183cbf8b05f..c101c1e72fb 100644 --- a/packages/executor/src/execution/__tests__/stream-test.ts +++ b/packages/executor/src/execution/__tests__/stream-test.ts @@ -576,6 +576,9 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], + hasNext: true, + }, + { hasNext: false, }, ]); @@ -645,11 +648,16 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], + hasNext: true, + }, + }, + { + done: false, + value: { hasNext: false, }, }, { done: true, value: undefined }, - { done: true, value: undefined }, ]); }); diff --git a/packages/executor/src/execution/execute.ts b/packages/executor/src/execution/execute.ts index 3053d3b7079..2af6572652d 100644 --- a/packages/executor/src/execution/execute.ts +++ b/packages/executor/src/execution/execute.ts @@ -46,6 +46,7 @@ import { isObjectLike, isPromise, mapAsyncIterator, + mapMaybePromise, Maybe, MaybePromise, memoize1, @@ -558,20 +559,18 @@ function executeFieldsSerially( const fieldPath = addPath(path, responseName, parentType.name); exeContext.signal?.throwIfAborted(); - return new ValueOrPromise(() => + return mapMaybePromise( executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath), - ).then(result => { - if (result === undefined) { + result => { + if (result !== undefined) { + results[responseName] = result; + } return results; - } - - results[responseName] = result; - - return results; - }); + }, + ); }, Object.create(null), - ).resolve(); + ); } /** @@ -613,9 +612,17 @@ function executeFields( } catch (error) { if (containsPromise) { // Ensure that any promises returned by other fields are handled, as they may also reject. - return promiseForObject(results, exeContext.signal).finally(() => { - throw error; - }); + const pForObject$ = promiseForObject(results, exeContext.signal); + if (isPromise(pForObject$)) { + return pForObject$.then( + () => { + throw error; + }, + () => { + throw error; + }, + ); + } } throw error; } diff --git a/packages/executor/src/execution/normalizedExecutor.ts b/packages/executor/src/execution/normalizedExecutor.ts index da96e1d080e..32824039f5e 100644 --- a/packages/executor/src/execution/normalizedExecutor.ts +++ b/packages/executor/src/execution/normalizedExecutor.ts @@ -1,9 +1,9 @@ import { getOperationAST, GraphQLSchema } from 'graphql'; -import { ValueOrPromise } from 'value-or-promise'; import { ExecutionRequest, ExecutionResult, Executor, + mapMaybePromise, MaybeAsyncIterable, MaybePromise, memoize1, @@ -20,14 +20,12 @@ export function normalizedExecutor execute(args)) - .then((result): MaybeAsyncIterable> => { - if ('initialResult' in result) { - return flattenIncrementalResults(result); - } - return result; - }) - .resolve()!; + return mapMaybePromise(execute(args), (result): MaybeAsyncIterable> => { + if ('initialResult' in result) { + return flattenIncrementalResults(result); + } + return result; + }); } export const executorFromSchema = memoize1(function executorFromSchema( diff --git a/packages/executor/src/execution/promiseForObject.ts b/packages/executor/src/execution/promiseForObject.ts index fbfc6c8b9eb..0ddf6d15db8 100644 --- a/packages/executor/src/execution/promiseForObject.ts +++ b/packages/executor/src/execution/promiseForObject.ts @@ -1,4 +1,4 @@ -import { getAbortPromise } from '@graphql-tools/utils'; +import { getAbortPromise, isPromise, MaybePromise } from '@graphql-tools/utils'; type ResolvedObject = { [TKey in keyof TData]: TData[TKey] extends Promise ? TValue : TData[TKey]; @@ -11,19 +11,30 @@ type ResolvedObject = { * This is akin to bluebird's `Promise.props`, but implemented only using * `Promise.all` so it will work with any implementation of ES6 promises. */ -export async function promiseForObject( +export function promiseForObject( object: TData, signal?: AbortSignal, -): Promise> { - const resolvedObject = Object.create(null); - const promises = Promise.all( - Object.entries(object as any).map(async ([key, value]) => { - resolvedObject[key] = await value; - }), - ); +): MaybePromise> { + const jobs: PromiseLike[] = []; + for (const key in object) { + const value = object[key]; + if (isPromise(value)) { + jobs.push( + value.then(resolvedValue => { + object[key] = resolvedValue as any; + }), + ); + } + } + if (jobs.length === 0) { + return object as ResolvedObject; + } + const jobsPromise = Promise.all(jobs); if (signal) { - const abortPromise = getAbortPromise(signal); - return Promise.race([abortPromise, promises]).then(() => resolvedObject); + const abortSignalPromise = getAbortPromise(signal); + return Promise.race([abortSignalPromise, jobsPromise]).then( + () => object as ResolvedObject, + ); } - return promises.then(() => resolvedObject); + return jobsPromise.then(() => object as ResolvedObject); } diff --git a/packages/utils/src/registerAbortSignalListener.ts b/packages/utils/src/registerAbortSignalListener.ts index e0487390b4c..9c2dec81d24 100644 --- a/packages/utils/src/registerAbortSignalListener.ts +++ b/packages/utils/src/registerAbortSignalListener.ts @@ -1,20 +1,33 @@ -import { memoize1 } from './memoize.js'; +import { createDeferred } from './createDeferred.js'; +import { fakeRejectPromise } from './fakePromise.js'; -// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected" -// on Node.js -const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) { - const listeners = new Set(); - signal.addEventListener( - 'abort', - e => { +const listenersByAbortSignal = new WeakMap>(); +const mainListenerByAbortSignal = new WeakMap(); +const deferredByAbortSignal = new WeakMap>(); + +function ensureMainListener(signal: AbortSignal) { + if (mainListenerByAbortSignal.has(signal)) { + return; + } + function mainListener(e: Event) { + const deferred = deferredByAbortSignal.get(signal); + deferred?.reject(signal.reason); + if (deferred) { + deferredByAbortSignal.delete(signal); + } + const listeners = listenersByAbortSignal.get(signal); + if (listeners != null) { for (const listener of listeners) { listener(e); } - }, - { once: true }, - ); - return listeners; -}); + listenersByAbortSignal.delete(signal); + } + mainListenerByAbortSignal.delete(signal); + signal.removeEventListener('abort', mainListener); + } + mainListenerByAbortSignal.set(signal, mainListener); + signal.addEventListener('abort', mainListener, { once: true }); +} /** * Register an AbortSignal handler for a signal. @@ -22,24 +35,42 @@ const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(si * "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit." * warning occuring on Node.js */ -export function registerAbortSignalListener(signal: AbortSignal, listener: () => void) { +export function registerAbortSignalListener(signal: AbortSignal, listener: (e?: Event) => void) { // If the signal is already aborted, call the listener immediately if (signal.aborted) { - listener(); - return; + return listener(); + } + let listeners = listenersByAbortSignal.get(signal); + if (listeners == null) { + listeners = new Set(); + listenersByAbortSignal.set(signal, listeners); } - getListenersOfAbortSignal(signal).add(listener); + listeners.add(listener); } -export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) { - return new Promise((_resolve, reject) => { - // If the signal is already aborted, return a rejected promise - if (signal.aborted) { - reject(signal.reason); - return; +export function unregisterAbortSignalListener(signal: AbortSignal, listener: (e?: Event) => void) { + const listeners = listenersByAbortSignal.get(signal); + if (listeners != null) { + listeners.delete(listener); + if (listeners.size === 0) { + const mainListener = mainListenerByAbortSignal.get(signal); + if (mainListener != null) { + signal.removeEventListener('abort', mainListener); + mainListenerByAbortSignal.delete(signal); + } } - registerAbortSignalListener(signal, () => { - reject(signal.reason); - }); - }); -}); + } +} + +export function getAbortPromise(signal: AbortSignal): Promise { + if (signal.aborted) { + return fakeRejectPromise(signal.reason); + } + ensureMainListener(signal); + let deferred = deferredByAbortSignal.get(signal); + if (deferred == null) { + deferred = createDeferred(); + deferredByAbortSignal.set(signal, deferred); + } + return deferred.promise; +}