Skip to content

Commit

Permalink
Write Stream emits close - Fixes #1
Browse files Browse the repository at this point in the history
For a write sream to support flushing on finish,
it must listen to itself, flush then emit `close`.

USers of such a stream must listen to `close` to know
when the data has actually been flushed.

Reference: nodejs/node-v0.x-archive#5315 (comment)
  • Loading branch information
hmalphettes committed Nov 24, 2014
1 parent c75a716 commit c0afa2e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 17 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: do
require('random-document-stream')(42).pipe(toBulk).pipe(ws).on('finish', done);
```

NOTE: One must listen to the `close` event emitted by the write stream to know
when all the data has been written and flushed to Elasticsearch.

Listening to `finish` does not mean much really as we are in this situation:
https://github.com/joyent/node/issues/5315#issuecomment-16670354

## Stream search results from Elasticsearch
```
var ReadableSearch = require('elasticsearch-streams').ReadableSearch;
Expand All @@ -53,7 +59,7 @@ ws._write = function(chunk, enc, next) {
next();
};
rs.pipe(ws).on('finish', done);
rs.pipe(ws).on('close', done);
```

If we want to start the stream at an offset and define a limit:
Expand Down
23 changes: 9 additions & 14 deletions lib/writable-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ function WritableBulk(bulkExec, highWaterMark) {
this.bulk = [];
this.bulkCount = 0;
this.expectingPayload = false;

// when end is called we still need to flush but we must not overwrite end ourself.
// now we need to tell everyone to listen to the close event to know when we are done.
// Not great. See: https://github.com/joyent/node/issues/5315#issuecomment-16670354
this.on('finish', function() {
this._flushBulk(function() {
this.emit('close');
}.bind(this));
}.bind(this));
}

WritableBulk.prototype = Object.create(Writable.prototype, {constructor: {value: WritableBulk}});
Expand Down Expand Up @@ -83,17 +92,3 @@ WritableBulk.prototype._flushBulk = function(callback) {
callback();
});
};

WritableBulk.prototype.end = function(data) {
var self = this;
if (!data) {
return this._flushBulk(function() {
self.emit('finish');
});
}
this._write(data, 'json', function() {
self._flushBulk(function() {
self.emit('finish');
});
});
};
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
},
"devDependencies": {
"chai": "*",
"elasticsearch": "*"
"elasticsearch": "*",
"random-document-stream": "*"
}
}
2 changes: 1 addition & 1 deletion test/test-write.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('When writing', function() {
ws = new WritableBulk(bulkExec);
ws.on('error', function(e) {
err = e;
}).on('finish', function() {
}).on('close', function() {
done(err);
});

Expand Down

0 comments on commit c0afa2e

Please sign in to comment.