Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some improvements on cancellation #6833

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Abort Signal', () => {
expect(spy.mock.calls.length).toBeLessThanOrEqual(1);
});
it('should stop the subscription', async () => {
expect.assertions(3);
expect.assertions(4);
let stopped = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down Expand Up @@ -63,11 +63,15 @@ describe('Abort Signal', () => {
});
assertAsyncIterable(result);
const results: any[] = [];
for await (const value of result) {
results.push(value.data?.counter);
if (value.data?.counter === 4) {
controller.abort();
try {
for await (const value of result) {
results.push(value.data?.counter);
if (value.data?.counter === 4) {
controller.abort();
}
}
} catch (e: any) {
expect(e.name).toBe('AbortError');
}
expect(stopped).toBe(true);
expect(results).toEqual([0, 1, 2, 3, 4]);
Expand Down Expand Up @@ -163,18 +167,22 @@ describe('Abort Signal', () => {
},
},
});
const result$ = normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
});
expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
try {
await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
});
expect(false).toBe(true);
} catch (e: any) {
expect(e.name).toBe('AbortError');
}
expect(didInvokeFirstFn).toBe(true);
expect(didInvokeSecondFn).toBe(true);
expect(didInvokeThirdFn).toBe(false);
Expand Down
10 changes: 9 additions & 1 deletion packages/executor/src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,9 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
Expand Down Expand Up @@ -645,11 +648,16 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: true,
},
},
{
done: false,
value: {
hasNext: false,
},
},
{ done: true, value: undefined },
{ done: true, value: undefined },
]);
});

Expand Down
33 changes: 20 additions & 13 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
isObjectLike,
isPromise,
mapAsyncIterator,
mapMaybePromise,
Maybe,
MaybePromise,
memoize1,
Expand Down Expand Up @@ -558,20 +559,18 @@ function executeFieldsSerially<TData>(
const fieldPath = addPath(path, responseName, parentType.name);
exeContext.signal?.throwIfAborted();

return new ValueOrPromise(() =>
return mapMaybePromise(
executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath),
).then(result => {
if (result === undefined) {
result => {
if (result !== undefined) {
results[responseName] = result;
}
return results;
}

results[responseName] = result;

return results;
});
},
);
},
Object.create(null),
).resolve();
);
}

/**
Expand Down Expand Up @@ -613,9 +612,17 @@ function executeFields(
} catch (error) {
if (containsPromise) {
// Ensure that any promises returned by other fields are handled, as they may also reject.
return promiseForObject(results, exeContext.signal).finally(() => {
throw error;
});
const pForObject$ = promiseForObject(results, exeContext.signal);
if (isPromise(pForObject$)) {
return pForObject$.then(
() => {
throw error;
},
() => {
throw error;
},
);
}
}
throw error;
}
Expand Down
16 changes: 7 additions & 9 deletions packages/executor/src/execution/normalizedExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { getOperationAST, GraphQLSchema } from 'graphql';
import { ValueOrPromise } from 'value-or-promise';
import {
ExecutionRequest,
ExecutionResult,
Executor,
mapMaybePromise,
MaybeAsyncIterable,
MaybePromise,
memoize1,
Expand All @@ -20,14 +20,12 @@ export function normalizedExecutor<TData = any, TVariables = any, TContext = any
if (operationAST.operation === 'subscription') {
return subscribe(args);
}
return new ValueOrPromise(() => execute(args))
.then((result): MaybeAsyncIterable<ExecutionResult<TData>> => {
if ('initialResult' in result) {
return flattenIncrementalResults(result);
}
return result;
})
.resolve()!;
return mapMaybePromise(execute(args), (result): MaybeAsyncIterable<ExecutionResult<TData>> => {
if ('initialResult' in result) {
return flattenIncrementalResults(result);
}
return result;
});
}

export const executorFromSchema = memoize1(function executorFromSchema(
Expand Down
35 changes: 23 additions & 12 deletions packages/executor/src/execution/promiseForObject.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getAbortPromise } from '@graphql-tools/utils';
import { getAbortPromise, isPromise, MaybePromise } from '@graphql-tools/utils';

type ResolvedObject<TData> = {
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
Expand All @@ -11,19 +11,30 @@ type ResolvedObject<TData> = {
* This is akin to bluebird's `Promise.props`, but implemented only using
* `Promise.all` so it will work with any implementation of ES6 promises.
*/
export async function promiseForObject<TData>(
export function promiseForObject<TData>(
object: TData,
signal?: AbortSignal,
): Promise<ResolvedObject<TData>> {
const resolvedObject = Object.create(null);
const promises = Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
);
): MaybePromise<ResolvedObject<TData>> {
const jobs: PromiseLike<any>[] = [];
for (const key in object) {
const value = object[key];
if (isPromise(value)) {
jobs.push(
value.then(resolvedValue => {
object[key] = resolvedValue as any;
}),
);
}
}
if (jobs.length === 0) {
return object as ResolvedObject<TData>;
}
const jobsPromise = Promise.all(jobs);
if (signal) {
const abortPromise = getAbortPromise(signal);
return Promise.race([abortPromise, promises]).then(() => resolvedObject);
const abortSignalPromise = getAbortPromise(signal);
return Promise.race([abortSignalPromise, jobsPromise]).then(
() => object as ResolvedObject<TData>,
);
}
return promises.then(() => resolvedObject);
return jobsPromise.then(() => object as ResolvedObject<TData>);
}
87 changes: 59 additions & 28 deletions packages/utils/src/registerAbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,76 @@
import { memoize1 } from './memoize.js';
import { createDeferred } from './createDeferred.js';
import { fakeRejectPromise } from './fakePromise.js';

// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
// on Node.js
const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) {
const listeners = new Set<EventListener>();
signal.addEventListener(
'abort',
e => {
const listenersByAbortSignal = new WeakMap<AbortSignal, Set<EventListener>>();
const mainListenerByAbortSignal = new WeakMap<AbortSignal, EventListener>();
const deferredByAbortSignal = new WeakMap<AbortSignal, PromiseWithResolvers<void>>();

function ensureMainListener(signal: AbortSignal) {
if (mainListenerByAbortSignal.has(signal)) {
return;
}
function mainListener(e: Event) {
const deferred = deferredByAbortSignal.get(signal);
deferred?.reject(signal.reason);
if (deferred) {
deferredByAbortSignal.delete(signal);
}
const listeners = listenersByAbortSignal.get(signal);
if (listeners != null) {
for (const listener of listeners) {
listener(e);
}
},
{ once: true },
);
return listeners;
});
listenersByAbortSignal.delete(signal);
}
mainListenerByAbortSignal.delete(signal);
signal.removeEventListener('abort', mainListener);
}
mainListenerByAbortSignal.set(signal, mainListener);
signal.addEventListener('abort', mainListener, { once: true });
}

/**
* Register an AbortSignal handler for a signal.
* This helper function mainly exists to work around the
* "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit."
* warning occuring on Node.js
*/
export function registerAbortSignalListener(signal: AbortSignal, listener: () => void) {
export function registerAbortSignalListener(signal: AbortSignal, listener: (e?: Event) => void) {
// If the signal is already aborted, call the listener immediately
if (signal.aborted) {
listener();
return;
return listener();
}
let listeners = listenersByAbortSignal.get(signal);
if (listeners == null) {
listeners = new Set<EventListener>();
listenersByAbortSignal.set(signal, listeners);
}
getListenersOfAbortSignal(signal).add(listener);
listeners.add(listener);
}

export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
return new Promise<void>((_resolve, reject) => {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
reject(signal.reason);
return;
export function unregisterAbortSignalListener(signal: AbortSignal, listener: (e?: Event) => void) {
const listeners = listenersByAbortSignal.get(signal);
if (listeners != null) {
listeners.delete(listener);
if (listeners.size === 0) {
const mainListener = mainListenerByAbortSignal.get(signal);
if (mainListener != null) {
signal.removeEventListener('abort', mainListener);
mainListenerByAbortSignal.delete(signal);
}
}
registerAbortSignalListener(signal, () => {
reject(signal.reason);
});
});
});
}
}

export function getAbortPromise(signal: AbortSignal): Promise<void> {
if (signal.aborted) {
return fakeRejectPromise(signal.reason);
}
ensureMainListener(signal);
let deferred = deferredByAbortSignal.get(signal);
if (deferred == null) {
deferred = createDeferred<void>();
deferredByAbortSignal.set(signal, deferred);
}
return deferred.promise;
}
Loading