• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_platform.h"
2 #include "node_internals.h"
3 
4 #include "env-inl.h"
5 #include "debug_utils-inl.h"
6 #include <algorithm>  // find_if(), find(), move()
7 #include <cmath>  // llround()
8 #include <memory>  // unique_ptr(), shared_ptr(), make_shared()
9 
10 namespace node {
11 
12 using v8::Isolate;
13 using v8::Object;
14 using v8::Platform;
15 using v8::Task;
16 
17 namespace {
18 
19 struct PlatformWorkerData {
20   TaskQueue<Task>* task_queue;
21   Mutex* platform_workers_mutex;
22   ConditionVariable* platform_workers_ready;
23   int* pending_platform_workers;
24   int id;
25 };
26 
PlatformWorkerThread(void * data)27 static void PlatformWorkerThread(void* data) {
28   std::unique_ptr<PlatformWorkerData>
29       worker_data(static_cast<PlatformWorkerData*>(data));
30 
31   TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
32   TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
33                         "PlatformWorkerThread");
34 
35   // Notify the main thread that the platform worker is ready.
36   {
37     Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
38     (*worker_data->pending_platform_workers)--;
39     worker_data->platform_workers_ready->Signal(lock);
40   }
41 
42   while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43     task->Run();
44     pending_worker_tasks->NotifyOfCompletion();
45   }
46 }
47 
48 }  // namespace
49 
50 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
51  public:
DelayedTaskScheduler(TaskQueue<Task> * tasks)52   explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
53     : pending_worker_tasks_(tasks) {}
54 
Start()55   std::unique_ptr<uv_thread_t> Start() {
56     auto start_thread = [](void* data) {
57       static_cast<DelayedTaskScheduler*>(data)->Run();
58     };
59     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
60     uv_sem_init(&ready_, 0);
61     CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
62     uv_sem_wait(&ready_);
63     uv_sem_destroy(&ready_);
64     return t;
65   }
66 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)67   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
68     tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
69                                                delay_in_seconds));
70     uv_async_send(&flush_tasks_);
71   }
72 
Stop()73   void Stop() {
74     tasks_.Push(std::make_unique<StopTask>(this));
75     uv_async_send(&flush_tasks_);
76   }
77 
78  private:
Run()79   void Run() {
80     TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
81                           "WorkerThreadsTaskRunner::DelayedTaskScheduler");
82     loop_.data = this;
83     CHECK_EQ(0, uv_loop_init(&loop_));
84     flush_tasks_.data = this;
85     CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
86     uv_sem_post(&ready_);
87 
88     uv_run(&loop_, UV_RUN_DEFAULT);
89     CheckedUvLoopClose(&loop_);
90   }
91 
FlushTasks(uv_async_t * flush_tasks)92   static void FlushTasks(uv_async_t* flush_tasks) {
93     DelayedTaskScheduler* scheduler =
94         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
95     while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
96       task->Run();
97   }
98 
99   class StopTask : public Task {
100    public:
StopTask(DelayedTaskScheduler * scheduler)101     explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
102 
Run()103     void Run() override {
104       std::vector<uv_timer_t*> timers;
105       for (uv_timer_t* timer : scheduler_->timers_)
106         timers.push_back(timer);
107       for (uv_timer_t* timer : timers)
108         scheduler_->TakeTimerTask(timer);
109       uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
110                [](uv_handle_t* handle) {});
111     }
112 
113    private:
114      DelayedTaskScheduler* scheduler_;
115   };
116 
117   class ScheduleTask : public Task {
118    public:
ScheduleTask(DelayedTaskScheduler * scheduler,std::unique_ptr<Task> task,double delay_in_seconds)119     ScheduleTask(DelayedTaskScheduler* scheduler,
120                  std::unique_ptr<Task> task,
121                  double delay_in_seconds)
122       : scheduler_(scheduler),
123         task_(std::move(task)),
124         delay_in_seconds_(delay_in_seconds) {}
125 
Run()126     void Run() override {
127       uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
128       std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
129       CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
130       timer->data = task_.release();
131       CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
132       scheduler_->timers_.insert(timer.release());
133     }
134 
135    private:
136     DelayedTaskScheduler* scheduler_;
137     std::unique_ptr<Task> task_;
138     double delay_in_seconds_;
139   };
140 
RunTask(uv_timer_t * timer)141   static void RunTask(uv_timer_t* timer) {
142     DelayedTaskScheduler* scheduler =
143         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
144     scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
145   }
146 
TakeTimerTask(uv_timer_t * timer)147   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
148     std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
149     uv_timer_stop(timer);
150     uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
151       delete reinterpret_cast<uv_timer_t*>(handle);
152     });
153     timers_.erase(timer);
154     return task;
155   }
156 
157   uv_sem_t ready_;
158   TaskQueue<Task>* pending_worker_tasks_;
159 
160   TaskQueue<Task> tasks_;
161   uv_loop_t loop_;
162   uv_async_t flush_tasks_;
163   std::unordered_set<uv_timer_t*> timers_;
164 };
165 
WorkerThreadsTaskRunner(int thread_pool_size)166 WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
167   Mutex platform_workers_mutex;
168   ConditionVariable platform_workers_ready;
169 
170   Mutex::ScopedLock lock(platform_workers_mutex);
171   int pending_platform_workers = thread_pool_size;
172 
173   delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
174       &pending_worker_tasks_);
175   threads_.push_back(delayed_task_scheduler_->Start());
176 
177   for (int i = 0; i < thread_pool_size; i++) {
178     PlatformWorkerData* worker_data = new PlatformWorkerData{
179       &pending_worker_tasks_, &platform_workers_mutex,
180       &platform_workers_ready, &pending_platform_workers, i
181     };
182     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
183     if (uv_thread_create(t.get(), PlatformWorkerThread,
184                          worker_data) != 0) {
185       break;
186     }
187     threads_.push_back(std::move(t));
188   }
189 
190   // Wait for platform workers to initialize before continuing with the
191   // bootstrap.
192   while (pending_platform_workers > 0) {
193     platform_workers_ready.Wait(lock);
194   }
195 }
196 
PostTask(std::unique_ptr<Task> task)197 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
198   pending_worker_tasks_.Push(std::move(task));
199 }
200 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)201 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
202                                               double delay_in_seconds) {
203   delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
204 }
205 
BlockingDrain()206 void WorkerThreadsTaskRunner::BlockingDrain() {
207   pending_worker_tasks_.BlockingDrain();
208 }
209 
Shutdown()210 void WorkerThreadsTaskRunner::Shutdown() {
211   pending_worker_tasks_.Stop();
212   delayed_task_scheduler_->Stop();
213   for (size_t i = 0; i < threads_.size(); i++) {
214     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
215   }
216 }
217 
NumberOfWorkerThreads() const218 int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
219   return threads_.size();
220 }
221 
PerIsolatePlatformData(Isolate * isolate,uv_loop_t * loop)222 PerIsolatePlatformData::PerIsolatePlatformData(
223     Isolate* isolate, uv_loop_t* loop)
224   : isolate_(isolate), loop_(loop) {
225   flush_tasks_ = new uv_async_t();
226   CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
227   flush_tasks_->data = static_cast<void*>(this);
228   uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
229 }
230 
FlushTasks(uv_async_t * handle)231 void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
232   auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
233   platform_data->FlushForegroundTasksInternal();
234 }
235 
PostIdleTask(std::unique_ptr<v8::IdleTask> task)236 void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
237   UNREACHABLE();
238 }
239 
PostTask(std::unique_ptr<Task> task)240 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
241   if (flush_tasks_ == nullptr) {
242     // V8 may post tasks during Isolate disposal. In that case, the only
243     // sensible path forward is to discard the task.
244     return;
245   }
246   foreground_tasks_.Push(std::move(task));
247   uv_async_send(flush_tasks_);
248 }
249 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)250 void PerIsolatePlatformData::PostDelayedTask(
251     std::unique_ptr<Task> task, double delay_in_seconds) {
252   if (flush_tasks_ == nullptr) {
253     // V8 may post tasks during Isolate disposal. In that case, the only
254     // sensible path forward is to discard the task.
255     return;
256   }
257   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
258   delayed->task = std::move(task);
259   delayed->platform_data = shared_from_this();
260   delayed->timeout = delay_in_seconds;
261   foreground_delayed_tasks_.Push(std::move(delayed));
262   uv_async_send(flush_tasks_);
263 }
264 
PostNonNestableTask(std::unique_ptr<Task> task)265 void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
266   PostTask(std::move(task));
267 }
268 
PostNonNestableDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)269 void PerIsolatePlatformData::PostNonNestableDelayedTask(
270     std::unique_ptr<Task> task,
271     double delay_in_seconds) {
272   PostDelayedTask(std::move(task), delay_in_seconds);
273 }
274 
~PerIsolatePlatformData()275 PerIsolatePlatformData::~PerIsolatePlatformData() {
276   Shutdown();
277 }
278 
AddShutdownCallback(void (* callback)(void *),void * data)279 void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
280                                                  void* data) {
281   shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
282 }
283 
Shutdown()284 void PerIsolatePlatformData::Shutdown() {
285   if (flush_tasks_ == nullptr)
286     return;
287 
288   // While there should be no V8 tasks in the queues at this point, it is
289   // possible that Node.js-internal tasks from e.g. the inspector are still
290   // lying around. We clear these queues and ignore the return value,
291   // effectively deleting the tasks instead of running them.
292   foreground_delayed_tasks_.PopAll();
293   foreground_tasks_.PopAll();
294   scheduled_delayed_tasks_.clear();
295 
296   // Both destroying the scheduled_delayed_tasks_ lists and closing
297   // flush_tasks_ handle add tasks to the event loop. We keep a count of all
298   // non-closed handles, and when that reaches zero, we inform any shutdown
299   // callbacks that the platform is done as far as this Isolate is concerned.
300   self_reference_ = shared_from_this();
301   uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
302            [](uv_handle_t* handle) {
303     std::unique_ptr<uv_async_t> flush_tasks {
304         reinterpret_cast<uv_async_t*>(handle) };
305     PerIsolatePlatformData* platform_data =
306         static_cast<PerIsolatePlatformData*>(flush_tasks->data);
307     platform_data->DecreaseHandleCount();
308     platform_data->self_reference_.reset();
309   });
310   flush_tasks_ = nullptr;
311 }
312 
DecreaseHandleCount()313 void PerIsolatePlatformData::DecreaseHandleCount() {
314   CHECK_GE(uv_handle_count_, 1);
315   if (--uv_handle_count_ == 0) {
316     for (const auto& callback : shutdown_callbacks_)
317       callback.cb(callback.data);
318   }
319 }
320 
NodePlatform(int thread_pool_size,v8::TracingController * tracing_controller)321 NodePlatform::NodePlatform(int thread_pool_size,
322                            v8::TracingController* tracing_controller) {
323   if (tracing_controller != nullptr) {
324     tracing_controller_ = tracing_controller;
325   } else {
326     tracing_controller_ = new v8::TracingController();
327   }
328   // TODO(addaleax): It's a bit icky that we use global state here, but we can't
329   // really do anything about it unless V8 starts exposing a way to access the
330   // current v8::Platform instance.
331   SetTracingController(tracing_controller_);
332   DCHECK_EQ(GetTracingController(), tracing_controller_);
333   worker_thread_task_runner_ =
334       std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
335 }
336 
~NodePlatform()337 NodePlatform::~NodePlatform() {
338   Shutdown();
339 }
340 
RegisterIsolate(Isolate * isolate,uv_loop_t * loop)341 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
342   Mutex::ScopedLock lock(per_isolate_mutex_);
343   std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
344   CHECK(!existing);
345   per_isolate_[isolate] =
346       std::make_shared<PerIsolatePlatformData>(isolate, loop);
347 }
348 
UnregisterIsolate(Isolate * isolate)349 void NodePlatform::UnregisterIsolate(Isolate* isolate) {
350   Mutex::ScopedLock lock(per_isolate_mutex_);
351   std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
352   CHECK(existing);
353   existing->Shutdown();
354   per_isolate_.erase(isolate);
355 }
356 
AddIsolateFinishedCallback(Isolate * isolate,void (* cb)(void *),void * data)357 void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
358                                               void (*cb)(void*), void* data) {
359   Mutex::ScopedLock lock(per_isolate_mutex_);
360   auto it = per_isolate_.find(isolate);
361   if (it == per_isolate_.end()) {
362     CHECK(it->second);
363     cb(data);
364     return;
365   }
366   it->second->AddShutdownCallback(cb, data);
367 }
368 
Shutdown()369 void NodePlatform::Shutdown() {
370   if (has_shut_down_) return;
371   has_shut_down_ = true;
372   worker_thread_task_runner_->Shutdown();
373 
374   {
375     Mutex::ScopedLock lock(per_isolate_mutex_);
376     per_isolate_.clear();
377   }
378 }
379 
NumberOfWorkerThreads()380 int NodePlatform::NumberOfWorkerThreads() {
381   return worker_thread_task_runner_->NumberOfWorkerThreads();
382 }
383 
RunForegroundTask(std::unique_ptr<Task> task)384 void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
385   DebugSealHandleScope scope(isolate_);
386   Environment* env = Environment::GetCurrent(isolate_);
387   if (env != nullptr) {
388     v8::HandleScope scope(isolate_);
389     InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
390                                    InternalCallbackScope::kNoFlags);
391     task->Run();
392   } else {
393     task->Run();
394   }
395 }
396 
DeleteFromScheduledTasks(DelayedTask * task)397 void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
398   auto it = std::find_if(scheduled_delayed_tasks_.begin(),
399                          scheduled_delayed_tasks_.end(),
400                          [task](const DelayedTaskPointer& delayed) -> bool {
401           return delayed.get() == task;
402       });
403   CHECK_NE(it, scheduled_delayed_tasks_.end());
404   scheduled_delayed_tasks_.erase(it);
405 }
406 
RunForegroundTask(uv_timer_t * handle)407 void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
408   DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
409   delayed->platform_data->RunForegroundTask(std::move(delayed->task));
410   delayed->platform_data->DeleteFromScheduledTasks(delayed);
411 }
412 
DrainTasks(Isolate * isolate)413 void NodePlatform::DrainTasks(Isolate* isolate) {
414   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
415   if (!per_isolate) return;
416 
417   do {
418     // Worker tasks aren't associated with an Isolate.
419     worker_thread_task_runner_->BlockingDrain();
420   } while (per_isolate->FlushForegroundTasksInternal());
421 }
422 
FlushForegroundTasksInternal()423 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
424   bool did_work = false;
425 
426   while (std::unique_ptr<DelayedTask> delayed =
427       foreground_delayed_tasks_.Pop()) {
428     did_work = true;
429     uint64_t delay_millis = llround(delayed->timeout * 1000);
430 
431     delayed->timer.data = static_cast<void*>(delayed.get());
432     uv_timer_init(loop_, &delayed->timer);
433     // Timers may not guarantee queue ordering of events with the same delay if
434     // the delay is non-zero. This should not be a problem in practice.
435     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
436     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
437     uv_handle_count_++;
438 
439     scheduled_delayed_tasks_.emplace_back(delayed.release(),
440                                           [](DelayedTask* delayed) {
441       uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
442                [](uv_handle_t* handle) {
443         std::unique_ptr<DelayedTask> task {
444             static_cast<DelayedTask*>(handle->data) };
445         task->platform_data->DecreaseHandleCount();
446       });
447     });
448   }
449   // Move all foreground tasks into a separate queue and flush that queue.
450   // This way tasks that are posted while flushing the queue will be run on the
451   // next call of FlushForegroundTasksInternal.
452   std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
453   while (!tasks.empty()) {
454     std::unique_ptr<Task> task = std::move(tasks.front());
455     tasks.pop();
456     did_work = true;
457     RunForegroundTask(std::move(task));
458   }
459   return did_work;
460 }
461 
CallOnWorkerThread(std::unique_ptr<Task> task)462 void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
463   worker_thread_task_runner_->PostTask(std::move(task));
464 }
465 
CallDelayedOnWorkerThread(std::unique_ptr<Task> task,double delay_in_seconds)466 void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
467                                              double delay_in_seconds) {
468   worker_thread_task_runner_->PostDelayedTask(std::move(task),
469                                               delay_in_seconds);
470 }
471 
472 
473 std::shared_ptr<PerIsolatePlatformData>
ForIsolate(Isolate * isolate)474 NodePlatform::ForIsolate(Isolate* isolate) {
475   Mutex::ScopedLock lock(per_isolate_mutex_);
476   std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
477   CHECK(data);
478   return data;
479 }
480 
FlushForegroundTasks(Isolate * isolate)481 bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
482   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
483   if (!per_isolate) return false;
484   return per_isolate->FlushForegroundTasksInternal();
485 }
486 
IdleTasksEnabled(Isolate * isolate)487 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
488 
489 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner(Isolate * isolate)490 NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
491   return ForIsolate(isolate);
492 }
493 
MonotonicallyIncreasingTime()494 double NodePlatform::MonotonicallyIncreasingTime() {
495   // Convert nanos to seconds.
496   return uv_hrtime() / 1e9;
497 }
498 
CurrentClockTimeMillis()499 double NodePlatform::CurrentClockTimeMillis() {
500   return SystemClockTimeMillis();
501 }
502 
GetTracingController()503 v8::TracingController* NodePlatform::GetTracingController() {
504   CHECK_NOT_NULL(tracing_controller_);
505   return tracing_controller_;
506 }
507 
GetStackTracePrinter()508 Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
509   return []() {
510     fprintf(stderr, "\n");
511     DumpBacktrace(stderr);
512     fflush(stderr);
513   };
514 }
515 
516 template <class T>
TaskQueue()517 TaskQueue<T>::TaskQueue()
518     : lock_(), tasks_available_(), tasks_drained_(),
519       outstanding_tasks_(0), stopped_(false), task_queue_() { }
520 
521 template <class T>
Push(std::unique_ptr<T> task)522 void TaskQueue<T>::Push(std::unique_ptr<T> task) {
523   Mutex::ScopedLock scoped_lock(lock_);
524   outstanding_tasks_++;
525   task_queue_.push(std::move(task));
526   tasks_available_.Signal(scoped_lock);
527 }
528 
529 template <class T>
Pop()530 std::unique_ptr<T> TaskQueue<T>::Pop() {
531   Mutex::ScopedLock scoped_lock(lock_);
532   if (task_queue_.empty()) {
533     return std::unique_ptr<T>(nullptr);
534   }
535   std::unique_ptr<T> result = std::move(task_queue_.front());
536   task_queue_.pop();
537   return result;
538 }
539 
540 template <class T>
BlockingPop()541 std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
542   Mutex::ScopedLock scoped_lock(lock_);
543   while (task_queue_.empty() && !stopped_) {
544     tasks_available_.Wait(scoped_lock);
545   }
546   if (stopped_) {
547     return std::unique_ptr<T>(nullptr);
548   }
549   std::unique_ptr<T> result = std::move(task_queue_.front());
550   task_queue_.pop();
551   return result;
552 }
553 
554 template <class T>
NotifyOfCompletion()555 void TaskQueue<T>::NotifyOfCompletion() {
556   Mutex::ScopedLock scoped_lock(lock_);
557   if (--outstanding_tasks_ == 0) {
558     tasks_drained_.Broadcast(scoped_lock);
559   }
560 }
561 
562 template <class T>
BlockingDrain()563 void TaskQueue<T>::BlockingDrain() {
564   Mutex::ScopedLock scoped_lock(lock_);
565   while (outstanding_tasks_ > 0) {
566     tasks_drained_.Wait(scoped_lock);
567   }
568 }
569 
570 template <class T>
Stop()571 void TaskQueue<T>::Stop() {
572   Mutex::ScopedLock scoped_lock(lock_);
573   stopped_ = true;
574   tasks_available_.Broadcast(scoped_lock);
575 }
576 
577 template <class T>
PopAll()578 std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
579   Mutex::ScopedLock scoped_lock(lock_);
580   std::queue<std::unique_ptr<T>> result;
581   result.swap(task_queue_);
582   return result;
583 }
584 
CancelPendingDelayedTasks(Isolate * isolate)585 void MultiIsolatePlatform::CancelPendingDelayedTasks(Isolate* isolate) {}
586 
587 }  // namespace node
588