Skip to content

Commit

Permalink
feat: adds support for node stream as source
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed Dec 23, 2024
1 parent b0b39c8 commit 281950c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/_index-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export default function indexQueueFactory({
(async () => {
await client.helpers.bulk({
concurrency: parallelCalls,
flushBytes,
flushInterval: 1000,
refreshOnCompletion: true,
datasource: ndjsonStreamIterator(stream),
onDocument(doc) {
return {
Expand Down
2 changes: 1 addition & 1 deletion src/_index-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export default function indexReaderFactory(

const sc = scrollId ? await scroll(scrollId) : await search(fieldsWithData);

if (scrollId) {
if (!scrollId) {
progressBar.start(sc.hits.total.value, 0);
}

Expand Down
62 changes: 62 additions & 0 deletions src/_stream-reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import es from 'event-stream';
import split from 'split2';

/**
 * Builds a starter function that reads delimited documents from a readable
 * stream and hands them to an indexer.
 *
 * The source stream is split into lines using `splitRegex`. Every non-empty
 * line is either passed to `indexer.add` unchanged (no transform given) or
 * parsed as JSON, run through `transform`, and re-serialized with
 * `JSON.stringify` before being added.
 *
 * @param {object} indexer - Must provide `add(doc)`, `finish()` and a
 *   `queueEmitter` that emits `pause` / `resume` events.
 * @param {object} stream - Source stream; only `pipe()` is called on it here.
 * @param {Function} [transform] - Optional callback receiving the parsed line.
 *   It may return a single document, an array of documents (each one is
 *   indexed separately), or `undefined` to skip the line.
 * @param {RegExp|string} splitRegex - Matcher forwarded to `split2`.
 * @param {boolean} [verbose] - When truthy, logs once the stream has ended.
 * @returns {Function} A zero-argument function that starts reading when called.
 */
export default function streamReaderFactory(indexer, stream, transform, splitRegex, verbose) {
  const hasTransform = typeof transform === 'function';

  function startIndex() {
    let finished = false;

    const s = stream.pipe(split(splitRegex)).pipe(
      es
        .mapSync(line => {
          try {
            // skip empty lines
            if (line === '') {
              return;
            }

            // without a transform the raw line is forwarded untouched
            if (!hasTransform) {
              indexer.add(line);
              return;
            }

            const result = transform(JSON.parse(line));

            // The transform callback may return an array of docs so we can
            // emit multiple docs from a single line. This has to be checked
            // on the raw result: once serialized it is a string and the
            // array could no longer be told apart from a single document.
            if (Array.isArray(result)) {
              result.forEach(entry => {
                const serializedEntry = JSON.stringify(entry);

                // JSON.stringify yields undefined for undefined/function
                // entries; there is nothing to index for those.
                if (typeof serializedEntry !== 'undefined') {
                  indexer.add(serializedEntry);
                }
              });
              return;
            }

            const doc = JSON.stringify(result);

            // if doc is undefined we'll skip indexing it
            if (typeof doc === 'undefined') {
              // NOTE(review): kept from the original implementation; this
              // resumes the stream even if the queue asked for a pause —
              // confirm that this is intended.
              s.resume();
              return;
            }

            indexer.add(doc);
          } catch (e) {
            // Best effort: a line that fails to parse or transform is
            // logged and skipped so the rest of the stream is still read.
            console.log('error', e);
          }
        })
        .on('error', err => {
          console.log('Error while reading file.', err);
        })
        .on('end', () => {
          if (verbose) console.log('Read entire stream.');
          indexer.finish();
          finished = true;
        }),
    );

    // Forward the queue's pause/resume requests to the stream until it ended.
    indexer.queueEmitter.on('pause', () => {
      if (finished) return;
      s.pause();
    });

    indexer.queueEmitter.on('resume', () => {
      if (finished) return;
      s.resume();
    });
  }

  return () => {
    startIndex();
  };
}
14 changes: 12 additions & 2 deletions src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import createMappingFactory from './_create-mapping';
import fileReaderFactory from './_file-reader';
import indexQueueFactory from './_index-queue';
import indexReaderFactory from './_index-reader';
import streamReaderFactory from './_stream-reader';

export default async function transformer({
deleteIndex = false,
sourceClientConfig,
targetClientConfig,
bufferSize = DEFAULT_BUFFER_SIZE,
searchSize = DEFAULT_SEARCH_SIZE,
stream,
fileName,
splitRegex = /\n/,
sourceIndexName,
Expand Down Expand Up @@ -62,8 +64,12 @@ export default async function transformer({
throw Error('Only either one of fileName or sourceIndexName can be specified.');
}

if (typeof fileName === 'undefined' && typeof sourceIndexName === 'undefined') {
throw Error('Either fileName or sourceIndexName must be specified.');
if (
(typeof fileName !== 'undefined' && typeof sourceIndexName !== 'undefined') ||
(typeof fileName !== 'undefined' && typeof stream !== 'undefined') ||
(typeof sourceIndexName !== 'undefined' && typeof stream !== 'undefined')
) {
throw Error('Only one of fileName, sourceIndexName, or stream can be specified.');
}

if (typeof fileName !== 'undefined') {
Expand All @@ -82,6 +88,10 @@ export default async function transformer({
);
}

if (typeof stream !== 'undefined') {
return streamReaderFactory(indexer, stream, transform, splitRegex, verbose);
}

return null;
}

Expand Down

0 comments on commit 281950c

Please sign in to comment.