worker: add experimental BroadcastChannel

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/36271
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
James M Snell
2020-11-26 13:54:23 -08:00
parent 09fd8f13c8
commit 9e446b3e9f
8 changed files with 690 additions and 73 deletions

View File

@@ -274,6 +274,98 @@ if (isMainThread) {
}
```
## Class: `BroadcastChannel extends EventTarget`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Instances of `BroadcastChannel` allow asynchronous one-to-many communication
with all other `BroadcastChannel` instances bound to the same channel name.
```js
'use strict';
const {
isMainThread,
BroadcastChannel,
Worker
} = require('worker_threads');
const bc = new BroadcastChannel('hello');
if (isMainThread) {
let c = 0;
bc.onmessage = (event) => {
console.log(event.data);
if (++c === 10) bc.close();
};
for (let n = 0; n < 10; n++)
new Worker(__filename);
} else {
bc.postMessage('hello from every worker');
bc.close();
}
```
### `new BroadcastChannel(name)`
<!-- YAML
added: REPLACEME
-->
* `name` {any} The name of the channel to connect to. Any JavaScript value
that can be converted to a string using ``${name}`` is permitted.
### `broadcastChannel.close()`
<!-- YAML
added: REPLACEME
-->
Closes the `BroadcastChannel` connection.
### `broadcastChannel.onmessage`
<!-- YAML
added: REPLACEME
-->
* Type: {Function} Invoked with a single `MessageEvent` argument
when a message is received.
### `broadcastChannel.onmessageerror`
<!-- YAML
added: REPLACEME
-->
* Type: {Function} Invoked with a received message cannot be
deserialized.
### `broadcastChannel.postMessage(message)`
<!-- YAML
added: REPLACEME
-->
* `message` {any} Any cloneable JavaScript value.
### `broadcastChannel.ref()`
<!-- YAML
added: REPLACEME
-->
Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed
BroadcastChannel will *not* let the program exit if it's the only active handle
left (the default behavior). If the port is `ref()`ed, calling `ref()` again
will have no effect.
### `broadcastChannel.unref()`
<!-- YAML
added: REPLACEME
-->
Calling `unref()` on a BroadcastChannel will allow the thread to exit if this
is the only active handle in the event system. If the BroadcastChannel is
already `unref()`ed calling `unref()` again will have no effect.
## Class: `MessageChannel`
<!-- YAML
added: v10.5.0

View File

@@ -19,11 +19,13 @@ const {
const {
MessagePort,
MessageChannel,
broadcastChannel,
drainMessagePort,
moveMessagePortToContext,
receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort,
checkMessagePort
checkMessagePort,
DOMException,
} = internalBinding('messaging');
const {
getEnvMessagePort
@@ -41,14 +43,20 @@ const {
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
const {
ERR_INVALID_ARG_TYPE
} = require('internal/errors').codes;
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
}
} = require('internal/errors');
const kData = Symbol('kData');
const kHandle = Symbol('kHandle');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kLastEventId = Symbol('kLastEventId');
const kName = Symbol('kName');
const kOrigin = Symbol('kOrigin');
const kOnMessage = Symbol('kOnMessage');
const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kPorts = Symbol('kPorts');
const kWaitingStreams = Symbol('kWaitingStreams');
@@ -324,6 +332,76 @@ function receiveMessageOnPort(port) {
return { message };
}
function onMessageEvent(type, data) {
this.dispatchEvent(new MessageEvent(type, { data }));
}
class BroadcastChannel extends EventTarget {
constructor(name) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('name');
super();
this[kName] = `${name}`;
this[kHandle] = broadcastChannel(this[kName]);
this[kOnMessage] = onMessageEvent.bind(this, 'message');
this[kOnMessageError] = onMessageEvent.bind(this, 'messageerror');
this[kHandle].on('message', this[kOnMessage]);
this[kHandle].on('messageerror', this[kOnMessageError]);
}
[inspect.custom](depth, options) {
if (depth < 0)
return 'BroadcastChannel';
const opts = {
...options,
depth: options.depth == null ? null : options.depth - 1
};
return `BroadcastChannel ${inspect({
name: this[kName],
active: this[kHandle] !== undefined,
}, opts)}`;
}
get name() { return this[kName]; }
close() {
if (this[kHandle] === undefined)
return;
this[kHandle].off('message', this[kOnMessage]);
this[kHandle].off('messageerror', this[kOnMessageError]);
this[kOnMessage] = undefined;
this[kOnMessageError] = undefined;
this[kHandle].close();
this[kHandle] = undefined;
}
postMessage(message) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('message');
if (this[kHandle] === undefined)
throw new DOMException('BroadcastChannel is closed.');
if (this[kHandle].postMessage(message) === undefined)
throw new DOMException('Message could not be posted.');
}
ref() {
if (this[kHandle])
this[kHandle].ref();
return this;
}
unref() {
if (this[kHandle])
this[kHandle].unref();
return this;
}
}
defineEventHandler(BroadcastChannel.prototype, 'message');
defineEventHandler(BroadcastChannel.prototype, 'messageerror');
module.exports = {
drainMessagePort,
messageTypes,
@@ -339,5 +417,6 @@ module.exports = {
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
createWorkerStdio
createWorkerStdio,
BroadcastChannel,
};

View File

@@ -13,6 +13,7 @@ const {
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
BroadcastChannel,
} = require('internal/worker/io');
const {
@@ -32,4 +33,5 @@ module.exports = {
Worker,
parentPort: null,
workerData: null,
BroadcastChannel,
};

View File

@@ -161,7 +161,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
std::move(shared_array_buffers_[i]));
shared_array_buffers.push_back(sab);
}
shared_array_buffers_.clear();
DeserializerDelegate delegate(
this, env, host_objects, shared_array_buffers, wasm_modules_);
@@ -178,7 +177,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
deserializer.TransferArrayBuffer(i, ab);
}
array_buffers_.clear();
if (deserializer.ReadHeader(context).IsNothing())
return {};
@@ -517,7 +515,20 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("transferables", transferables_);
}
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
// TODO(@jasnell): The name here will be an empty string if the
// one-to-one MessageChannel is used. In such cases,
// SiblingGroup::Get() will return nothing and group_ will be
// an empty pointer. @addaleax suggests that the code here
// could be clearer if attaching the SiblingGroup were a
// separate step rather than part of the constructor here.
MessagePortData::MessagePortData(
MessagePort* owner,
const std::string& name)
: owner_(owner),
group_(SiblingGroup::Get(name)) {
if (group_)
group_->Entangle(this);
}
MessagePortData::~MessagePortData() {
CHECK_NULL(owner_);
@@ -529,7 +540,7 @@ void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("incoming_messages", incoming_messages_);
}
void MessagePortData::AddToIncomingQueue(Message&& message) {
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
// This function will be called by other threads.
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));
@@ -541,32 +552,17 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
a->sibling_ = b;
b->sibling_ = a;
a->sibling_mutex_ = b->sibling_mutex_;
CHECK(!a->group_);
CHECK(!b->group_);
b->group_ = a->group_ = std::make_shared<SiblingGroup>();
a->group_->Entangle(a);
a->group_->Entangle(b);
}
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
Mutex::ScopedLock sibling_lock(*sibling_mutex);
sibling_mutex_ = std::make_shared<Mutex>();
MessagePortData* sibling = sibling_;
if (sibling_ != nullptr) {
sibling_->sibling_ = nullptr;
sibling_ = nullptr;
}
// We close MessagePorts after disentanglement, so we enqueue a corresponding
// message and trigger the corresponding uv_async_t to let them know that
// this happened.
AddToIncomingQueue(Message());
if (sibling != nullptr) {
sibling->AddToIncomingQueue(Message());
if (group_) {
group_->Disentangle(this);
group_.reset();
}
}
@@ -576,12 +572,13 @@ MessagePort::~MessagePort() {
MessagePort::MessagePort(Environment* env,
Local<Context> context,
Local<Object> wrap)
Local<Object> wrap,
const std::string& name)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&async_),
AsyncWrap::PROVIDER_MESSAGEPORT),
data_(new MessagePortData(this)) {
data_(new MessagePortData(this, name)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
@@ -647,7 +644,8 @@ void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
std::unique_ptr<MessagePortData> data) {
std::unique_ptr<MessagePortData> data,
const std::string& name) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
@@ -656,7 +654,7 @@ MessagePort* MessagePort::New(
Local<Object> instance;
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
MessagePort* port = new MessagePort(env, context, instance);
MessagePort* port = new MessagePort(env, context, instance, name);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
@@ -681,7 +679,7 @@ MessagePort* MessagePort::New(
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
Message received;
std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
@@ -695,22 +693,22 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!wants_message &&
!data_->incoming_messages_.front().IsCloseMessage())) {
!data_->incoming_messages_.front()->IsCloseMessage())) {
return env()->no_message_symbol();
}
received = std::move(data_->incoming_messages_.front());
received = data_->incoming_messages_.front();
data_->incoming_messages_.pop_front();
}
if (received.IsCloseMessage()) {
if (received->IsCloseMessage()) {
Close();
return env()->no_message_symbol();
}
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
return received.Deserialize(env(), context);
return received->Deserialize(env(), context);
}
void MessagePort::OnMessage() {
@@ -829,13 +827,13 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
Message msg;
std::shared_ptr<Message> msg = std::make_shared<Message>();
// Per spec, we need to both check if transfer list has the source port, and
// serialize the input message, even if the MessagePort is closed or detached.
Maybe<bool> serialization_maybe =
msg.Serialize(env, context, message_v, transfer_v, obj);
msg->Serialize(env, context, message_v, transfer_v, obj);
if (data_ == nullptr) {
return serialization_maybe;
}
@@ -843,26 +841,26 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Nothing<bool>();
}
Mutex::ScopedLock lock(*data_->sibling_mutex_);
bool doomed = false;
std::string error;
Maybe<bool> res = data_->Dispatch(msg, &error);
if (res.IsNothing())
return res;
// Check if the target port is posted to itself.
if (data_->sibling_ != nullptr) {
for (const auto& transferable : msg.transferables()) {
if (data_->sibling_ == transferable.get()) {
doomed = true;
ProcessEmitWarning(env, "The target port was posted to itself, and "
"the communication channel was lost");
break;
}
}
if (!error.empty())
ProcessEmitWarning(env, error.c_str());
return res;
}
Maybe<bool> MessagePortData::Dispatch(
std::shared_ptr<Message> message,
std::string* error) {
if (!group_) {
if (error != nullptr)
*error = "MessagePortData is not entangled.";
return Nothing<bool>();
}
if (data_->sibling_ == nullptr || doomed)
return Just(true);
data_->sibling_->AddToIncomingQueue(std::move(msg));
return Just(true);
return group_->Dispatch(this, message, error);
}
static Maybe<bool> ReadIterable(Environment* env,
@@ -969,7 +967,9 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
return;
}
port->PostMessage(env, args[0], transfer_list);
Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
if (res.IsJust())
args.GetReturnValue().Set(res.FromJust());
}
void MessagePort::Start() {
@@ -1273,6 +1273,99 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
return ret;
}
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
if (name.empty()) return {};
Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
std::shared_ptr<SiblingGroup> group;
auto i = groups_.find(name);
if (i == groups_.end() || i->second.expired()) {
group = std::make_shared<SiblingGroup>(name);
groups_[name] = group;
} else {
group = i->second.lock();
}
return group;
}
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
auto i = groups_.find(name);
if (i != groups_.end() && i->second.expired())
groups_.erase(name);
}
SiblingGroup::SiblingGroup(const std::string& name)
: name_(name) { }
SiblingGroup::~SiblingGroup() {
// If this is a named group, check to see if we can remove the group
if (!name_.empty())
CheckSiblingGroup(name_);
}
Maybe<bool> SiblingGroup::Dispatch(
MessagePortData* source,
std::shared_ptr<Message> message,
std::string* error) {
Mutex::ScopedLock lock(group_mutex_);
// The source MessagePortData is not part of this group.
if (ports_.find(source) == ports_.end()) {
if (error != nullptr)
*error = "Source MessagePort is not entangled with this group.";
return Nothing<bool>();
}
// There are no destination ports.
if (size() <= 1)
return Just(false);
// Transferables cannot be used when there is more
// than a single destination.
if (size() > 2 && message->transferables().size()) {
if (error != nullptr)
*error = "Transferables cannot be used with multiple destinations.";
return Nothing<bool>();
}
for (MessagePortData* port : ports_) {
if (port == source)
continue;
// This loop should only be entered if there's only a single destination
for (const auto& transferable : message->transferables()) {
if (port == transferable.get()) {
if (error != nullptr) {
*error = "The target port was posted to itself, and the "
"communication channel was lost";
}
return Just(true);
}
}
port->AddToIncomingQueue(message);
}
return Just(true);
}
void SiblingGroup::Entangle(MessagePortData* data) {
Mutex::ScopedLock lock(group_mutex_);
ports_.insert(data);
}
void SiblingGroup::Disentangle(MessagePortData* data) {
Mutex::ScopedLock lock(group_mutex_);
ports_.erase(data);
data->AddToIncomingQueue(std::make_shared<Message>());
// If this is an anonymous group and there's another port, close it.
if (size() == 1 && name_.empty())
(*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
}
SiblingGroup::Map SiblingGroup::groups_;
Mutex SiblingGroup::groups_mutex_;
namespace {
static void SetDeserializerCreateObjectFunction(
@@ -1308,6 +1401,16 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
.Check();
}
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsString());
Environment* env = Environment::GetCurrent(args);
Context::Scope context_scope(env->context());
Utf8Value name(env->isolate(), args[0]);
MessagePort* port =
MessagePort::New(env, env->context(), nullptr, std::string(*name));
args.GetReturnValue().Set(port->object());
}
static void InitMessaging(Local<Object> target,
Local<Value> unused,
Local<Context> context,
@@ -1352,6 +1455,7 @@ static void InitMessaging(Local<Object> target,
MessagePort::MoveToContext);
env->SetMethod(target, "setDeserializerCreateObjectFunction",
SetDeserializerCreateObjectFunction);
env->SetMethod(target, "broadcastChannel", BroadcastChannel);
{
Local<Function> domexception = GetDOMException(context).ToLocalChecked();
@@ -1365,6 +1469,7 @@ static void InitMessaging(Local<Object> target,
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(MessageChannel);
registry->Register(BroadcastChannel);
registry->Register(JSTransferable::New);
registry->Register(MessagePort::New);
registry->Register(MessagePort::PostMessage);

View File

@@ -5,7 +5,11 @@
#include "env.h"
#include "node_mutex.h"
#include <list>
#include "v8.h"
#include <deque>
#include <string>
#include <unordered_map>
#include <set>
namespace node {
namespace worker {
@@ -45,6 +49,7 @@ class Message : public MemoryRetainer {
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
~Message() = default;
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
@@ -105,11 +110,58 @@ class Message : public MemoryRetainer {
friend class MessagePort;
};
class SiblingGroup {
public:
// Named SiblingGroup, Used for one-to-many BroadcastChannels.
static std::shared_ptr<SiblingGroup> Get(const std::string& name);
// Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
SiblingGroup() = default;
explicit SiblingGroup(const std::string& name);
~SiblingGroup();
// Dispatches the Message to the collection of associated
// ports. If there is more than one destination port and
// the Message contains transferables, Dispatch will fail.
// Returns Just(true) if successful and the message was
// dispatched to at least one destination. Returns Just(false)
// if there were no destinations. Returns Nothing<bool>()
// if there was an error. If error is not nullptr, it will
// be set to an error message or warning message as appropriate.
v8::Maybe<bool> Dispatch(
MessagePortData* source,
std::shared_ptr<Message> message,
std::string* error = nullptr);
void Entangle(MessagePortData* data);
void Disentangle(MessagePortData* data);
const std::string& name() const { return name_; }
size_t size() const { return ports_.size(); }
private:
std::string name_;
std::set<MessagePortData*> ports_;
Mutex group_mutex_;
static void CheckSiblingGroup(const std::string& name);
using Map =
std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
static Mutex groups_mutex_;
static Map groups_;
};
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
explicit MessagePortData(MessagePort* owner);
explicit MessagePortData(
MessagePort* owner,
const std::string& name = std::string());
~MessagePortData() override;
MessagePortData(MessagePortData&& other) = delete;
@@ -119,7 +171,10 @@ class MessagePortData : public TransferData {
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);
void AddToIncomingQueue(std::shared_ptr<Message> message);
v8::Maybe<bool> Dispatch(
std::shared_ptr<Message> message,
std::string* error = nullptr);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -144,14 +199,9 @@ class MessagePortData : public TransferData {
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
std::list<Message> incoming_messages_;
std::deque<std::shared_ptr<Message>> incoming_messages_;
MessagePort* owner_ = nullptr;
// This mutex protects the sibling_ field and is shared between two entangled
// MessagePorts. If both mutexes are acquired, this one needs to be
// acquired first.
std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
MessagePortData* sibling_ = nullptr;
std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
};
@@ -166,7 +216,8 @@ class MessagePort : public HandleWrap {
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);
v8::Local<v8::Object> wrap,
const std::string& name = std::string());
public:
~MessagePort() override;
@@ -175,7 +226,8 @@ class MessagePort : public HandleWrap {
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr);
std::unique_ptr<MessagePortData> data = nullptr,
const std::string& name = std::string());
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is

View File

@@ -0,0 +1,148 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const {
BroadcastChannel,
} = require('worker_threads');
{
const c1 = new BroadcastChannel('eventType').unref();
const c2 = new BroadcastChannel('eventType');
c2.onmessage = common.mustCall((e) => {
assert(e instanceof MessageEvent);
assert.strictEqual(e.target, c2);
assert.strictEqual(e.type, 'message');
assert.strictEqual(e.data, 'hello world');
c2.close();
});
c1.postMessage('hello world');
}
{
// Messages are delivered in port creation order.
// TODO(@jasnell): The ordering here is different than
// what the browsers would implement due to the different
// dispatching algorithm under the covers. What's not
// immediately clear is whether the ordering is spec
// mandated. In this test, c1 should receive events
// first, then c2, then c3. In the Node.js dispatching
// algorithm this means the ordering is:
// from c3 (c1 from c3)
// done (c1 from c2)
// from c1 (c2 from c1)
// from c3 (c2 from c3)
// from c1 (c3 from c1)
// done (c3 from c2)
//
// Whereas in the browser-ordering (as illustrated in the
// Web Platform Tests) it would be:
// from c1 (c2 from c1)
// from c1 (c3 from c1)
// from c3 (c1 from c3)
// from c3 (c2 from c3)
// done (c1 from c2)
// done (c3 from c2)
const c1 = new BroadcastChannel('order');
const c2 = new BroadcastChannel('order');
const c3 = new BroadcastChannel('order');
const events = [];
let doneCount = 0;
const handler = common.mustCall((e) => {
events.push(e);
if (e.data === 'done') {
doneCount++;
if (doneCount === 2) {
assert.strictEqual(events.length, 6);
assert.strictEqual(events[0].data, 'from c3');
assert.strictEqual(events[1].data, 'done');
assert.strictEqual(events[2].data, 'from c1');
assert.strictEqual(events[3].data, 'from c3');
assert.strictEqual(events[4].data, 'from c1');
assert.strictEqual(events[5].data, 'done');
c1.close();
c2.close();
c3.close();
}
}
}, 6);
c1.onmessage = handler;
c2.onmessage = handler;
c3.onmessage = handler;
c1.postMessage('from c1');
c3.postMessage('from c3');
c2.postMessage('done');
}
{
// Messages aren't delivered to a closed port
const c1 = new BroadcastChannel('closed1').unref();
const c2 = new BroadcastChannel('closed1');
const c3 = new BroadcastChannel('closed1');
c2.onmessage = common.mustNotCall();
c2.close();
c3.onmessage = common.mustCall(() => c3.close());
c1.postMessage('test');
}
{
// Messages aren't delivered to a port closed after calling postMessage.
const c1 = new BroadcastChannel('closed2').unref();
const c2 = new BroadcastChannel('closed2');
const c3 = new BroadcastChannel('closed2');
c2.onmessage = common.mustNotCall();
c3.onmessage = common.mustCall(() => c3.close());
c1.postMessage('test');
c2.close();
}
{
// Closing and creating channels during message delivery works correctly
const c1 = new BroadcastChannel('create-in-onmessage').unref();
const c2 = new BroadcastChannel('create-in-onmessage');
c2.onmessage = common.mustCall((e) => {
assert.strictEqual(e.data, 'first');
c2.close();
const c3 = new BroadcastChannel('create-in-onmessage');
c3.onmessage = common.mustCall((event) => {
assert.strictEqual(event.data, 'done');
c3.close();
});
c1.postMessage('done');
});
c1.postMessage('first');
c2.postMessage('second');
}
{
// Closing a channel in onmessage prevents already queued tasks
// from firing onmessage events
const c1 = new BroadcastChannel('close-in-onmessage2').unref();
const c2 = new BroadcastChannel('close-in-onmessage2');
const c3 = new BroadcastChannel('close-in-onmessage2');
const events = [];
c1.onmessage = (e) => events.push('c1: ' + e.data);
c2.onmessage = (e) => events.push('c2: ' + e.data);
c3.onmessage = (e) => events.push('c3: ' + e.data);
// c2 closes itself when it receives the first message
c2.addEventListener('message', common.mustCall(() => c2.close()));
c3.addEventListener('message', common.mustCall((e) => {
if (e.data === 'done') {
assert.deepStrictEqual(events, [
'c2: first',
'c3: first',
'c3: done']);
c3.close();
}
}, 2));
c1.postMessage('first');
c1.postMessage('done');
}

View File

@@ -0,0 +1,135 @@
'use strict';
const common = require('../common');
const {
BroadcastChannel,
Worker,
} = require('worker_threads');
const assert = require('assert');
assert.throws(() => new BroadcastChannel(Symbol('test')), {
message: /Cannot convert a Symbol value to a string/
});
assert.throws(() => new BroadcastChannel(), {
message: /The "name" argument must be specified/
});
// These should all just work
[undefined, 1, null, 'test', 1n, false, Infinity].forEach((i) => {
const bc = new BroadcastChannel(i);
assert.strictEqual(bc.name, `${i}`);
bc.close();
});
{
// Empty postMessage throws
const bc = new BroadcastChannel('whatever');
assert.throws(() => bc.postMessage(), {
message: /The "message" argument must be specified/
});
bc.close();
// Calling close multiple times should not throw
bc.close();
// Calling postMessage after close should throw
assert.throws(() => bc.postMessage(null), {
message: /BroadcastChannel is closed/
});
}
{
const bc1 = new BroadcastChannel('channel1');
const bc2 = new BroadcastChannel('channel1');
const bc3 = new BroadcastChannel('channel1');
const bc4 = new BroadcastChannel('channel2');
assert.strictEqual(bc1.name, 'channel1');
assert.strictEqual(bc2.name, 'channel1');
assert.strictEqual(bc3.name, 'channel1');
assert.strictEqual(bc4.name, 'channel2');
bc1.addEventListener('message', common.mustCall((event) => {
assert.strictEqual(event.data, 'hello');
bc1.close();
bc2.close();
bc4.close();
}));
bc3.addEventListener('message', common.mustCall((event) => {
assert.strictEqual(event.data, 'hello');
bc3.close();
}));
bc2.addEventListener('message', common.mustNotCall());
bc4.addEventListener('message', common.mustNotCall());
bc2.postMessage('hello');
}
{
const bc1 = new BroadcastChannel('onmessage-channel1');
const bc2 = new BroadcastChannel('onmessage-channel1');
const bc3 = new BroadcastChannel('onmessage-channel1');
const bc4 = new BroadcastChannel('onmessage-channel2');
assert.strictEqual(bc1.name, 'onmessage-channel1');
assert.strictEqual(bc2.name, 'onmessage-channel1');
assert.strictEqual(bc3.name, 'onmessage-channel1');
assert.strictEqual(bc4.name, 'onmessage-channel2');
bc1.onmessage = common.mustCall((event) => {
assert.strictEqual(event.data, 'hello');
bc1.close();
bc2.close();
bc4.close();
});
bc3.onmessage = common.mustCall((event) => {
assert.strictEqual(event.data, 'hello');
bc3.close();
});
bc2.onmessage = common.mustNotCall();
bc4.onmessage = common.mustNotCall();
bc2.postMessage('hello');
}
{
const bc = new BroadcastChannel('worker1');
new Worker(`
const assert = require('assert');
const { BroadcastChannel } = require('worker_threads');
const bc = new BroadcastChannel('worker1');
bc.addEventListener('message', (event) => {
assert.strictEqual(event.data, 123);
// If this close() is not executed, the test should hang and timeout.
// If the test does hang and timeout in CI, then the first step should
// be to check that the two bc.close() calls are being made.
bc.close();
});
bc.postMessage(321);
`, { eval: true });
bc.addEventListener('message', common.mustCall(({ data }) => {
assert.strictEqual(data, 321);
bc.postMessage(123);
bc.close();
}));
}
{
const bc1 = new BroadcastChannel('channel3');
const bc2 = new BroadcastChannel('channel3');
bc2.postMessage(new SharedArrayBuffer(10));
bc1.addEventListener('message', common.mustCall(({ data }) => {
assert(data instanceof SharedArrayBuffer);
bc1.close();
bc2.close();
}));
}
{
const bc1 = new BroadcastChannel('channel3');
const mc = new MessageChannel();
assert.throws(() => bc1.postMessage(mc), {
message: /Object that needs transfer was found/
});
assert.throws(() => bc1.postMessage(Symbol()), {
message: /Symbol\(\) could not be cloned/
});
bc1.close();
assert.throws(() => bc1.postMessage(Symbol()), {
message: /BroadcastChannel is closed/
});
}

View File

@@ -40,6 +40,10 @@ const customTypesMap = {
'WebAssembly.Instance':
`${jsDocPrefix}Reference/Global_Objects/WebAssembly/Instance`,
'BroadcastChannel':
'worker_threads.html#worker_threads_class_broadcastchannel_' +
'extends_eventtarget',
'Iterable':
`${jsDocPrefix}Reference/Iteration_protocols#The_iterable_protocol`,
'Iterator':