From 2def5b68b899c5891ab265c5beafeed631269e56 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU <ardatanrikulu@gmail.com> Date: Fri, 5 Nov 2021 16:22:18 +0300 Subject: [PATCH] fix(url-loader): handle SSE ping event for Readable --- .changeset/plenty-nails-shave.md | 2 +- packages/loaders/url/package.json | 1 - .../url/src/event-stream/handleReadable.ts | 6 +- .../url/tests/handleEventStreamResponse.ts | 55 +++++++++++++++++++ .../url/tests/handleReadableStream.test.ts | 49 ----------------- .../url/tests/url-loader-browser.spec.ts | 7 +++ packages/loaders/url/tests/url-loader.spec.ts | 9 ++- 7 files changed, 75 insertions(+), 54 deletions(-) create mode 100644 packages/loaders/url/tests/handleEventStreamResponse.ts delete mode 100644 packages/loaders/url/tests/handleReadableStream.test.ts diff --git a/.changeset/plenty-nails-shave.md b/.changeset/plenty-nails-shave.md index c7f4275e29e..b893e62c8d1 100644 --- a/.changeset/plenty-nails-shave.md +++ b/.changeset/plenty-nails-shave.md @@ -2,4 +2,4 @@ "@graphql-tools/url-loader": patch --- -[@graphql-tools/url-loader] Fix SSE ping event +fix(url-loader): handle SSE ping event correctly diff --git a/packages/loaders/url/package.json b/packages/loaders/url/package.json index 279c451b282..a3f8e8bfdbe 100644 --- a/packages/loaders/url/package.json +++ b/packages/loaders/url/package.json @@ -43,7 +43,6 @@ "express-graphql": "0.12.0", "graphql-upload": "12.0.0", "puppeteer": "11.0.0", - "web-streams-polyfill": "^3.1.1", "webpack": "5.61.0" }, "dependencies": { diff --git a/packages/loaders/url/src/event-stream/handleReadable.ts b/packages/loaders/url/src/event-stream/handleReadable.ts index 38d13f02431..6a45b0eb88b 100644 --- a/packages/loaders/url/src/event-stream/handleReadable.ts +++ b/packages/loaders/url/src/event-stream/handleReadable.ts @@ -8,11 +8,13 @@ export async function* handleReadable(readable: Readable) { if (part) { const eventStr = part.split('event: ')[1]; const dataStr = part.split('data: ')[1]; - const data = JSON.parse(dataStr); if (eventStr === 'complete') { break outer; } - yield data.payload || data; + if (dataStr) { + const data = JSON.parse(dataStr); + yield data.payload || data; + } } } } diff --git a/packages/loaders/url/tests/handleEventStreamResponse.ts b/packages/loaders/url/tests/handleEventStreamResponse.ts new file mode 100644 index 00000000000..28664609ce8 --- /dev/null +++ b/packages/loaders/url/tests/handleEventStreamResponse.ts @@ -0,0 +1,55 @@ +import { handleEventStreamResponse } from '../src/event-stream/handleEventStreamResponse'; +import { TextEncoder } from 'util'; + +describe('handleEventStreamResponse', () => { + describe('ReadableStream', () => { + if (parseInt(process.versions.node.split('.')[0]) < 16) { + it('dummy', () => { }); + } + const { TransformStream } = require('stream/web'); + it('should handle an event with data', async () => { + const { readable, writable } = new TransformStream(); + const encoder = new TextEncoder(); + const stream = writable.getWriter(); + + const generator = await handleEventStreamResponse(readable); + + // stream.write(encoder.encode(':\n\n')); + stream.write(encoder.encode('event: complete\n')); + stream.write(encoder.encode('data: { "foo": "bar" }\n')); + stream.write(encoder.encode('\n')); + + expect((await generator.next()).value).toMatchInlineSnapshot(` + Object { + "foo": "bar", + } + `); + }); + + it('should ignore server pings', async () => { + const { readable, writable } = new TransformStream(); + const encoder = new TextEncoder(); + const stream = writable.getWriter(); + + const readStream = async () => { + const generator = await handleEventStreamResponse(readable); + + stream.write(encoder.encode(':\n\n')); + stream.write(encoder.encode('event: next\n')); + stream.write(encoder.encode('data: { "foo": "bar" }\n\n')); + + return generator.next(); + }; + + await expect(await readStream()).resolves.toMatchInlineSnapshot(` + Object { + "done": false, + "value": Object { + "foo": "bar", + }, + } + `); + }); + }); + +}) diff --git a/packages/loaders/url/tests/handleReadableStream.test.ts b/packages/loaders/url/tests/handleReadableStream.test.ts deleted file mode 100644 index bc3850d50ef..00000000000 --- a/packages/loaders/url/tests/handleReadableStream.test.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { handleReadableStream } from '../src/event-stream/handleReadableStream'; -import { TransformStream } from 'web-streams-polyfill/ponyfill'; -import { TextEncoder } from 'util'; - -describe('handleReadableStream', () => { - it('should handle an event with data', async () => { - const { readable, writable } = new TransformStream(); - const encoder = new TextEncoder(); - const stream = writable.getWriter(); - - const generator = handleReadableStream(readable as any); - - // stream.write(encoder.encode(':\n\n')); - stream.write(encoder.encode('event: complete\n')); - stream.write(encoder.encode('data: { "foo": "bar" }\n')); - stream.write(encoder.encode('\n')); - - expect((await generator.next()).value).toMatchInlineSnapshot(` - Object { - "foo": "bar", - } - `); - }); - - it('should ignore server pings', async () => { - const { readable, writable } = new TransformStream(); - const encoder = new TextEncoder(); - const stream = writable.getWriter(); - - const readStream = () => { - const generator = handleReadableStream(readable as any); - - stream.write(encoder.encode(':\n\n')); - stream.write(encoder.encode('event: next\n')); - stream.write(encoder.encode('data: { "foo": "bar" }\n\n')); - - return generator.next(); - }; - - await expect(readStream()).resolves.toMatchInlineSnapshot(` - Object { - "done": false, - "value": Object { - "foo": "bar", - }, - } - `); - }); -}); diff --git a/packages/loaders/url/tests/url-loader-browser.spec.ts b/packages/loaders/url/tests/url-loader-browser.spec.ts index c02be7a611b..211a7f27b42 100644 --- a/packages/loaders/url/tests/url-loader-browser.spec.ts +++ b/packages/loaders/url/tests/url-loader-browser.spec.ts @@ -290,12 +290,19 @@ describe('[url-loader] webpack bundle compat', () => { responseClosed$ = new Promise(resolve => res.once('close', () => resolve(true))); + const ping = setInterval(() => { + // Ping + res.write(':\n\n'); + }, 100); + for (const data of sentDatas) { await new Promise(resolve => setTimeout(resolve, 300)); res.write(`data: ${JSON.stringify(data)}\n\n`); await new Promise(resolve => setTimeout(resolve, 300)); } + clearInterval(ping); + }; const document = parse(/* GraphQL */ ` diff --git a/packages/loaders/url/tests/url-loader.spec.ts b/packages/loaders/url/tests/url-loader.spec.ts index e197d45ccfe..f283e384b9f 100644 --- a/packages/loaders/url/tests/url-loader.spec.ts +++ b/packages/loaders/url/tests/url-loader.spec.ts @@ -700,8 +700,15 @@ input TestInput { "Cache-Control": "no-cache", }); + const ping = setInterval(() => { + // Ping + res.write(':\n\n'); + }, 50); sentDatas.forEach(result => sleep(300).then(() => res.write(`data: ${JSON.stringify(result)}\n\n`))); - serverResponseEnded$ = new Promise(resolve => res.once('close', () => resolve(true))); + serverResponseEnded$ = new Promise(resolve => res.once('close', () => { + resolve(true); + clearInterval(ping); + })); }); await new Promise<void>((resolve) => httpServer.listen(serverPort, () => resolve()));