stream: make pipeline try to wait for 'close'

Pipeline uses eos which will invoke the callback
on 'finish' and 'end' before all streams have been
fully destroyed.

Fixes: https://github.com/nodejs/node/issues/32032

PR-URL: https://github.com/nodejs/node/pull/32158
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Robert Nagy
2020-03-09 17:00:01 +01:00
committed by Anna Henningsen
parent cba9f2e7a2
commit 1428a92492
2 changed files with 55 additions and 1 deletions

View File

@@ -63,15 +63,27 @@ function eos(stream, opts, callback) {
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
const willEmitClose = (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false
);
let writableFinished = stream.writableFinished ||
(wState && wState.finished);
const onfinish = () => {
writableFinished = true;
if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
};
@@ -79,6 +91,7 @@ function eos(stream, opts, callback) {
(rState && rState.endEmitted);
const onend = () => {
readableEnded = true;
if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
};

View File

@@ -7,7 +7,8 @@ const {
Readable,
Transform,
pipeline,
PassThrough
PassThrough,
Duplex
} = require('stream');
const assert = require('assert');
const http = require('http');
@@ -1077,3 +1078,43 @@ const { promisify } = require('util');
assert.ifError(err);
}));
}
{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
src.on('close', () => {
closed = true;
});
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}
{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Duplex({});
src.on('close', common.mustCall(() => {
closed = true;
}));
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}