mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
src: remove AsyncRequest
Remove `AsyncRequest` from the source code, and replace its usage with threadsafe `SetImmediate()` calls. This has the advantage of being able to pass in any function, rather than one that is defined when the `AsyncRequest` is “installed”. This necessitates two changes: - The stopping flag (which was only used in one case and ignored in the other) is now a direct member of the `Environment` class. - Workers no longer have their own libuv handles, requiring manual management of their libuv ref count. As a drive-by fix, the `can_call_into_js` variable was turned into an atomic variable. While there have been no bug reports, the flag is set from `Stop(env)` calls, which are supposed to be possible from any thread. PR-URL: https://github.com/nodejs/node/pull/31386 Refs: https://github.com/openjs-foundation/summit/pull/240 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
@@ -897,8 +897,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
|
||||
sub_worker_contexts_.erase(context);
|
||||
}
|
||||
|
||||
inline void Environment::add_refs(int64_t diff) {
|
||||
task_queues_async_refs_ += diff;
|
||||
CHECK_GE(task_queues_async_refs_, 0);
|
||||
if (task_queues_async_refs_ == 0)
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
|
||||
else
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
|
||||
}
|
||||
|
||||
inline bool Environment::is_stopping() const {
|
||||
return thread_stopper_.is_stopped();
|
||||
return is_stopping_.load();
|
||||
}
|
||||
|
||||
inline void Environment::set_stopping(bool value) {
|
||||
is_stopping_.store(value);
|
||||
}
|
||||
|
||||
inline std::list<node_module>* Environment::extra_linked_bindings() {
|
||||
@@ -1218,14 +1231,6 @@ int64_t Environment::base_object_count() const {
|
||||
return base_object_count_;
|
||||
}
|
||||
|
||||
bool AsyncRequest::is_stopped() const {
|
||||
return stopped_.load();
|
||||
}
|
||||
|
||||
void AsyncRequest::set_stopped(bool flag) {
|
||||
stopped_.store(flag);
|
||||
}
|
||||
|
||||
#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
|
||||
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
|
||||
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
|
||||
|
||||
46
src/env.cc
46
src/env.cc
@@ -471,14 +471,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
|
||||
|
||||
thread_stopper()->Install(
|
||||
this, static_cast<void*>(this), [](uv_async_t* handle) {
|
||||
Environment* env = static_cast<Environment*>(handle->data);
|
||||
uv_stop(env->event_loop());
|
||||
});
|
||||
thread_stopper()->set_stopped(false);
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));
|
||||
|
||||
// Register clean-up cb to be called to clean up the handles
|
||||
// when the environment is freed, note that they are not cleaned in
|
||||
// the one environment per process setup, but will be called in
|
||||
@@ -496,8 +488,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
|
||||
|
||||
void Environment::ExitEnv() {
|
||||
set_can_call_into_js(false);
|
||||
thread_stopper()->Stop();
|
||||
set_stopping(true);
|
||||
isolate_->TerminateExecution();
|
||||
SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); });
|
||||
}
|
||||
|
||||
void Environment::RegisterHandleCleanups() {
|
||||
@@ -602,7 +595,6 @@ void Environment::RunCleanup() {
|
||||
started_cleanup_ = true;
|
||||
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
|
||||
"RunCleanup", this);
|
||||
thread_stopper()->Uninstall();
|
||||
CleanupHandles();
|
||||
|
||||
while (!cleanup_hooks_.empty()) {
|
||||
@@ -1014,7 +1006,6 @@ inline size_t Environment::SelfSize() const {
|
||||
// TODO(joyeecheung): refactor the MemoryTracker interface so
|
||||
// this can be done for common types within the Track* calls automatically
|
||||
// if a certain scope is entered.
|
||||
size -= sizeof(thread_stopper_);
|
||||
size -= sizeof(async_hooks_);
|
||||
size -= sizeof(tick_info_);
|
||||
size -= sizeof(immediate_info_);
|
||||
@@ -1036,7 +1027,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const {
|
||||
tracker->TrackField("fs_stats_field_array", fs_stats_field_array_);
|
||||
tracker->TrackField("fs_stats_field_bigint_array",
|
||||
fs_stats_field_bigint_array_);
|
||||
tracker->TrackField("thread_stopper", thread_stopper_);
|
||||
tracker->TrackField("cleanup_hooks", cleanup_hooks_);
|
||||
tracker->TrackField("async_hooks", async_hooks_);
|
||||
tracker->TrackField("immediate_info", immediate_info_);
|
||||
@@ -1100,38 +1090,6 @@ void Environment::CleanupFinalizationGroups() {
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
|
||||
CHECK_NULL(async_);
|
||||
env_ = env;
|
||||
async_ = new uv_async_t;
|
||||
async_->data = data;
|
||||
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
|
||||
}
|
||||
|
||||
void AsyncRequest::Uninstall() {
|
||||
if (async_ != nullptr) {
|
||||
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
|
||||
async_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncRequest::Stop() {
|
||||
set_stopped(true);
|
||||
if (async_ != nullptr) uv_async_send(async_);
|
||||
}
|
||||
|
||||
uv_async_t* AsyncRequest::GetHandle() {
|
||||
return async_;
|
||||
}
|
||||
|
||||
void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
|
||||
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
|
||||
}
|
||||
|
||||
AsyncRequest::~AsyncRequest() {
|
||||
CHECK_NULL(async_);
|
||||
}
|
||||
|
||||
// Not really any better place than env.cc at this moment.
|
||||
void BaseObject::DeleteMe(void* data) {
|
||||
BaseObject* self = static_cast<BaseObject*>(data);
|
||||
|
||||
47
src/env.h
47
src/env.h
@@ -587,34 +587,6 @@ struct AllocatedBuffer {
|
||||
friend class Environment;
|
||||
};
|
||||
|
||||
class AsyncRequest : public MemoryRetainer {
|
||||
public:
|
||||
AsyncRequest() = default;
|
||||
~AsyncRequest() override;
|
||||
|
||||
AsyncRequest(const AsyncRequest&) = delete;
|
||||
AsyncRequest& operator=(const AsyncRequest&) = delete;
|
||||
AsyncRequest(AsyncRequest&&) = delete;
|
||||
AsyncRequest& operator=(AsyncRequest&&) = delete;
|
||||
|
||||
void Install(Environment* env, void* data, uv_async_cb target);
|
||||
void Uninstall();
|
||||
void Stop();
|
||||
inline void set_stopped(bool flag);
|
||||
inline bool is_stopped() const;
|
||||
uv_async_t* GetHandle();
|
||||
void MemoryInfo(MemoryTracker* tracker) const override;
|
||||
|
||||
|
||||
SET_MEMORY_INFO_NAME(AsyncRequest)
|
||||
SET_SELF_SIZE(AsyncRequest)
|
||||
|
||||
private:
|
||||
Environment* env_;
|
||||
uv_async_t* async_ = nullptr;
|
||||
std::atomic_bool stopped_ {true};
|
||||
};
|
||||
|
||||
class KVStore {
|
||||
public:
|
||||
KVStore() = default;
|
||||
@@ -1065,6 +1037,14 @@ class Environment : public MemoryRetainer {
|
||||
inline bool can_call_into_js() const;
|
||||
inline void set_can_call_into_js(bool can_call_into_js);
|
||||
|
||||
// Increase or decrease a counter that manages whether this Environment
|
||||
// keeps the event loop alive on its own or not. The counter starts out at 0,
|
||||
// meaning it does not, and any positive value will make it keep the event
|
||||
// loop alive.
|
||||
// This is used by Workers to manage their own .ref()/.unref() implementation,
|
||||
// as Workers aren't directly associated with their own libuv handles.
|
||||
inline void add_refs(int64_t diff);
|
||||
|
||||
inline bool has_run_bootstrapping_code() const;
|
||||
inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code);
|
||||
|
||||
@@ -1085,6 +1065,7 @@ class Environment : public MemoryRetainer {
|
||||
inline void remove_sub_worker_context(worker::Worker* context);
|
||||
void stop_sub_worker_contexts();
|
||||
inline bool is_stopping() const;
|
||||
inline void set_stopping(bool value);
|
||||
inline std::list<node_module>* extra_linked_bindings();
|
||||
inline node_module* extra_linked_bindings_head();
|
||||
inline const Mutex& extra_linked_bindings_mutex() const;
|
||||
@@ -1226,8 +1207,6 @@ class Environment : public MemoryRetainer {
|
||||
inline std::shared_ptr<EnvironmentOptions> options();
|
||||
inline std::shared_ptr<HostPort> inspector_host_port();
|
||||
|
||||
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }
|
||||
|
||||
// The BaseObject count is a debugging helper that makes sure that there are
|
||||
// no memory leaks caused by BaseObjects staying alive longer than expected
|
||||
// (in particular, no circular BaseObjectPtr references).
|
||||
@@ -1288,6 +1267,7 @@ class Environment : public MemoryRetainer {
|
||||
uv_prepare_t idle_prepare_handle_;
|
||||
uv_check_t idle_check_handle_;
|
||||
uv_async_t task_queues_async_;
|
||||
int64_t task_queues_async_refs_ = 0;
|
||||
bool profiler_idle_notifier_started_ = false;
|
||||
|
||||
AsyncHooks async_hooks_;
|
||||
@@ -1345,7 +1325,7 @@ class Environment : public MemoryRetainer {
|
||||
bool has_run_bootstrapping_code_ = false;
|
||||
bool has_serialized_options_ = false;
|
||||
|
||||
bool can_call_into_js_ = true;
|
||||
std::atomic_bool can_call_into_js_ { true };
|
||||
Flags flags_;
|
||||
uint64_t thread_id_;
|
||||
std::unordered_set<worker::Worker*> sub_worker_contexts_;
|
||||
@@ -1463,10 +1443,7 @@ class Environment : public MemoryRetainer {
|
||||
bool started_cleanup_ = false;
|
||||
|
||||
int64_t base_object_count_ = 0;
|
||||
|
||||
// A custom async abstraction (a pair of async handle and a state variable)
|
||||
// Used by embedders to shutdown running Node instance.
|
||||
AsyncRequest thread_stopper_;
|
||||
std::atomic_bool is_stopping_ { false };
|
||||
|
||||
template <typename T>
|
||||
void ForEachBaseObject(T&& iterator);
|
||||
|
||||
@@ -268,7 +268,7 @@ void Worker::Run() {
|
||||
stopped_ = true;
|
||||
this->env_ = nullptr;
|
||||
}
|
||||
env_->thread_stopper()->set_stopped(true);
|
||||
env_->set_stopping(true);
|
||||
env_->stop_sub_worker_contexts();
|
||||
env_->RunCleanup();
|
||||
RunAtExit(env_.get());
|
||||
@@ -412,7 +412,6 @@ void Worker::JoinThread() {
|
||||
thread_joined_ = true;
|
||||
|
||||
env()->remove_sub_worker_context(this);
|
||||
on_thread_finished_.Uninstall();
|
||||
|
||||
{
|
||||
HandleScope handle_scope(env()->isolate());
|
||||
@@ -439,6 +438,8 @@ void Worker::JoinThread() {
|
||||
}
|
||||
|
||||
Worker::~Worker() {
|
||||
JoinThread();
|
||||
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
|
||||
CHECK(stopped_);
|
||||
@@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
|
||||
w->stopped_ = false;
|
||||
w->thread_joined_ = false;
|
||||
|
||||
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
|
||||
Worker* w_ = static_cast<Worker*>(handle->data);
|
||||
CHECK(w_->is_stopped());
|
||||
w_->parent_port_ = nullptr;
|
||||
w_->JoinThread();
|
||||
delete w_;
|
||||
});
|
||||
if (w->has_ref_)
|
||||
w->env()->add_refs(1);
|
||||
|
||||
uv_thread_options_t thread_options;
|
||||
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
|
||||
thread_options.stack_size = kStackSize;
|
||||
CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
|
||||
// XXX: This could become a std::unique_ptr, but that makes at least
|
||||
// gcc 6.3 detect undefined behaviour when there shouldn't be any.
|
||||
// gcc 7+ handles this well.
|
||||
Worker* w = static_cast<Worker*>(arg);
|
||||
const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
|
||||
|
||||
@@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
|
||||
w->Run();
|
||||
|
||||
Mutex::ScopedLock lock(w->mutex_);
|
||||
w->on_thread_finished_.Stop();
|
||||
w->env()->SetImmediateThreadsafe(
|
||||
[w = std::unique_ptr<Worker>(w)](Environment* env) {
|
||||
if (w->has_ref_)
|
||||
env->add_refs(-1);
|
||||
// implicitly delete w
|
||||
});
|
||||
}, static_cast<void*>(w)), 0);
|
||||
}
|
||||
|
||||
@@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
|
||||
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
|
||||
Worker* w;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
|
||||
if (!w->has_ref_) {
|
||||
w->has_ref_ = true;
|
||||
w->env()->add_refs(1);
|
||||
}
|
||||
}
|
||||
|
||||
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
|
||||
Worker* w;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
|
||||
if (w->has_ref_) {
|
||||
w->has_ref_ = false;
|
||||
w->env()->add_refs(-1);
|
||||
}
|
||||
}
|
||||
|
||||
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
|
||||
|
||||
@@ -41,7 +41,6 @@ class Worker : public AsyncWrap {
|
||||
|
||||
void MemoryInfo(MemoryTracker* tracker) const override {
|
||||
tracker->TrackField("parent_port", parent_port_);
|
||||
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
|
||||
}
|
||||
|
||||
SET_MEMORY_INFO_NAME(Worker)
|
||||
@@ -107,14 +106,14 @@ class Worker : public AsyncWrap {
|
||||
// instance refers to it via its [kPort] property.
|
||||
MessagePort* parent_port_ = nullptr;
|
||||
|
||||
AsyncRequest on_thread_finished_;
|
||||
|
||||
// A raw flag that is used by creator and worker threads to
|
||||
// sync up on pre-mature termination of worker - while in the
|
||||
// warmup phase. Once the worker is fully warmed up, use the
|
||||
// async handle of the worker's Environment for the same purpose.
|
||||
bool stopped_ = true;
|
||||
|
||||
bool has_ref_ = true;
|
||||
|
||||
// The real Environment of the worker object. It has a lesser
|
||||
// lifespan than the worker object itself - comes to life
|
||||
// when the worker thread creates a new Environment, and gets
|
||||
|
||||
Reference in New Issue
Block a user