Node.js  v8.x
Node.js is a JavaScript runtime built on Chrome's V8 JavaScript engine
node_platform.cc
Go to the documentation of this file.
1 #include "node_platform.h"
2 
3 #include "util.h"
4 
5 namespace node {
6 
7 using v8::Isolate;
8 using v8::Platform;
9 using v8::Task;
10 using v8::TracingController;
11 
12 static void FlushTasks(uv_async_t* handle) {
13  NodePlatform* platform = static_cast<NodePlatform*>(handle->data);
14  platform->FlushForegroundTasksInternal();
15 }
16 
17 static void BackgroundRunner(void* data) {
18  TaskQueue<Task>* background_tasks = static_cast<TaskQueue<Task>*>(data);
19  while (Task* task = background_tasks->BlockingPop()) {
20  task->Run();
21  delete task;
22  background_tasks->NotifyOfCompletion();
23  }
24 }
25 
26 NodePlatform::NodePlatform(int thread_pool_size, uv_loop_t* loop,
27  TracingController* tracing_controller)
28  : loop_(loop) {
29  CHECK_EQ(0, uv_async_init(loop, &flush_tasks_, FlushTasks));
30  flush_tasks_.data = static_cast<void*>(this);
31  uv_unref(reinterpret_cast<uv_handle_t*>(&flush_tasks_));
32  if (tracing_controller) {
33  tracing_controller_.reset(tracing_controller);
34  } else {
35  TracingController* controller = new TracingController();
36  tracing_controller_.reset(controller);
37  }
38  for (int i = 0; i < thread_pool_size; i++) {
39  uv_thread_t* t = new uv_thread_t();
40  if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) {
41  delete t;
42  break;
43  }
44  threads_.push_back(std::unique_ptr<uv_thread_t>(t));
45  }
46 }
47 
49  background_tasks_.Stop();
50  for (size_t i = 0; i < threads_.size(); i++) {
51  CHECK_EQ(0, uv_thread_join(threads_[i].get()));
52  }
53  // uv_run cannot be called from the time before the beforeExit callback
54  // runs until the program exits unless the event loop has any referenced
55  // handles after beforeExit terminates. This prevents unrefed timers
56  // that happen to terminate during shutdown from being run unsafely.
57  // Since uv_run cannot be called, this handle will never be fully cleaned
58  // up.
59  uv_close(reinterpret_cast<uv_handle_t*>(&flush_tasks_), nullptr);
60 }
61 
63  return threads_.size();
64 }
65 
66 static void RunForegroundTask(uv_timer_t* handle) {
67  Task* task = static_cast<Task*>(handle->data);
68  task->Run();
69  delete task;
70  uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
71  delete reinterpret_cast<uv_timer_t*>(handle);
72  });
73 }
74 
77  background_tasks_.BlockingDrain();
78 }
79 
81  while (auto delayed = foreground_delayed_tasks_.Pop()) {
82  uint64_t delay_millis =
83  static_cast<uint64_t>(delayed->second + 0.5) * 1000;
84  uv_timer_t* handle = new uv_timer_t();
85  handle->data = static_cast<void*>(delayed->first);
86  uv_timer_init(loop_, handle);
87  // Timers may not guarantee queue ordering of events with the same delay if
88  // the delay is non-zero. This should not be a problem in practice.
89  uv_timer_start(handle, RunForegroundTask, delay_millis, 0);
90  uv_unref(reinterpret_cast<uv_handle_t*>(handle));
91  delete delayed;
92  }
93  while (Task* task = foreground_tasks_.Pop()) {
94  task->Run();
95  delete task;
96  }
97 }
98 
100  ExpectedRuntime expected_runtime) {
101  background_tasks_.Push(task);
102 }
103 
104 void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
105  foreground_tasks_.Push(task);
106  uv_async_send(&flush_tasks_);
107 }
108 
110  Task* task,
111  double delay_in_seconds) {
112  auto pair = new std::pair<Task*, double>(task, delay_in_seconds);
113  foreground_delayed_tasks_.Push(pair);
114  uv_async_send(&flush_tasks_);
115 }
116 
117 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
118 
120  // Convert nanos to seconds.
121  return uv_hrtime() / 1e9;
122 }
123 
125  return tracing_controller_.get();
126 }
127 
128 template <class T>
130  : lock_(), tasks_available_(), tasks_drained_(),
131  outstanding_tasks_(0), stopped_(false), task_queue_() { }
132 
133 template <class T>
134 void TaskQueue<T>::Push(T* task) {
135  Mutex::ScopedLock scoped_lock(lock_);
136  outstanding_tasks_++;
137  task_queue_.push(task);
138  tasks_available_.Signal(scoped_lock);
139 }
140 
141 template <class T>
143  Mutex::ScopedLock scoped_lock(lock_);
144  T* result = nullptr;
145  if (!task_queue_.empty()) {
146  result = task_queue_.front();
147  task_queue_.pop();
148  }
149  return result;
150 }
151 
152 template <class T>
154  Mutex::ScopedLock scoped_lock(lock_);
155  while (task_queue_.empty() && !stopped_) {
156  tasks_available_.Wait(scoped_lock);
157  }
158  if (stopped_) {
159  return nullptr;
160  }
161  T* result = task_queue_.front();
162  task_queue_.pop();
163  return result;
164 }
165 
166 template <class T>
168  Mutex::ScopedLock scoped_lock(lock_);
169  if (--outstanding_tasks_ == 0) {
170  tasks_drained_.Broadcast(scoped_lock);
171  }
172 }
173 
174 template <class T>
176  Mutex::ScopedLock scoped_lock(lock_);
177  while (outstanding_tasks_ > 0) {
178  tasks_drained_.Wait(scoped_lock);
179  }
180 }
181 
182 template <class T>
184  Mutex::ScopedLock scoped_lock(lock_);
185  stopped_ = true;
186  tasks_available_.Broadcast(scoped_lock);
187 }
188 
189 } // namespace node
v8::TracingController * GetTracingController() override
void FlushForegroundTasksInternal()
void Push(T *task)
double MonotonicallyIncreasingTime() override
NodePlatform(int thread_pool_size, uv_loop_t *loop, v8::TracingController *tracing_controller)
union node::cares_wrap::@8::CaresAsyncData::@0 data
bool IdleTasksEnabled(v8::Isolate *isolate) override
void CallOnForegroundThread(v8::Isolate *isolate, v8::Task *task) override
dtrace t
Definition: v8ustack.d:582
void CallDelayedOnForegroundThread(v8::Isolate *isolate, v8::Task *task, double delay_in_seconds) override
size_t NumberOfAvailableBackgroundThreads() override
void CallOnBackgroundThread(v8::Task *task, ExpectedRuntime expected_runtime) override