mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
stream: always invoke end callback
Ensure that the callback passed into end() is always invoke in order to avoid bug such as deadlock the user. PR-URL: https://github.com/nodejs/node/pull/29747 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
This commit is contained in:
committed by
Anna Henningsen
parent
535e9571f5
commit
9d09969f4c
@@ -611,11 +611,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
||||
}
|
||||
|
||||
// Ignore unnecessary end() calls.
|
||||
if (!state.ending)
|
||||
if (!state.ending) {
|
||||
endWritable(this, state, cb);
|
||||
else if (typeof cb === 'function') {
|
||||
} else if (typeof cb === 'function') {
|
||||
if (!state.finished) {
|
||||
this.once('finish', cb);
|
||||
onFinished(this, state, cb);
|
||||
} else {
|
||||
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
|
||||
}
|
||||
@@ -695,7 +695,7 @@ function endWritable(stream, state, cb) {
|
||||
if (state.finished)
|
||||
process.nextTick(cb);
|
||||
else
|
||||
stream.once('finish', cb);
|
||||
onFinished(stream, state, cb);
|
||||
}
|
||||
state.ended = true;
|
||||
stream.writable = false;
|
||||
@@ -715,6 +715,32 @@ function onCorkedFinish(corkReq, state, err) {
|
||||
state.corkedRequestsFree.next = corkReq;
|
||||
}
|
||||
|
||||
function onFinished(stream, state, cb) {
|
||||
if (state.destroyed && state.errorEmitted) {
|
||||
// TODO(ronag): Backwards compat. Should be moved to end() without
|
||||
// errorEmitted check and with errorOrDestroy.
|
||||
const err = new ERR_STREAM_DESTROYED('end');
|
||||
process.nextTick(cb, err);
|
||||
return;
|
||||
}
|
||||
|
||||
function onerror(err) {
|
||||
stream.removeListener('finish', onfinish);
|
||||
stream.removeListener('error', onerror);
|
||||
cb(err);
|
||||
if (stream.listenerCount('error') === 0) {
|
||||
stream.emit('error', err);
|
||||
}
|
||||
}
|
||||
function onfinish() {
|
||||
stream.removeListener('finish', onfinish);
|
||||
stream.removeListener('error', onerror);
|
||||
cb();
|
||||
}
|
||||
stream.on('finish', onfinish);
|
||||
stream.prependListener('error', onerror);
|
||||
}
|
||||
|
||||
Object.defineProperty(Writable.prototype, 'destroyed', {
|
||||
// Making it explicit this property is not enumerable
|
||||
// because otherwise some prototype manipulation in
|
||||
|
||||
@@ -292,3 +292,55 @@ const assert = require('assert');
|
||||
}));
|
||||
write.uncork();
|
||||
}
|
||||
|
||||
{
|
||||
// Call end(cb) after error & destroy
|
||||
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(new Error('asd')); }
|
||||
});
|
||||
write.on('error', common.mustCall(() => {
|
||||
write.destroy();
|
||||
let ticked = false;
|
||||
write.end(common.mustCall((err) => {
|
||||
assert.strictEqual(ticked, true);
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
|
||||
}));
|
||||
ticked = true;
|
||||
}));
|
||||
write.write('asd');
|
||||
}
|
||||
|
||||
{
|
||||
// Call end(cb) after finish & destroy
|
||||
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
write.on('finish', common.mustCall(() => {
|
||||
write.destroy();
|
||||
let ticked = false;
|
||||
write.end(common.mustCall((err) => {
|
||||
assert.strictEqual(ticked, false);
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
|
||||
}));
|
||||
ticked = true;
|
||||
}));
|
||||
write.end();
|
||||
}
|
||||
|
||||
{
|
||||
// Call end(cb) after error & destroy and don't trigger
|
||||
// unhandled exception.
|
||||
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { process.nextTick(cb); }
|
||||
});
|
||||
write.once('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'asd');
|
||||
}));
|
||||
write.end('asd', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'asd');
|
||||
}));
|
||||
write.destroy(new Error('asd'));
|
||||
}
|
||||
|
||||
48
test/parallel/test-stream-writable-end-cb-error.js
Normal file
48
test/parallel/test-stream-writable-end-cb-error.js
Normal file
@@ -0,0 +1,48 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const stream = require('stream');
|
||||
|
||||
{
|
||||
// Invoke end callback on failure.
|
||||
const writable = new stream.Writable();
|
||||
|
||||
writable._write = (chunk, encoding, cb) => {
|
||||
process.nextTick(cb, new Error('kaboom'));
|
||||
};
|
||||
|
||||
writable.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
writable.write('asd');
|
||||
writable.end(common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
writable.end(common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Don't invoke end callback twice
|
||||
const writable = new stream.Writable();
|
||||
|
||||
writable._write = (chunk, encoding, cb) => {
|
||||
process.nextTick(cb);
|
||||
};
|
||||
|
||||
let called = false;
|
||||
writable.end('asd', common.mustCall((err) => {
|
||||
called = true;
|
||||
assert.strictEqual(err, undefined);
|
||||
}));
|
||||
|
||||
writable.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
writable.on('finish', common.mustCall(() => {
|
||||
assert.strictEqual(called, true);
|
||||
writable.emit('error', new Error('kaboom'));
|
||||
}));
|
||||
}
|
||||
23
test/parallel/test-stream-writable-end-cb-uncaugth.js
Normal file
23
test/parallel/test-stream-writable-end-cb-uncaugth.js
Normal file
@@ -0,0 +1,23 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const stream = require('stream');
|
||||
|
||||
process.on('uncaughtException', common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
|
||||
const writable = new stream.Writable();
|
||||
|
||||
writable._write = (chunk, encoding, cb) => {
|
||||
cb();
|
||||
};
|
||||
writable._final = (cb) => {
|
||||
cb(new Error('kaboom'));
|
||||
};
|
||||
|
||||
writable.write('asd');
|
||||
writable.end(common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'kaboom');
|
||||
}));
|
||||
Reference in New Issue
Block a user