10 using v8::TracingController;
12 static void FlushTasks(uv_async_t* handle) {
13 NodePlatform* platform =
static_cast<NodePlatform*
>(handle->data);
14 platform->FlushForegroundTasksInternal();
17 static void BackgroundRunner(
void*
data) {
18 TaskQueue<Task>* background_tasks =
static_cast<TaskQueue<Task>*
>(
data);
19 while (Task* task = background_tasks->BlockingPop()) {
22 background_tasks->NotifyOfCompletion();
27 TracingController* tracing_controller)
29 CHECK_EQ(0, uv_async_init(loop, &flush_tasks_, FlushTasks));
30 flush_tasks_.data =
static_cast<void*
>(
this);
31 uv_unref(reinterpret_cast<uv_handle_t*>(&flush_tasks_));
32 if (tracing_controller) {
33 tracing_controller_.reset(tracing_controller);
35 TracingController* controller =
new TracingController();
36 tracing_controller_.reset(controller);
38 for (
int i = 0; i < thread_pool_size; i++) {
39 uv_thread_t*
t =
new uv_thread_t();
40 if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) {
44 threads_.push_back(std::unique_ptr<uv_thread_t>(t));
49 background_tasks_.
Stop();
50 for (
size_t i = 0; i < threads_.size(); i++) {
51 CHECK_EQ(0, uv_thread_join(threads_[i].
get()));
59 uv_close(reinterpret_cast<uv_handle_t*>(&flush_tasks_),
nullptr);
63 return threads_.size();
66 static void RunForegroundTask(uv_timer_t* handle) {
67 Task* task =
static_cast<Task*
>(handle->data);
70 uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
71 delete reinterpret_cast<uv_timer_t*
>(handle);
81 while (
auto delayed = foreground_delayed_tasks_.
Pop()) {
82 uint64_t delay_millis =
83 static_cast<uint64_t
>(delayed->second + 0.5) * 1000;
84 uv_timer_t* handle =
new uv_timer_t();
85 handle->data =
static_cast<void*
>(delayed->first);
86 uv_timer_init(loop_, handle);
89 uv_timer_start(handle, RunForegroundTask, delay_millis, 0);
90 uv_unref(reinterpret_cast<uv_handle_t*>(handle));
93 while (Task* task = foreground_tasks_.
Pop()) {
100 ExpectedRuntime expected_runtime) {
101 background_tasks_.
Push(task);
105 foreground_tasks_.
Push(task);
106 uv_async_send(&flush_tasks_);
111 double delay_in_seconds) {
112 auto pair =
new std::pair<Task*, double>(task, delay_in_seconds);
113 foreground_delayed_tasks_.
Push(pair);
114 uv_async_send(&flush_tasks_);
121 return uv_hrtime() / 1e9;
125 return tracing_controller_.get();
130 : lock_(), tasks_available_(), tasks_drained_(),
131 outstanding_tasks_(0), stopped_(false), task_queue_() { }
135 Mutex::ScopedLock scoped_lock(lock_);
136 outstanding_tasks_++;
137 task_queue_.push(task);
138 tasks_available_.Signal(scoped_lock);
143 Mutex::ScopedLock scoped_lock(lock_);
145 if (!task_queue_.empty()) {
146 result = task_queue_.front();
154 Mutex::ScopedLock scoped_lock(lock_);
155 while (task_queue_.empty() && !stopped_) {
156 tasks_available_.Wait(scoped_lock);
161 T* result = task_queue_.front();
168 Mutex::ScopedLock scoped_lock(lock_);
169 if (--outstanding_tasks_ == 0) {
170 tasks_drained_.Broadcast(scoped_lock);
176 Mutex::ScopedLock scoped_lock(lock_);
177 while (outstanding_tasks_ > 0) {
178 tasks_drained_.Wait(scoped_lock);
184 Mutex::ScopedLock scoped_lock(lock_);
186 tasks_available_.Broadcast(scoped_lock);
union node::cares_wrap::@8::CaresAsyncData::@0 data