Skip to content

Commit

Permalink
test: adds test for stream reader
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed Dec 30, 2024
1 parent 7e392f1 commit b5b644c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
70 changes: 70 additions & 0 deletions __tests__/stream_reader.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
const { Readable } = require('stream');

const retry = require('async-retry');

const transformer = require('../dist/node-es-transformer.cjs');
const deleteIndex = require('./utils/delete_index');
const { elasticsearchUrl, getElasticsearchClient } = require('./utils/elasticsearch');

const client = getElasticsearchClient();
const targetIndexName = 'from_stream_10000';

class TestStream extends Readable {
constructor(maxDocuments = 10000) {
super({ objectMode: true }); // Enable object mode
this.counter = 0;
this.maxDocuments = maxDocuments;
}

_read() {
if (this.counter >= this.maxDocuments) {
this.push(null); // Signal end of stream
return;
}

// Push stringified data
this.push(
`${JSON.stringify({
id: `doc-${this.counter}`,
timestamp: new Date().toISOString(),
the_index: this.counter,
})}\n`,
);

this.counter += 1;
}
}

describe('streams 10000 docs', () => {
afterAll(async () => {
await deleteIndex(client, targetIndexName)();
await client.close();
});

it('should stream 10000 docs', done => {
(async () => {
const { events } = await transformer({
targetClient: client,
stream: new TestStream(10000),
targetIndexName,
verbose: false,
});

events.on('finish', async () => {
await client.indices.refresh({ index: targetIndexName });

await retry(async () => {
const res = await fetch(
`${elasticsearchUrl}/${targetIndexName}/_search?q=the_index:9999`,
);
expect(res.status).toBe(200);

const body = await res.json();
expect(body?.hits?.total?.value).toBe(1);
});

done();
});
})();
});
});
2 changes: 1 addition & 1 deletion src/_stream-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export default function streamReaderFactory(indexer, stream, transform, splitReg
}
})
.on('error', err => {
console.log('Error while reading file.', err);
console.log('Error while reading stream.', err);
})
.on('end', () => {
if (verbose) console.log('Read entire stream.');
Expand Down

0 comments on commit b5b644c

Please sign in to comment.