From b5b644cff2be5d3c6a6f00c1c13f480b3f50cc13 Mon Sep 17 00:00:00 2001 From: Walter Rafelsberger Date: Mon, 30 Dec 2024 10:20:47 +0100 Subject: [PATCH] test: adds test for stream reader --- __tests__/stream_reader.test.js | 70 +++++++++++++++++++++++++++++++++ src/_stream-reader.js | 2 +- 2 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 __tests__/stream_reader.test.js diff --git a/__tests__/stream_reader.test.js b/__tests__/stream_reader.test.js new file mode 100644 index 0000000..e2448b1 --- /dev/null +++ b/__tests__/stream_reader.test.js @@ -0,0 +1,70 @@ +const { Readable } = require('stream'); + +const retry = require('async-retry'); + +const transformer = require('../dist/node-es-transformer.cjs'); +const deleteIndex = require('./utils/delete_index'); +const { elasticsearchUrl, getElasticsearchClient } = require('./utils/elasticsearch'); + +const client = getElasticsearchClient(); +const targetIndexName = 'from_stream_10000'; + +class TestStream extends Readable { + constructor(maxDocuments = 10000) { + super({ objectMode: true }); // Enable object mode + this.counter = 0; + this.maxDocuments = maxDocuments; + } + + _read() { + if (this.counter >= this.maxDocuments) { + this.push(null); // Signal end of stream + return; + } + + // Push stringified data + this.push( + `${JSON.stringify({ + id: `doc-${this.counter}`, + timestamp: new Date().toISOString(), + the_index: this.counter, + })}\n`, + ); + + this.counter += 1; + } +} + +describe('streams 10000 docs', () => { + afterAll(async () => { + await deleteIndex(client, targetIndexName)(); + await client.close(); + }); + + it('should stream 10000 docs', done => { + (async () => { + const { events } = await transformer({ + targetClient: client, + stream: new TestStream(10000), + targetIndexName, + verbose: false, + }); + + events.on('finish', async () => { + await client.indices.refresh({ index: targetIndexName }); + + await retry(async () => { + const res = await fetch( + `${elasticsearchUrl}/${targetIndexName}/_search?q=the_index:9999`, + ); + expect(res.status).toBe(200); + + const body = await res.json(); + expect(body?.hits?.total?.value).toBe(1); + }); + + done(); + }); + })(); + }); +}); diff --git a/src/_stream-reader.js b/src/_stream-reader.js index 8a5c519..b772598 100644 --- a/src/_stream-reader.js +++ b/src/_stream-reader.js @@ -36,7 +36,7 @@ export default function streamReaderFactory(indexer, stream, transform, splitReg } }) .on('error', err => { - console.log('Error while reading file.', err); + console.log('Error while reading stream.', err); }) .on('end', () => { if (verbose) console.log('Read entire stream.');