fs: synchronize close with other I/O for streams

Part of the flakiness in the
parallel/test-readline-async-iterators-destroy test comes from
fs streams starting `_read()` and `_destroy()` without waiting
for the other to finish, which can lead to the `fs.read()` call
resulting in `EBADF` if timing is bad.

Fix this by synchronizing write and read operations with `close()`.

Refs: https://github.com/nodejs/node/issues/30660

PR-URL: https://github.com/nodejs/node/pull/30837
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Anna Henningsen
2019-12-07 14:17:04 +01:00
parent 4a5fb74fb1
commit 8a5c7f6abd

View File

@@ -7,10 +7,12 @@ const {
NumberIsSafeInteger,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Symbol,
} = primordials;
const {
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
const internalUtil = require('internal/util');
const { validateNumber } = require('internal/validators');
@@ -22,6 +24,8 @@ const {
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { toPathIfFileURL } = require('internal/url');
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');
const kMinPoolSpace = 128;
@@ -86,6 +90,7 @@ function ReadStream(path, options) {
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
this[kIsPerformingIO] = false;
if (this.start !== undefined) {
checkPosition(this.start, 'start');
@@ -155,6 +160,8 @@ ReadStream.prototype._read = function(n) {
});
}
if (this.destroyed) return;
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
@@ -178,7 +185,12 @@ ReadStream.prototype._read = function(n) {
return this.push(null);
// the actual read.
this[kIsPerformingIO] = true;
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);
if (er) {
if (this.autoClose) {
this.destroy();
@@ -224,8 +236,12 @@ ReadStream.prototype._destroy = function(err, cb) {
return;
}
if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
return;
}
closeFsStream(this, cb, err);
this.fd = null;
};
function closeFsStream(stream, cb, err) {
@@ -236,6 +252,8 @@ function closeFsStream(stream, cb, err) {
if (!er)
stream.emit('close');
});
stream.fd = null;
}
ReadStream.prototype.close = function(cb) {
@@ -274,6 +292,7 @@ function WriteStream(path, options) {
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
this[kIsPerformingIO] = false;
if (this.start !== undefined) {
checkPosition(this.start, 'start');
@@ -339,7 +358,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
});
}
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
this[kIsPerformingIO] = true;
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}
if (er) {
if (this.autoClose) {
this.destroy();
@@ -362,7 +391,8 @@ WriteStream.prototype._writev = function(data, cb) {
});
}
const self = this;
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
const len = data.length;
const chunks = new Array(len);
let size = 0;
@@ -374,12 +404,22 @@ WriteStream.prototype._writev = function(data, cb) {
size += chunk.length;
}
fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
this[kIsPerformingIO] = true;
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}
if (er) {
self.destroy();
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
self.bytesWritten += bytes;
this.bytesWritten += bytes;
cb();
});