lib: promise version of streams.finished call clean up

implement autoCleanup logic. update docs add autoCleanup description

ref: https://github.com/nodejs/node/issues/44556
PR-URL: https://github.com/nodejs/node/pull/44862
Refs: https://github.com/nodejs/node/issues/44556
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
This commit is contained in:
Naor Tedgi (Abu Emma)
2022-10-15 13:07:47 +03:00
committed by GitHub
parent 6fb466bc38
commit 84064bfd6c
3 changed files with 72 additions and 13 deletions

View File

@@ -2363,6 +2363,7 @@ changes:
-->
* `stream` {Stream} A readable and/or writable stream.
* `options` {Object}
* `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
not treated as finished. **Default:** `true`.
@@ -2376,8 +2377,12 @@ changes:
underlying stream will _not_ be aborted if the signal is aborted. The
callback will get called with an `AbortError`. All registered
listeners added by this function will also be removed.
* `cleanup` {boolean} remove all registered stream listeners.
**Default:** `false`.
* `callback` {Function} A callback function that takes an optional error
argument.
* Returns: {Function} A cleanup function which removes all registered
listeners.

View File

@@ -19,6 +19,7 @@ const {
validateAbortSignal,
validateFunction,
validateObject,
validateBoolean
} = require('internal/validators');
const { Promise } = primordials;
@@ -243,8 +244,19 @@ function eos(stream, options, callback) {
}
function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
opts = kEmptyObject;
}
if (opts?.cleanup) {
validateBoolean(opts.cleanup, 'cleanup');
autoCleanup = opts.cleanup;
}
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
const cleanup = eos(stream, opts, (err) => {
if (autoCleanup) {
cleanup();
}
if (err) {
reject(err);
} else {

View File

@@ -3,13 +3,10 @@
const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
Readable, Writable, promises,
} = stream;
const {
finished,
pipeline,
finished, pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
@@ -24,14 +21,11 @@ assert.strictEqual(finished, promisify(stream.finished));
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
];
const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];
const read = new Readable({
read() { }
read() {
}
});
const write = new Writable({
@@ -59,7 +53,8 @@ assert.strictEqual(finished, promisify(stream.finished));
// pipeline error
{
const read = new Readable({
read() { }
read() {
}
});
const write = new Writable({
@@ -101,3 +96,50 @@ assert.strictEqual(finished, promisify(stream.finished));
code: 'ENOENT'
}).then(common.mustCall());
}
{
const streamObj = new Readable();
assert.throws(() => {
// Passing cleanup option not as boolean
// should throw error
finished(streamObj, { cleanup: 2 });
}, { code: 'ERR_INVALID_ARG_TYPE' });
}
// Below code should not throw any errors as the
// streamObj is `Stream` and cleanup is boolean
{
const streamObj = new Readable();
finished(streamObj, { cleanup: true });
}
// Cleanup function should not be called when cleanup is set to false
// listenerCount should be 1 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj, { cleanup: false }).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 1);
});
}
// Cleanup function should be called when cleanup is set to true
// listenerCount should be 0 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj, { cleanup: true }).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 0);
});
}
// Cleanup function should not be called when cleanup has not been set
// listenerCount should be 1 after calling finish
{
const streamObj = new Writable();
assert.strictEqual(streamObj.listenerCount('end'), 0);
finished(streamObj).then(() => {
assert.strictEqual(streamObj.listenerCount('end'), 1);
});
}