stream: improve Readable.from error handling

PR-URL: https://github.com/nodejs/node/pull/37158
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Benjamin Gruenbaum
2021-01-31 17:22:28 +02:00
parent 861a396a4a
commit 03380bc16a
2 changed files with 37 additions and 19 deletions

View File

@@ -43,9 +43,6 @@ function from(Readable, iterable, opts) {
// being called before last iteration completion.
let reading = false;
// Flag for when iterator needs to be explicitly closed.
let needToClose = false;
readable._read = function() {
if (!reading) {
reading = true;
@@ -54,19 +51,23 @@ function from(Readable, iterable, opts) {
};
readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
PromisePrototypeThen(
close(),
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
} else {
cb(error);
}
PromisePrototypeThen(
close(error),
() => process.nextTick(cb, error), // nextTick is here in case cb throws
(e) => process.nextTick(cb, e || error),
);
};
async function close() {
async function close(error) {
const hadError = (error !== undefined) && (error !== null);
const hasThrow = typeof iterator.throw === 'function';
if (hadError && hasThrow) {
const { value, done } = await iterator.throw(error);
await value;
if (done) {
return;
}
}
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
@@ -75,13 +76,9 @@ function from(Readable, iterable, opts) {
async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
if (done) {
readable.push(null);
} else if (readable.destroyed) {
await close();
} else {
const res = await value;
if (res === null) {

View File

@@ -4,6 +4,7 @@ const { mustCall } = require('../common');
const { once } = require('events');
const { Readable } = require('stream');
const { strictEqual, throws } = require('assert');
const common = require('../common');
{
throws(() => {
@@ -187,6 +188,25 @@ async function endWithError() {
}
}
async function destroyingStreamWithErrorThrowsInGenerator() {
const validateError = common.mustCall((e) => {
strictEqual(e, 'Boum');
});
async function* generate() {
try {
yield 1;
yield 2;
yield 3;
throw new Error();
} catch (e) {
validateError(e);
}
}
const stream = Readable.from(generate());
stream.read();
stream.once('error', common.mustCall());
stream.destroy('Boum');
}
Promise.all([
toReadableBasicSupport(),
@@ -198,5 +218,6 @@ Promise.all([
toReadableOnDataNonObject(),
destroysTheStreamWhenThrowing(),
asTransformStream(),
endWithError()
endWithError(),
destroyingStreamWithErrorThrowsInGenerator(),
]).then(mustCall());