mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
fs: remove custom Buffer pool for streams
The performance benefit of using a custom pool is negligible. Furthermore, it causes problems with Workers and transferable objects. Rather than further adding complexity for compat with Workers, just remove the pooling logic. Refs: https://github.com/nodejs/node/issues/33880#issuecomment-644430693 Fixes: https://github.com/nodejs/node/issues/31733 PR-URL: https://github.com/nodejs/node/pull/33981 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
@@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url');
|
||||
// Symbols used to coordinate in-flight I/O between _read()/_write() and
// _destroy() on stream instances.
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

// A pool (or fragment) with less free space than this is discarded.
const kMinPoolSpace = 128;
const kFs = Symbol('kFs');

// Shared read buffer; reads carve slices out of it via `pool.used`.
let pool;
// It can happen that we expect to read a large chunk of data, and reserve
// a large chunk of the pool accordingly, but the read() call only filled
// a portion of it. If a concurrently executing read() then uses the same pool,
// the "reserved" portion cannot be used, so we allow it to be re-used as a
// new pool later.
const poolFragments = [];

/**
 * Point `pool` at a fresh buffer and reset its `used` cursor.
 * Prefers recycling a previously reserved-but-unused fragment over
 * allocating new memory.
 * @param {number} poolSize - size of the buffer to allocate when no
 *   fragment is available.
 */
function allocNewPool(poolSize) {
  pool = poolFragments.length > 0 ?
    poolFragments.pop() :
    Buffer.allocUnsafe(poolSize);
  pool.used = 0;
}
|
||||
|
||||
/**
 * Round `n` up to the next multiple of 8 so that pool offsets stay on an
 * 8-byte boundary (for non-negative 32-bit integer inputs).
 * @param {number} n
 * @returns {number} the smallest multiple of 8 that is >= n
 */
function roundUpToMultipleOf8(n) {
  const kAlignMask = ~7; // Clears the low three bits.
  return (n + 7) & kAlignMask;
}
|
||||
|
||||
function _construct(callback) {
|
||||
const stream = this;
|
||||
if (typeof stream.fd === 'number') {
|
||||
@@ -188,70 +167,51 @@ ReadStream.prototype.open = openReadFs;
|
||||
ReadStream.prototype._construct = _construct;
|
||||
|
||||
// Reads up to `n` bytes from the underlying fd into a freshly allocated
// buffer and pushes the result into the readable queue. A dedicated
// buffer per read (rather than a shared pool) is used so that chunks can
// be transferred to Workers without a shared backing store causing
// problems with transferability.
// Refs: https://github.com/nodejs/node/pull/33981
ReadStream.prototype._read = function(n) {
  // Clamp the request so we never read past `this.end`.
  n = this.pos !== undefined ?
    MathMin(this.end - this.pos + 1, n) :
    MathMin(this.end - this.bytesRead + 1, n);

  // Already read everything we were supposed to read; treat as EOF.
  if (n <= 0) {
    this.push(null);
    return;
  }

  const buf = Buffer.allocUnsafeSlow(n);

  this[kIsPerformingIO] = true;
  this[kFs]
    .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
      this[kIsPerformingIO] = false;

      // Tell ._destroy() that it's safe to close the fd now.
      if (this.destroyed) {
        this.emit(kIoDone, er);
        return;
      }

      if (er) {
        errorOrDestroy(this, er);
      } else if (bytesRead > 0) {
        this.bytesRead += bytesRead;

        if (bytesRead !== buf.length) {
          // Slow path. Shrink to fit.
          // Copy instead of slice so that we don't retain
          // large backing buffer for small reads.
          const dst = Buffer.allocUnsafeSlow(bytesRead);
          buf.copy(dst, 0, 0, bytesRead);
          buf = dst;
        }

        this.push(buf);
      } else {
        // Zero bytes read: EOF.
        this.push(null);
      }
    });

  // Advance the file position for the next read.
  if (this.pos !== undefined) {
    this.pos += n;
  }
};
|
||||
|
||||
ReadStream.prototype._destroy = function(err, cb) {
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
'use strict';
|
||||
// Refs: https://github.com/nodejs/node/issues/31733
|
||||
const common = require('../common');
|
||||
if (!common.hasCrypto)
|
||||
common.skip('missing crypto');
|
||||
|
||||
const assert = require('assert');
|
||||
const crypto = require('crypto');
|
||||
const fs = require('fs');
|
||||
@@ -121,7 +124,6 @@ function test(config) {
|
||||
|
||||
tmpdir.refresh();
|
||||
|
||||
// OK
|
||||
test({
|
||||
cipher: 'aes-128-ccm',
|
||||
aad: Buffer.alloc(1),
|
||||
@@ -131,7 +133,6 @@ test({
|
||||
plaintextLength: 32768,
|
||||
});
|
||||
|
||||
// Fails the fstream test.
|
||||
test({
|
||||
cipher: 'aes-128-ccm',
|
||||
aad: Buffer.alloc(1),
|
||||
Reference in New Issue
Block a user