cluster: refactor to use more primordials

PR-URL: https://github.com/nodejs/node/pull/36011
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Antoine du Hamel
2020-11-06 23:37:39 +01:00
committed by Node.js GitHub Bot
parent 28f31bdb6a
commit c7d2a4544b
6 changed files with 51 additions and 33 deletions

View File

@@ -1,8 +1,11 @@
'use strict';
const {
Map,
ArrayPrototypeJoin,
FunctionPrototype,
ObjectAssign,
ReflectApply,
SafeMap,
} = primordials;
const assert = require('internal/assert');
@@ -12,9 +15,9 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const cluster = new EventEmitter();
const handles = new Map();
const indexes = new Map();
const noop = () => {};
const handles = new SafeMap();
const indexes = new SafeMap();
const noop = FunctionPrototype;
module.exports = cluster;
@@ -49,7 +52,7 @@ cluster._setupWorker = function() {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
ReflectApply(_disconnect, worker, [true]);
}
};
@@ -62,10 +65,13 @@ cluster._getServer = function(obj, options, cb) {
process.platform !== 'win32')
address = path.resolve(address);
const indexesKey = [address,
options.port,
options.addressType,
options.fd ].join(':');
const indexesKey = ArrayPrototypeJoin(
[
address,
options.port,
options.addressType,
options.fd,
], ':');
let index = indexes.get(indexesKey);
@@ -119,7 +125,7 @@ function shared(message, handle, indexesKey, cb) {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
return close.apply(handle, arguments);
return ReflectApply(close, handle, arguments);
};
assert(handles.has(key) === false);
handles.set(key, handle);
@@ -228,9 +234,9 @@ function _disconnect(masterInitiated) {
// Extend generic Worker with methods specific to worker processes.
Worker.prototype.disconnect = function() {
if (![ 'disconnecting', 'destroying' ].includes(this.state)) {
if (this.state !== 'disconnecting' && this.state !== 'destroying') {
this.state = 'disconnecting';
_disconnect.call(this);
ReflectApply(_disconnect, this, []);
}
return this;

View File

@@ -1,9 +1,14 @@
'use strict';
const {
Map,
ArrayPrototypePush,
ArrayPrototypeSlice,
ArrayPrototypeSome,
ObjectKeys,
ObjectValues,
RegExpPrototypeTest,
SafeMap,
StringPrototypeStartsWith,
} = primordials;
const assert = require('internal/assert');
@@ -23,7 +28,7 @@ const { validatePort } = require('internal/validators');
module.exports = cluster;
const handles = new Map();
const handles = new SafeMap();
cluster.isWorker = false;
cluster.isMaster = true;
cluster.Worker = Worker;
@@ -53,7 +58,7 @@ cluster.schedulingPolicy = schedulingPolicy;
cluster.setupMaster = function(options) {
const settings = {
args: process.argv.slice(2),
args: ArrayPrototypeSlice(process.argv, 2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
@@ -65,8 +70,10 @@ cluster.setupMaster = function(options) {
// Without --logfile=v8-%p.log, everything ends up in a single, unusable
// file. (Unusable because what V8 logs are memory addresses and each
// process has its own memory mappings.)
if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
!settings.execArgv.some((s) => s.startsWith('--logfile='))) {
if (ArrayPrototypeSome(settings.execArgv,
(s) => StringPrototypeStartsWith(s, '--prof')) &&
!ArrayPrototypeSome(settings.execArgv,
(s) => StringPrototypeStartsWith(s, '--logfile='))) {
settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
}
@@ -109,8 +116,9 @@ function createWorkerProcess(id, env) {
const nodeOptions = process.env.NODE_OPTIONS ?
process.env.NODE_OPTIONS : '';
if (execArgv.some((arg) => arg.match(debugArgRegex)) ||
nodeOptions.match(debugArgRegex)) {
if (ArrayPrototypeSome(execArgv,
(arg) => RegExpPrototypeTest(debugArgRegex, arg)) ||
RegExpPrototypeTest(debugArgRegex, nodeOptions)) {
let inspectPort;
if ('inspectPort' in cluster.settings) {
if (typeof cluster.settings.inspectPort === 'function')
@@ -126,7 +134,7 @@ function createWorkerProcess(id, env) {
debugPortOffset++;
}
execArgv.push(`--inspect-port=${inspectPort}`);
ArrayPrototypePush(execArgv, `--inspect-port=${inspectPort}`);
}
return fork(cluster.settings.exec, cluster.settings.args, {

View File

@@ -2,8 +2,10 @@
const {
ArrayIsArray,
ArrayPrototypePush,
ArrayPrototypeShift,
Boolean,
Map,
SafeMap,
} = primordials;
const assert = require('internal/assert');
@@ -15,8 +17,8 @@ module.exports = RoundRobinHandle;
function RoundRobinHandle(key, address, { port, fd, flags }) {
this.key = key;
this.all = new Map();
this.free = new Map();
this.all = new SafeMap();
this.free = new SafeMap();
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
@@ -90,7 +92,7 @@ RoundRobinHandle.prototype.remove = function(worker) {
};
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
ArrayPrototypePush(this.handles, handle);
const [ workerEntry ] = this.free;
if (ArrayIsArray(workerEntry)) {
@@ -105,7 +107,7 @@ RoundRobinHandle.prototype.handoff = function(worker) {
return; // Worker is closing (or has closed) the server.
}
const handle = this.handles.shift();
const handle = ArrayPrototypeShift(this.handles);
if (handle === undefined) {
this.free.set(worker.id, worker); // Add to ready queue again.

View File

@@ -1,5 +1,5 @@
'use strict';
const { Map } = primordials;
const { SafeMap } = primordials;
const assert = require('internal/assert');
const dgram = require('internal/dgram');
const net = require('net');
@@ -8,7 +8,7 @@ module.exports = SharedHandle;
function SharedHandle(key, address, { port, addressType, fd, flags }) {
this.key = key;
this.workers = new Map();
this.workers = new SafeMap();
this.handle = null;
this.errno = 0;

View File

@@ -1,7 +1,8 @@
'use strict';
const {
Map,
ReflectApply,
SafeMap,
} = primordials;
module.exports = {
@@ -9,7 +10,7 @@ module.exports = {
internal
};
const callbacks = new Map();
const callbacks = new SafeMap();
let seq = 0;
function sendHelper(proc, message, handle, cb) {
@@ -44,6 +45,6 @@ function internal(worker, cb) {
}
}
fn.apply(worker, arguments);
ReflectApply(fn, worker, arguments);
};
}

View File

@@ -2,6 +2,7 @@
const {
ObjectSetPrototypeOf,
ReflectApply,
} = primordials;
const EventEmitter = require('events');
@@ -13,7 +14,7 @@ function Worker(options) {
if (!(this instanceof Worker))
return new Worker(options);
EventEmitter.call(this);
ReflectApply(EventEmitter, this, []);
if (options === null || typeof options !== 'object')
options = {};
@@ -38,11 +39,11 @@ ObjectSetPrototypeOf(Worker.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Worker, EventEmitter);
Worker.prototype.kill = function() {
this.destroy.apply(this, arguments);
ReflectApply(this.destroy, this, arguments);
};
Worker.prototype.send = function() {
return this.process.send.apply(this.process, arguments);
return ReflectApply(this.process.send, this.process, arguments);
};
Worker.prototype.isDead = function() {