stream: don't destroy on async iterator success

Destroying on async iterator completion ignores autoDestroy.

PR-URL: https://github.com/nodejs/node/pull/35122
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy
2020-09-09 16:03:46 +02:00
committed by Node.js GitHub Bot
parent 5461794b12
commit 2b9003b165
2 changed files with 163 additions and 14 deletions

View File

@@ -1066,7 +1066,7 @@ Readable.prototype[SymbolAsyncIterator] = function() {
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback();
callback(err);
}
}).wrap(src);
}
@@ -1088,24 +1088,39 @@ async function* createAsyncIterator(stream) {
}
}
const state = stream._readableState;
let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;
stream
.on('readable', next)
.on('error', next)
.on('end', next)
.on('close', next);
.on('error', function(err) {
error = err;
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});
try {
const state = stream._readableState;
while (true) {
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (state.errored) {
throw state.errored;
} else if (state.ended) {
} else if (errorEmitted) {
throw error;
} else if (endEmitted) {
break;
} else if (state.closed) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
} else if (closeEmitted) {
break;
} else {
await new Promise(next);
@@ -1115,7 +1130,10 @@ async function* createAsyncIterator(stream) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
destroyImpl.destroyer(stream, null);
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
}

View File

@@ -9,6 +9,7 @@ const {
pipeline
} = require('stream');
const assert = require('assert');
const http = require('http');
async function tests() {
{
@@ -44,9 +45,11 @@ async function tests() {
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
await iter.next();
await iter.next();
await iter.next().catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
await iter.next()
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
}
{
@@ -581,6 +584,61 @@ async function tests() {
assert.strictEqual(err, _err);
}));
}
{
// Don't destroy if no auto destroy.
// https://github.com/nodejs/node/issues/35116
const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
this.push(null);
}
});
for await (const chunk of r) {
chunk;
}
assert.strictEqual(r.destroyed, false);
}
{
// Destroy if no auto destroy and premature break.
// https://github.com/nodejs/node/pull/35122/files#r485678318
const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
}
});
for await (const chunk of r) {
chunk;
break;
}
assert.strictEqual(r.destroyed, true);
}
{
// Don't destroy before 'end'.
const r = new Readable({
read() {
this.push('asd');
this.push(null);
}
}).on('end', () => {
assert.strictEqual(r.destroyed, false);
});
for await (const chunk of r) {
chunk;
}
assert.strictEqual(r.destroyed, true);
}
}
{
@@ -643,5 +701,78 @@ async function tests() {
});
}
{
let _req;
const server = http.createServer((request, response) => {
response.statusCode = 404;
response.write('never ends');
});
server.listen(() => {
_req = http.request(`http://localhost:${server.address().port}`)
.on('response', common.mustCall(async (res) => {
setTimeout(() => {
_req.destroy(new Error('something happened'));
}, 100);
res.on('error', common.mustCall());
let _err;
try {
for await (const chunk of res) {
chunk;
}
} catch (err) {
_err = err;
}
assert.strictEqual(_err.code, 'ECONNRESET');
server.close();
}))
.on('error', common.mustCall())
.end();
});
}
{
async function getParsedBody(request) {
let body = '';
for await (const data of request) {
body += data;
}
try {
return JSON.parse(body);
} catch {
return {};
}
}
const str = JSON.stringify({ asd: true });
const server = http.createServer(async (request, response) => {
const body = await getParsedBody(request);
response.statusCode = 200;
assert.strictEqual(JSON.stringify(body), str);
response.end(JSON.stringify(body));
}).listen(() => {
http
.request({
method: 'POST',
hostname: 'localhost',
port: server.address().port,
})
.end(str)
.on('response', async (res) => {
let body = '';
for await (const chunk of res) {
body += chunk;
}
assert.strictEqual(body, str);
server.close();
});
});
}
// To avoid missing some tests if a promise does not resolve
tests().then(common.mustCall());