• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_platform.h"
2 #include "node_internals.h"
3 
4 #include "env-inl.h"
5 #include "debug_utils-inl.h"
6 #include <algorithm>  // find_if(), find(), move()
7 #include <cmath>  // llround()
8 #include <memory>  // unique_ptr(), shared_ptr(), make_shared()
9 
10 namespace node {
11 
12 using v8::Isolate;
13 using v8::Object;
14 using v8::Platform;
15 using v8::Task;
16 
17 namespace {
18 
19 struct PlatformWorkerData {
20   TaskQueue<Task>* task_queue;
21   Mutex* platform_workers_mutex;
22   ConditionVariable* platform_workers_ready;
23   int* pending_platform_workers;
24   int id;
25 };
26 
PlatformWorkerThread(void * data)27 static void PlatformWorkerThread(void* data) {
28   std::unique_ptr<PlatformWorkerData>
29       worker_data(static_cast<PlatformWorkerData*>(data));
30 
31   TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
32   TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
33                         "PlatformWorkerThread");
34 
35   // Notify the main thread that the platform worker is ready.
36   {
37     Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
38     (*worker_data->pending_platform_workers)--;
39     worker_data->platform_workers_ready->Signal(lock);
40   }
41 
42   while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43     task->Run();
44     pending_worker_tasks->NotifyOfCompletion();
45   }
46 }
47 
48 }  // namespace
49 
50 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
51  public:
DelayedTaskScheduler(TaskQueue<Task> * tasks)52   explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
53     : pending_worker_tasks_(tasks) {}
54 
Start()55   std::unique_ptr<uv_thread_t> Start() {
56     auto start_thread = [](void* data) {
57       static_cast<DelayedTaskScheduler*>(data)->Run();
58     };
59     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
60     uv_sem_init(&ready_, 0);
61     CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
62     uv_sem_wait(&ready_);
63     uv_sem_destroy(&ready_);
64     return t;
65   }
66 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)67   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
68     tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
69                                                delay_in_seconds));
70     uv_async_send(&flush_tasks_);
71   }
72 
Stop()73   void Stop() {
74     tasks_.Push(std::make_unique<StopTask>(this));
75     uv_async_send(&flush_tasks_);
76   }
77 
78  private:
Run()79   void Run() {
80     TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
81                           "WorkerThreadsTaskRunner::DelayedTaskScheduler");
82     loop_.data = this;
83     CHECK_EQ(0, uv_loop_init(&loop_));
84     flush_tasks_.data = this;
85     CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
86     uv_sem_post(&ready_);
87 
88     uv_run(&loop_, UV_RUN_DEFAULT);
89     CheckedUvLoopClose(&loop_);
90   }
91 
FlushTasks(uv_async_t * flush_tasks)92   static void FlushTasks(uv_async_t* flush_tasks) {
93     DelayedTaskScheduler* scheduler =
94         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
95     while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
96       task->Run();
97   }
98 
99   class StopTask : public Task {
100    public:
StopTask(DelayedTaskScheduler * scheduler)101     explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
102 
Run()103     void Run() override {
104       std::vector<uv_timer_t*> timers;
105       for (uv_timer_t* timer : scheduler_->timers_)
106         timers.push_back(timer);
107       for (uv_timer_t* timer : timers)
108         scheduler_->TakeTimerTask(timer);
109       uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
110                [](uv_handle_t* handle) {});
111     }
112 
113    private:
114      DelayedTaskScheduler* scheduler_;
115   };
116 
117   class ScheduleTask : public Task {
118    public:
ScheduleTask(DelayedTaskScheduler * scheduler,std::unique_ptr<Task> task,double delay_in_seconds)119     ScheduleTask(DelayedTaskScheduler* scheduler,
120                  std::unique_ptr<Task> task,
121                  double delay_in_seconds)
122       : scheduler_(scheduler),
123         task_(std::move(task)),
124         delay_in_seconds_(delay_in_seconds) {}
125 
Run()126     void Run() override {
127       uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
128       std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
129       CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
130       timer->data = task_.release();
131       CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
132       scheduler_->timers_.insert(timer.release());
133     }
134 
135    private:
136     DelayedTaskScheduler* scheduler_;
137     std::unique_ptr<Task> task_;
138     double delay_in_seconds_;
139   };
140 
RunTask(uv_timer_t * timer)141   static void RunTask(uv_timer_t* timer) {
142     DelayedTaskScheduler* scheduler =
143         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
144     scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
145   }
146 
TakeTimerTask(uv_timer_t * timer)147   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
148     std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
149     uv_timer_stop(timer);
150     uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
151       delete reinterpret_cast<uv_timer_t*>(handle);
152     });
153     timers_.erase(timer);
154     return task;
155   }
156 
157   uv_sem_t ready_;
158   TaskQueue<Task>* pending_worker_tasks_;
159 
160   TaskQueue<Task> tasks_;
161   uv_loop_t loop_;
162   uv_async_t flush_tasks_;
163   std::unordered_set<uv_timer_t*> timers_;
164 };
165 
WorkerThreadsTaskRunner(int thread_pool_size)166 WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
167   Mutex platform_workers_mutex;
168   ConditionVariable platform_workers_ready;
169 
170   Mutex::ScopedLock lock(platform_workers_mutex);
171   int pending_platform_workers = thread_pool_size;
172 
173   delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
174       &pending_worker_tasks_);
175   threads_.push_back(delayed_task_scheduler_->Start());
176 
177   for (int i = 0; i < thread_pool_size; i++) {
178     PlatformWorkerData* worker_data = new PlatformWorkerData{
179       &pending_worker_tasks_, &platform_workers_mutex,
180       &platform_workers_ready, &pending_platform_workers, i
181     };
182     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
183     if (uv_thread_create(t.get(), PlatformWorkerThread,
184                          worker_data) != 0) {
185       break;
186     }
187     threads_.push_back(std::move(t));
188   }
189 
190   // Wait for platform workers to initialize before continuing with the
191   // bootstrap.
192   while (pending_platform_workers > 0) {
193     platform_workers_ready.Wait(lock);
194   }
195 }
196 
PostTask(std::unique_ptr<Task> task)197 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
198   pending_worker_tasks_.Push(std::move(task));
199 }
200 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)201 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
202                                               double delay_in_seconds) {
203   delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
204 }
205 
BlockingDrain()206 void WorkerThreadsTaskRunner::BlockingDrain() {
207   pending_worker_tasks_.BlockingDrain();
208 }
209 
Shutdown()210 void WorkerThreadsTaskRunner::Shutdown() {
211   pending_worker_tasks_.Stop();
212   delayed_task_scheduler_->Stop();
213   for (size_t i = 0; i < threads_.size(); i++) {
214     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
215   }
216 }
217 
NumberOfWorkerThreads() const218 int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
219   return threads_.size();
220 }
221 
PerIsolatePlatformData(Isolate * isolate,uv_loop_t * loop)222 PerIsolatePlatformData::PerIsolatePlatformData(
223     Isolate* isolate, uv_loop_t* loop)
224   : isolate_(isolate), loop_(loop) {
225   flush_tasks_ = new uv_async_t();
226   CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
227   flush_tasks_->data = static_cast<void*>(this);
228   uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
229 }
230 
231 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner()232 PerIsolatePlatformData::GetForegroundTaskRunner() {
233   return shared_from_this();
234 }
235 
FlushTasks(uv_async_t * handle)236 void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
237   auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
238   platform_data->FlushForegroundTasksInternal();
239 }
240 
PostIdleTask(std::unique_ptr<v8::IdleTask> task)241 void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
242   UNREACHABLE();
243 }
244 
PostTask(std::unique_ptr<Task> task)245 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
246   if (flush_tasks_ == nullptr) {
247     // V8 may post tasks during Isolate disposal. In that case, the only
248     // sensible path forward is to discard the task.
249     return;
250   }
251   foreground_tasks_.Push(std::move(task));
252   uv_async_send(flush_tasks_);
253 }
254 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)255 void PerIsolatePlatformData::PostDelayedTask(
256     std::unique_ptr<Task> task, double delay_in_seconds) {
257   if (flush_tasks_ == nullptr) {
258     // V8 may post tasks during Isolate disposal. In that case, the only
259     // sensible path forward is to discard the task.
260     return;
261   }
262   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
263   delayed->task = std::move(task);
264   delayed->platform_data = shared_from_this();
265   delayed->timeout = delay_in_seconds;
266   foreground_delayed_tasks_.Push(std::move(delayed));
267   uv_async_send(flush_tasks_);
268 }
269 
PostNonNestableTask(std::unique_ptr<Task> task)270 void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
271   PostTask(std::move(task));
272 }
273 
PostNonNestableDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)274 void PerIsolatePlatformData::PostNonNestableDelayedTask(
275     std::unique_ptr<Task> task,
276     double delay_in_seconds) {
277   PostDelayedTask(std::move(task), delay_in_seconds);
278 }
279 
~PerIsolatePlatformData()280 PerIsolatePlatformData::~PerIsolatePlatformData() {
281   CHECK(!flush_tasks_);
282 }
283 
AddShutdownCallback(void (* callback)(void *),void * data)284 void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
285                                                  void* data) {
286   shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
287 }
288 
Shutdown()289 void PerIsolatePlatformData::Shutdown() {
290   if (flush_tasks_ == nullptr)
291     return;
292 
293   // While there should be no V8 tasks in the queues at this point, it is
294   // possible that Node.js-internal tasks from e.g. the inspector are still
295   // lying around. We clear these queues and ignore the return value,
296   // effectively deleting the tasks instead of running them.
297   foreground_delayed_tasks_.PopAll();
298   foreground_tasks_.PopAll();
299   scheduled_delayed_tasks_.clear();
300 
301   // Both destroying the scheduled_delayed_tasks_ lists and closing
302   // flush_tasks_ handle add tasks to the event loop. We keep a count of all
303   // non-closed handles, and when that reaches zero, we inform any shutdown
304   // callbacks that the platform is done as far as this Isolate is concerned.
305   self_reference_ = shared_from_this();
306   uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
307            [](uv_handle_t* handle) {
308     std::unique_ptr<uv_async_t> flush_tasks {
309         reinterpret_cast<uv_async_t*>(handle) };
310     PerIsolatePlatformData* platform_data =
311         static_cast<PerIsolatePlatformData*>(flush_tasks->data);
312     platform_data->DecreaseHandleCount();
313     platform_data->self_reference_.reset();
314   });
315   flush_tasks_ = nullptr;
316 }
317 
DecreaseHandleCount()318 void PerIsolatePlatformData::DecreaseHandleCount() {
319   CHECK_GE(uv_handle_count_, 1);
320   if (--uv_handle_count_ == 0) {
321     for (const auto& callback : shutdown_callbacks_)
322       callback.cb(callback.data);
323   }
324 }
325 
NodePlatform(int thread_pool_size,v8::TracingController * tracing_controller)326 NodePlatform::NodePlatform(int thread_pool_size,
327                            v8::TracingController* tracing_controller) {
328   if (tracing_controller != nullptr) {
329     tracing_controller_ = tracing_controller;
330   } else {
331     tracing_controller_ = new v8::TracingController();
332   }
333   // TODO(addaleax): It's a bit icky that we use global state here, but we can't
334   // really do anything about it unless V8 starts exposing a way to access the
335   // current v8::Platform instance.
336   SetTracingController(tracing_controller_);
337   DCHECK_EQ(GetTracingController(), tracing_controller_);
338   worker_thread_task_runner_ =
339       std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
340 }
341 
~NodePlatform()342 NodePlatform::~NodePlatform() {
343   Shutdown();
344 }
345 
RegisterIsolate(Isolate * isolate,uv_loop_t * loop)346 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
347   Mutex::ScopedLock lock(per_isolate_mutex_);
348   auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop);
349   IsolatePlatformDelegate* ptr = delegate.get();
350   auto insertion = per_isolate_.emplace(
351     isolate,
352     std::make_pair(ptr, std::move(delegate)));
353   CHECK(insertion.second);
354 }
355 
RegisterIsolate(Isolate * isolate,IsolatePlatformDelegate * delegate)356 void NodePlatform::RegisterIsolate(Isolate* isolate,
357                                    IsolatePlatformDelegate* delegate) {
358   Mutex::ScopedLock lock(per_isolate_mutex_);
359   auto insertion = per_isolate_.emplace(
360     isolate,
361     std::make_pair(delegate, std::shared_ptr<PerIsolatePlatformData>{}));
362   CHECK(insertion.second);
363 }
364 
UnregisterIsolate(Isolate * isolate)365 void NodePlatform::UnregisterIsolate(Isolate* isolate) {
366   Mutex::ScopedLock lock(per_isolate_mutex_);
367   auto existing_it = per_isolate_.find(isolate);
368   CHECK_NE(existing_it, per_isolate_.end());
369   auto& existing = existing_it->second;
370   if (existing.second) {
371     existing.second->Shutdown();
372   }
373   per_isolate_.erase(existing_it);
374 }
375 
AddIsolateFinishedCallback(Isolate * isolate,void (* cb)(void *),void * data)376 void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
377                                               void (*cb)(void*), void* data) {
378   Mutex::ScopedLock lock(per_isolate_mutex_);
379   auto it = per_isolate_.find(isolate);
380   if (it == per_isolate_.end()) {
381     cb(data);
382     return;
383   }
384   CHECK(it->second.second);
385   it->second.second->AddShutdownCallback(cb, data);
386 }
387 
Shutdown()388 void NodePlatform::Shutdown() {
389   if (has_shut_down_) return;
390   has_shut_down_ = true;
391   worker_thread_task_runner_->Shutdown();
392 
393   {
394     Mutex::ScopedLock lock(per_isolate_mutex_);
395     per_isolate_.clear();
396   }
397 }
398 
NumberOfWorkerThreads()399 int NodePlatform::NumberOfWorkerThreads() {
400   return worker_thread_task_runner_->NumberOfWorkerThreads();
401 }
402 
RunForegroundTask(std::unique_ptr<Task> task)403 void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
404   DebugSealHandleScope scope(isolate_);
405   Environment* env = Environment::GetCurrent(isolate_);
406   if (env != nullptr) {
407     v8::HandleScope scope(isolate_);
408     InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
409                                    InternalCallbackScope::kNoFlags);
410     task->Run();
411   } else {
412     // The task is moved out of InternalCallbackScope if env is not available.
413     // This is a required else block, and should not be removed.
414     // See comment: https://github.com/nodejs/node/pull/34688#pullrequestreview-463867489
415     task->Run();
416   }
417 }
418 
DeleteFromScheduledTasks(DelayedTask * task)419 void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
420   auto it = std::find_if(scheduled_delayed_tasks_.begin(),
421                          scheduled_delayed_tasks_.end(),
422                          [task](const DelayedTaskPointer& delayed) -> bool {
423           return delayed.get() == task;
424       });
425   CHECK_NE(it, scheduled_delayed_tasks_.end());
426   scheduled_delayed_tasks_.erase(it);
427 }
428 
RunForegroundTask(uv_timer_t * handle)429 void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
430   DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
431   delayed->platform_data->RunForegroundTask(std::move(delayed->task));
432   delayed->platform_data->DeleteFromScheduledTasks(delayed);
433 }
434 
DrainTasks(Isolate * isolate)435 void NodePlatform::DrainTasks(Isolate* isolate) {
436   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
437   if (!per_isolate) return;
438 
439   do {
440     // Worker tasks aren't associated with an Isolate.
441     worker_thread_task_runner_->BlockingDrain();
442   } while (per_isolate->FlushForegroundTasksInternal());
443 }
444 
FlushForegroundTasksInternal()445 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
446   bool did_work = false;
447 
448   while (std::unique_ptr<DelayedTask> delayed =
449       foreground_delayed_tasks_.Pop()) {
450     did_work = true;
451     uint64_t delay_millis = llround(delayed->timeout * 1000);
452 
453     delayed->timer.data = static_cast<void*>(delayed.get());
454     uv_timer_init(loop_, &delayed->timer);
455     // Timers may not guarantee queue ordering of events with the same delay if
456     // the delay is non-zero. This should not be a problem in practice.
457     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
458     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
459     uv_handle_count_++;
460 
461     scheduled_delayed_tasks_.emplace_back(delayed.release(),
462                                           [](DelayedTask* delayed) {
463       uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
464                [](uv_handle_t* handle) {
465         std::unique_ptr<DelayedTask> task {
466             static_cast<DelayedTask*>(handle->data) };
467         task->platform_data->DecreaseHandleCount();
468       });
469     });
470   }
471   // Move all foreground tasks into a separate queue and flush that queue.
472   // This way tasks that are posted while flushing the queue will be run on the
473   // next call of FlushForegroundTasksInternal.
474   std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
475   while (!tasks.empty()) {
476     std::unique_ptr<Task> task = std::move(tasks.front());
477     tasks.pop();
478     did_work = true;
479     RunForegroundTask(std::move(task));
480   }
481   return did_work;
482 }
483 
CallOnWorkerThread(std::unique_ptr<Task> task)484 void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
485   worker_thread_task_runner_->PostTask(std::move(task));
486 }
487 
CallDelayedOnWorkerThread(std::unique_ptr<Task> task,double delay_in_seconds)488 void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
489                                              double delay_in_seconds) {
490   worker_thread_task_runner_->PostDelayedTask(std::move(task),
491                                               delay_in_seconds);
492 }
493 
494 
ForIsolate(Isolate * isolate)495 IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
496   Mutex::ScopedLock lock(per_isolate_mutex_);
497   auto data = per_isolate_[isolate];
498   CHECK_NOT_NULL(data.first);
499   return data.first;
500 }
501 
502 std::shared_ptr<PerIsolatePlatformData>
ForNodeIsolate(Isolate * isolate)503 NodePlatform::ForNodeIsolate(Isolate* isolate) {
504   Mutex::ScopedLock lock(per_isolate_mutex_);
505   auto data = per_isolate_[isolate];
506   CHECK_NOT_NULL(data.first);
507   return data.second;
508 }
509 
FlushForegroundTasks(Isolate * isolate)510 bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
511   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
512   if (!per_isolate) return false;
513   return per_isolate->FlushForegroundTasksInternal();
514 }
515 
IdleTasksEnabled(Isolate * isolate)516 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
517   return ForIsolate(isolate)->IdleTasksEnabled();
518 }
519 
520 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner(Isolate * isolate)521 NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
522   return ForIsolate(isolate)->GetForegroundTaskRunner();
523 }
524 
MonotonicallyIncreasingTime()525 double NodePlatform::MonotonicallyIncreasingTime() {
526   // Convert nanos to seconds.
527   return uv_hrtime() / 1e9;
528 }
529 
CurrentClockTimeMillis()530 double NodePlatform::CurrentClockTimeMillis() {
531   return SystemClockTimeMillis();
532 }
533 
GetTracingController()534 v8::TracingController* NodePlatform::GetTracingController() {
535   CHECK_NOT_NULL(tracing_controller_);
536   return tracing_controller_;
537 }
538 
GetStackTracePrinter()539 Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
540   return []() {
541     fprintf(stderr, "\n");
542     DumpBacktrace(stderr);
543     fflush(stderr);
544   };
545 }
546 
547 template <class T>
TaskQueue()548 TaskQueue<T>::TaskQueue()
549     : lock_(), tasks_available_(), tasks_drained_(),
550       outstanding_tasks_(0), stopped_(false), task_queue_() { }
551 
552 template <class T>
Push(std::unique_ptr<T> task)553 void TaskQueue<T>::Push(std::unique_ptr<T> task) {
554   Mutex::ScopedLock scoped_lock(lock_);
555   outstanding_tasks_++;
556   task_queue_.push(std::move(task));
557   tasks_available_.Signal(scoped_lock);
558 }
559 
560 template <class T>
Pop()561 std::unique_ptr<T> TaskQueue<T>::Pop() {
562   Mutex::ScopedLock scoped_lock(lock_);
563   if (task_queue_.empty()) {
564     return std::unique_ptr<T>(nullptr);
565   }
566   std::unique_ptr<T> result = std::move(task_queue_.front());
567   task_queue_.pop();
568   return result;
569 }
570 
571 template <class T>
BlockingPop()572 std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
573   Mutex::ScopedLock scoped_lock(lock_);
574   while (task_queue_.empty() && !stopped_) {
575     tasks_available_.Wait(scoped_lock);
576   }
577   if (stopped_) {
578     return std::unique_ptr<T>(nullptr);
579   }
580   std::unique_ptr<T> result = std::move(task_queue_.front());
581   task_queue_.pop();
582   return result;
583 }
584 
585 template <class T>
NotifyOfCompletion()586 void TaskQueue<T>::NotifyOfCompletion() {
587   Mutex::ScopedLock scoped_lock(lock_);
588   if (--outstanding_tasks_ == 0) {
589     tasks_drained_.Broadcast(scoped_lock);
590   }
591 }
592 
593 template <class T>
BlockingDrain()594 void TaskQueue<T>::BlockingDrain() {
595   Mutex::ScopedLock scoped_lock(lock_);
596   while (outstanding_tasks_ > 0) {
597     tasks_drained_.Wait(scoped_lock);
598   }
599 }
600 
601 template <class T>
Stop()602 void TaskQueue<T>::Stop() {
603   Mutex::ScopedLock scoped_lock(lock_);
604   stopped_ = true;
605   tasks_available_.Broadcast(scoped_lock);
606 }
607 
608 template <class T>
PopAll()609 std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
610   Mutex::ScopedLock scoped_lock(lock_);
611   std::queue<std::unique_ptr<T>> result;
612   result.swap(task_queue_);
613   return result;
614 }
615 
CancelPendingDelayedTasks(Isolate * isolate)616 void MultiIsolatePlatform::CancelPendingDelayedTasks(Isolate* isolate) {}
617 
618 }  // namespace node
619