mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
events: add EventEmitterAsyncResource to core
Signd-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/41246 Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
This commit is contained in:
@@ -1166,6 +1166,89 @@ const emitter = new EventEmitter();
|
||||
setMaxListeners(5, target, emitter);
|
||||
```
|
||||
|
||||
## Class: `events.EventEmitterAsyncResource extends EventEmitter`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
Integrates `EventEmitter` with {AsyncResource} for `EventEmitter`s that
|
||||
require manual async tracking. Specifically, all events emitted by instances
|
||||
of `events.EventEmitterAsyncResource` will run within its [async context][].
|
||||
|
||||
```js
|
||||
const { EventEmitterAsyncResource } = require('events');
|
||||
const { notStrictEqual, strictEqual } = require('assert');
|
||||
const { executionAsyncId } = require('async_hooks');
|
||||
|
||||
// Async tracking tooling will identify this as 'Q'.
|
||||
const ee1 = new EventEmitterAsyncResource({ name: 'Q' });
|
||||
|
||||
// 'foo' listeners will run in the EventEmitters async context.
|
||||
ee1.on('foo', () => {
|
||||
strictEqual(executionAsyncId(), ee1.asyncId);
|
||||
strictEqual(triggerAsyncId(), ee1.triggerAsyncId);
|
||||
});
|
||||
|
||||
const ee2 = new EventEmitter();
|
||||
|
||||
// 'foo' listeners on ordinary EventEmitters that do not track async
|
||||
// context, however, run in the same async context as the emit().
|
||||
ee2.on('foo', () => {
|
||||
notStrictEqual(executionAsyncId(), ee2.asyncId);
|
||||
notStrictEqual(triggerAsyncId(), ee2.triggerAsyncId);
|
||||
});
|
||||
|
||||
Promise.resolve().then(() => {
|
||||
ee1.emit('foo');
|
||||
ee2.emit('foo');
|
||||
});
|
||||
```
|
||||
|
||||
The `EventEmitterAsyncResource` class has the same methods and takes the
|
||||
same options as `EventEmitter` and `AsyncResource` themselves.
|
||||
|
||||
### `new events.EventEmitterAsyncResource(options)`
|
||||
|
||||
* `options` {Object}
|
||||
* `captureRejections` {boolean} It enables
|
||||
[automatic capturing of promise rejection][capturerejections].
|
||||
**Default:** `false`.
|
||||
* `name` {string} The type of async event. **Default::**
|
||||
[`new.target.name`][].
|
||||
* `triggerAsyncId` {number} The ID of the execution context that created this
|
||||
async event. **Default:** `executionAsyncId()`.
|
||||
* `requireManualDestroy` {boolean} If set to `true`, disables `emitDestroy`
|
||||
when the object is garbage collected. This usually does not need to be set
|
||||
(even if `emitDestroy` is called manually), unless the resource's `asyncId`
|
||||
is retrieved and the sensitive API's `emitDestroy` is called with it.
|
||||
When set to `false`, the `emitDestroy` call on garbage collection
|
||||
will only take place if there is at least one active `destroy` hook.
|
||||
**Default:** `false`.
|
||||
|
||||
### `eventemitterasyncresource.asyncId`
|
||||
|
||||
* Type: {number} The unique `asyncId` assigned to the resource.
|
||||
|
||||
### `eventemitterasyncresource.asyncResource`
|
||||
|
||||
* Type: The underlying {AsyncResource}.
|
||||
|
||||
The returned `AsyncResource` object has an additional `eventEmitter` property
|
||||
that provides a reference to this `EventEmitterAsyncResource`.
|
||||
|
||||
### `eventemitterasyncresource.emitDestroy()`
|
||||
|
||||
Call all `destroy` hooks. This should only ever be called once. An error will
|
||||
be thrown if it is called more than once. This **must** be manually called. If
|
||||
the resource is left to be collected by the GC then the `destroy` hooks will
|
||||
never be called.
|
||||
|
||||
### `eventemitterasyncresource.triggerAsyncId`
|
||||
|
||||
* Type: {number} The same `triggerAsyncId` that is passed to the
|
||||
`AsyncResource` constructor.
|
||||
|
||||
<a id="event-target-and-event-api"></a>
|
||||
|
||||
## `EventTarget` and `Event` API
|
||||
@@ -1706,7 +1789,9 @@ to the `EventTarget`.
|
||||
[`events.defaultMaxListeners`]: #eventsdefaultmaxlisteners
|
||||
[`fs.ReadStream`]: fs.md#class-fsreadstream
|
||||
[`net.Server`]: net.md#class-netserver
|
||||
[`new.target.name`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/new.target
|
||||
[`process.on('warning')`]: process.md#event-warning
|
||||
[async context]: async_context.md
|
||||
[capturerejections]: #capture-rejections-of-promises
|
||||
[error]: #error-events
|
||||
[rejection]: #emittersymbolfornodejsrejectionerr-eventname-args
|
||||
|
||||
130
lib/events.js
130
lib/events.js
@@ -27,6 +27,7 @@ const {
|
||||
ArrayPrototypeShift,
|
||||
ArrayPrototypeSlice,
|
||||
ArrayPrototypeSplice,
|
||||
ArrayPrototypeUnshift,
|
||||
Boolean,
|
||||
Error,
|
||||
ErrorCaptureStackTrace,
|
||||
@@ -42,6 +43,7 @@ const {
|
||||
Promise,
|
||||
PromiseReject,
|
||||
PromiseResolve,
|
||||
ReflectApply,
|
||||
ReflectOwnKeys,
|
||||
String,
|
||||
StringPrototypeSplit,
|
||||
@@ -59,6 +61,7 @@ const {
|
||||
kEnhanceStackBeforeInspector,
|
||||
codes: {
|
||||
ERR_INVALID_ARG_TYPE,
|
||||
ERR_INVALID_THIS,
|
||||
ERR_OUT_OF_RANGE,
|
||||
ERR_UNHANDLED_ERROR
|
||||
},
|
||||
@@ -68,6 +71,7 @@ const {
|
||||
validateAbortSignal,
|
||||
validateBoolean,
|
||||
validateFunction,
|
||||
validateString,
|
||||
} = require('internal/validators');
|
||||
|
||||
const kCapture = Symbol('kCapture');
|
||||
@@ -76,6 +80,125 @@ const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
|
||||
const kMaxEventTargetListenersWarned =
|
||||
Symbol('events.maxEventTargetListenersWarned');
|
||||
|
||||
let EventEmitterAsyncResource;
|
||||
// The EventEmitterAsyncResource has to be initialized lazily because event.js
|
||||
// is loaded so early in the bootstrap process, before async_hooks is available.
|
||||
//
|
||||
// This implementation was adapted straight from addaleax's
|
||||
// eventemitter-asyncresource MIT-licensed userland module.
|
||||
// https://github.com/addaleax/eventemitter-asyncresource
|
||||
function lazyEventEmitterAsyncResource() {
|
||||
if (EventEmitterAsyncResource === undefined) {
|
||||
const {
|
||||
AsyncResource
|
||||
} = require('async_hooks');
|
||||
|
||||
const kEventEmitter = Symbol('kEventEmitter');
|
||||
const kAsyncResource = Symbol('kAsyncResource');
|
||||
class EventEmitterReferencingAsyncResource extends AsyncResource {
|
||||
/**
|
||||
* @param {EventEmitter} ee
|
||||
* @param {string} [type]
|
||||
* @param {{
|
||||
* triggerAsyncId?: number,
|
||||
* requireManualDestroy?: boolean,
|
||||
* }} [options]
|
||||
*/
|
||||
constructor(ee, type, options) {
|
||||
super(type, options);
|
||||
this[kEventEmitter] = ee;
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {EventEmitter}
|
||||
*/
|
||||
get eventEmitter() {
|
||||
if (this[kEventEmitter] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource');
|
||||
return this[kEventEmitter];
|
||||
}
|
||||
}
|
||||
|
||||
EventEmitterAsyncResource =
|
||||
class EventEmitterAsyncResource extends EventEmitter {
|
||||
/**
|
||||
* @param {{
|
||||
* name?: string,
|
||||
* triggerAsyncId?: number,
|
||||
* requireManualDestroy?: boolean,
|
||||
* }} [options]
|
||||
*/
|
||||
constructor(options = undefined) {
|
||||
let name;
|
||||
if (typeof options === 'string') {
|
||||
name = options;
|
||||
options = undefined;
|
||||
} else {
|
||||
if (new.target === EventEmitterAsyncResource) {
|
||||
validateString(options?.name, 'options.name');
|
||||
}
|
||||
name = options?.name || new.target.name;
|
||||
}
|
||||
super(options);
|
||||
|
||||
this[kAsyncResource] =
|
||||
new EventEmitterReferencingAsyncResource(this, name, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {symbol,string} event
|
||||
* @param {...any} args
|
||||
* @returns {boolean}
|
||||
*/
|
||||
emit(event, ...args) {
|
||||
if (this[kAsyncResource] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
|
||||
const { asyncResource } = this;
|
||||
ArrayPrototypeUnshift(args, super.emit, this, event);
|
||||
return ReflectApply(asyncResource.runInAsyncScope, asyncResource,
|
||||
args);
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {void}
|
||||
*/
|
||||
emitDestroy() {
|
||||
if (this[kAsyncResource] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
|
||||
this.asyncResource.emitDestroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {number}
|
||||
*/
|
||||
get asyncId() {
|
||||
if (this[kAsyncResource] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
|
||||
return this.asyncResource.asyncId();
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {number}
|
||||
*/
|
||||
get triggerAsyncId() {
|
||||
if (this[kAsyncResource] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
|
||||
return this.asyncResource.triggerAsyncId();
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {EventEmitterReferencingAsyncResource}
|
||||
*/
|
||||
get asyncResource() {
|
||||
if (this[kAsyncResource] === undefined)
|
||||
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
|
||||
return this[kAsyncResource];
|
||||
}
|
||||
};
|
||||
}
|
||||
return EventEmitterAsyncResource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new `EventEmitter` instance.
|
||||
* @param {{ captureRejections?: boolean; }} [opts]
|
||||
@@ -106,6 +229,13 @@ ObjectDefineProperty(EventEmitter, 'captureRejections', {
|
||||
enumerable: true
|
||||
});
|
||||
|
||||
ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', {
|
||||
enumerable: true,
|
||||
get: lazyEventEmitterAsyncResource,
|
||||
set: undefined,
|
||||
configurable: true,
|
||||
});
|
||||
|
||||
EventEmitter.errorMonitor = kErrorMonitor;
|
||||
|
||||
// The default for captureRejections is false
|
||||
|
||||
132
test/parallel/test-eventemitter-asyncresource.js
Normal file
132
test/parallel/test-eventemitter-asyncresource.js
Normal file
@@ -0,0 +1,132 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { EventEmitterAsyncResource } = require('events');
|
||||
const {
|
||||
createHook,
|
||||
executionAsyncId,
|
||||
} = require('async_hooks');
|
||||
|
||||
const {
|
||||
deepStrictEqual,
|
||||
strictEqual,
|
||||
} = require('assert');
|
||||
|
||||
const {
|
||||
setImmediate: tick,
|
||||
} = require('timers/promises');
|
||||
|
||||
function makeHook(trackedTypes) {
|
||||
const eventMap = new Map();
|
||||
|
||||
function log(asyncId, name) {
|
||||
const entry = eventMap.get(asyncId);
|
||||
if (entry !== undefined) entry.push({ name });
|
||||
}
|
||||
|
||||
const hook = createHook({
|
||||
init(asyncId, type, triggerAsyncId, resource) {
|
||||
if (trackedTypes.includes(type)) {
|
||||
eventMap.set(asyncId, [
|
||||
{
|
||||
name: 'init',
|
||||
type,
|
||||
triggerAsyncId,
|
||||
resource,
|
||||
},
|
||||
]);
|
||||
}
|
||||
},
|
||||
|
||||
before(asyncId) { log(asyncId, 'before'); },
|
||||
after(asyncId) { log(asyncId, 'after'); },
|
||||
destroy(asyncId) { log(asyncId, 'destroy'); }
|
||||
}).enable();
|
||||
|
||||
return {
|
||||
done() {
|
||||
hook.disable();
|
||||
return new Set(eventMap.values());
|
||||
},
|
||||
ids() {
|
||||
return new Set(eventMap.keys());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Tracks emit() calls correctly using async_hooks
|
||||
(async () => {
|
||||
const tracer = makeHook(['Foo']);
|
||||
|
||||
class Foo extends EventEmitterAsyncResource {}
|
||||
|
||||
const origExecutionAsyncId = executionAsyncId();
|
||||
const foo = new Foo();
|
||||
|
||||
foo.on('someEvent', common.mustCall());
|
||||
foo.emit('someEvent');
|
||||
|
||||
deepStrictEqual([foo.asyncId], [...tracer.ids()]);
|
||||
strictEqual(foo.triggerAsyncId, origExecutionAsyncId);
|
||||
strictEqual(foo.asyncResource.eventEmitter, foo);
|
||||
|
||||
foo.emitDestroy();
|
||||
|
||||
await tick();
|
||||
|
||||
deepStrictEqual(tracer.done(), new Set([
|
||||
[
|
||||
{
|
||||
name: 'init',
|
||||
type: 'Foo',
|
||||
triggerAsyncId: origExecutionAsyncId,
|
||||
resource: foo.asyncResource,
|
||||
},
|
||||
{ name: 'before' },
|
||||
{ name: 'after' },
|
||||
{ name: 'destroy' },
|
||||
],
|
||||
]));
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Can explicitly specify name as positional arg
|
||||
(async () => {
|
||||
const tracer = makeHook(['ResourceName']);
|
||||
|
||||
const origExecutionAsyncId = executionAsyncId();
|
||||
class Foo extends EventEmitterAsyncResource {}
|
||||
|
||||
const foo = new Foo('ResourceName');
|
||||
|
||||
deepStrictEqual(tracer.done(), new Set([
|
||||
[
|
||||
{
|
||||
name: 'init',
|
||||
type: 'ResourceName',
|
||||
triggerAsyncId: origExecutionAsyncId,
|
||||
resource: foo.asyncResource,
|
||||
},
|
||||
],
|
||||
]));
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Can explicitly specify name as option
|
||||
(async () => {
|
||||
const tracer = makeHook(['ResourceName']);
|
||||
|
||||
const origExecutionAsyncId = executionAsyncId();
|
||||
class Foo extends EventEmitterAsyncResource {}
|
||||
|
||||
const foo = new Foo({ name: 'ResourceName' });
|
||||
|
||||
deepStrictEqual(tracer.done(), new Set([
|
||||
[
|
||||
{
|
||||
name: 'init',
|
||||
type: 'ResourceName',
|
||||
triggerAsyncId: origExecutionAsyncId,
|
||||
resource: foo.asyncResource,
|
||||
},
|
||||
],
|
||||
]));
|
||||
})().then(common.mustCall());
|
||||
Reference in New Issue
Block a user