mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function PR-URL: https://github.com/nodejs/node/pull/38506 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
#include "tracing/traced_value.h"
|
||||
#include "util-inl.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
struct node_napi_env__ : public napi_env__ {
|
||||
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
*v8::String::Utf8Value(env_->isolate, name)),
|
||||
thread_count(thread_count_),
|
||||
is_closing(false),
|
||||
dispatch_state(kDispatchIdle),
|
||||
context(context_),
|
||||
max_queue_size(max_queue_size_),
|
||||
env(env_),
|
||||
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
return napi_closing;
|
||||
}
|
||||
} else {
|
||||
if (uv_async_send(&async) != 0) {
|
||||
return napi_generic_failure;
|
||||
}
|
||||
queue.push(data);
|
||||
Send();
|
||||
return napi_ok;
|
||||
}
|
||||
}
|
||||
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
if (is_closing && max_queue_size > 0) {
|
||||
cond->Signal(lock);
|
||||
}
|
||||
if (uv_async_send(&async) != 0) {
|
||||
return napi_generic_failure;
|
||||
}
|
||||
Send();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
cond = std::make_unique<node::ConditionVariable>();
|
||||
}
|
||||
if (max_queue_size == 0 || cond) {
|
||||
CHECK_EQ(0, uv_idle_init(loop, &idle));
|
||||
return napi_ok;
|
||||
}
|
||||
|
||||
@@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
|
||||
napi_status Unref() {
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
|
||||
|
||||
return napi_ok;
|
||||
}
|
||||
|
||||
napi_status Ref() {
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
|
||||
|
||||
return napi_ok;
|
||||
}
|
||||
|
||||
void DispatchOne() {
|
||||
inline void* Context() {
|
||||
return context;
|
||||
}
|
||||
|
||||
protected:
|
||||
void Dispatch() {
|
||||
bool has_more = true;
|
||||
|
||||
// Limit maximum synchronous iteration count to prevent event loop
|
||||
// starvation. See `src/node_messaging.cc` for an inspiration.
|
||||
unsigned int iterations_left = kMaxIterationCount;
|
||||
while (has_more && --iterations_left != 0) {
|
||||
dispatch_state = kDispatchRunning;
|
||||
has_more = DispatchOne();
|
||||
|
||||
// Send() was called while we were executing the JS function
|
||||
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
|
||||
has_more = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (has_more) {
|
||||
Send();
|
||||
}
|
||||
}
|
||||
|
||||
bool DispatchOne() {
|
||||
void* data = nullptr;
|
||||
bool popped_value = false;
|
||||
bool has_more = false;
|
||||
|
||||
{
|
||||
node::Mutex::ScopedLock lock(this->mutex);
|
||||
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
cond->Signal(lock);
|
||||
}
|
||||
CloseHandlesAndMaybeDelete();
|
||||
} else {
|
||||
CHECK_EQ(0, uv_idle_stop(&idle));
|
||||
}
|
||||
} else {
|
||||
has_more = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
call_js_cb(env, js_callback, context, data);
|
||||
});
|
||||
}
|
||||
|
||||
return has_more;
|
||||
}
|
||||
|
||||
void Finalize() {
|
||||
@@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
EmptyQueueAndDelete();
|
||||
}
|
||||
|
||||
inline void* Context() {
|
||||
return context;
|
||||
}
|
||||
|
||||
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
|
||||
v8::HandleScope scope(env->isolate);
|
||||
if (set_closing) {
|
||||
@@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
ThreadSafeFunction* ts_fn =
|
||||
node::ContainerOf(&ThreadSafeFunction::async,
|
||||
reinterpret_cast<uv_async_t*>(handle));
|
||||
v8::HandleScope scope(ts_fn->env->isolate);
|
||||
ts_fn->env->node_env()->CloseHandle(
|
||||
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
|
||||
[](uv_handle_t* handle) -> void {
|
||||
ThreadSafeFunction* ts_fn =
|
||||
node::ContainerOf(&ThreadSafeFunction::idle,
|
||||
reinterpret_cast<uv_idle_t*>(handle));
|
||||
ts_fn->Finalize();
|
||||
});
|
||||
ts_fn->Finalize();
|
||||
});
|
||||
}
|
||||
|
||||
void Send() {
|
||||
// Ask currently running Dispatch() to make one more iteration
|
||||
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
|
||||
if ((current_state & kDispatchRunning) == kDispatchRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
CHECK_EQ(0, uv_async_send(&async));
|
||||
}
|
||||
|
||||
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
|
||||
// without a call_js_cb_.
|
||||
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
|
||||
@@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
}
|
||||
}
|
||||
|
||||
static void IdleCb(uv_idle_t* idle) {
|
||||
ThreadSafeFunction* ts_fn =
|
||||
node::ContainerOf(&ThreadSafeFunction::idle, idle);
|
||||
ts_fn->DispatchOne();
|
||||
}
|
||||
|
||||
static void AsyncCb(uv_async_t* async) {
|
||||
ThreadSafeFunction* ts_fn =
|
||||
node::ContainerOf(&ThreadSafeFunction::async, async);
|
||||
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
|
||||
ts_fn->Dispatch();
|
||||
}
|
||||
|
||||
static void Cleanup(void* data) {
|
||||
@@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource {
|
||||
}
|
||||
|
||||
private:
|
||||
static const unsigned char kDispatchIdle = 0;
|
||||
static const unsigned char kDispatchRunning = 1 << 0;
|
||||
static const unsigned char kDispatchPending = 1 << 1;
|
||||
|
||||
static const unsigned int kMaxIterationCount = 1000;
|
||||
|
||||
// These are variables protected by the mutex.
|
||||
node::Mutex mutex;
|
||||
std::unique_ptr<node::ConditionVariable> cond;
|
||||
std::queue<void*> queue;
|
||||
uv_async_t async;
|
||||
uv_idle_t idle;
|
||||
size_t thread_count;
|
||||
bool is_closing;
|
||||
std::atomic_uchar dispatch_state;
|
||||
|
||||
// These are variables set once, upon creation, and then never again, which
|
||||
// means we don't need the mutex to read them.
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
#include <node_api.h>
|
||||
#include "../../js-native-api/common.h"
|
||||
|
||||
#define ARRAY_LENGTH 10
|
||||
#define ARRAY_LENGTH 10000
|
||||
#define MAX_QUEUE_SIZE 2
|
||||
|
||||
static uv_thread_t uv_threads[2];
|
||||
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
|
||||
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
|
||||
status = napi_call_threadsafe_function(ts_fn, &ints[index],
|
||||
ts_fn_info->block_on_full);
|
||||
if (ts_fn_info->max_queue_size == 0) {
|
||||
if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
|
||||
// Let's make this thread really busy for 200 ms to give the main thread a
|
||||
// chance to abort.
|
||||
uint64_t start = uv_hrtime();
|
||||
|
||||
@@ -211,6 +211,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
}))
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1))
|
||||
|
||||
// Make sure that threadsafe function isn't stalled when we hit
|
||||
// `kMaxIterationCount` in `src/node_api.cc`
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThreadNonblocking',
|
||||
maxQueueSize: binding.ARRAY_LENGTH >>> 1,
|
||||
quitAfter: binding.ARRAY_LENGTH
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
|
||||
// Start a child process to test rapid teardown
|
||||
.then(() => testUnref(binding.MAX_QUEUE_SIZE))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user