• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_platform.h"
2 #include "node_internals.h"
3 
4 #include "env-inl.h"
5 #include "debug_utils-inl.h"
6 #include <algorithm>  // find_if(), find(), move()
7 #include <cmath>  // llround()
8 #include <memory>  // unique_ptr(), shared_ptr(), make_shared()
9 
10 namespace node {
11 
12 using v8::Isolate;
13 using v8::Object;
14 using v8::Platform;
15 using v8::Task;
16 
17 namespace {
18 
19 struct PlatformWorkerData {
20   TaskQueue<Task>* task_queue;
21   Mutex* platform_workers_mutex;
22   ConditionVariable* platform_workers_ready;
23   int* pending_platform_workers;
24   int id;
25 };
26 
PlatformWorkerThread(void * data)27 static void PlatformWorkerThread(void* data) {
28   std::unique_ptr<PlatformWorkerData>
29       worker_data(static_cast<PlatformWorkerData*>(data));
30 
31   TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
32   TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
33                         "PlatformWorkerThread");
34 
35   // Notify the main thread that the platform worker is ready.
36   {
37     Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
38     (*worker_data->pending_platform_workers)--;
39     worker_data->platform_workers_ready->Signal(lock);
40   }
41 
42   while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43     task->Run();
44     pending_worker_tasks->NotifyOfCompletion();
45   }
46 }
47 
GetActualThreadPoolSize(int thread_pool_size)48 static int GetActualThreadPoolSize(int thread_pool_size) {
49   if (thread_pool_size < 1) {
50     thread_pool_size = uv_available_parallelism() - 1;
51   }
52   return std::max(thread_pool_size, 1);
53 }
54 
55 }  // namespace
56 
57 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
58  public:
DelayedTaskScheduler(TaskQueue<Task> * tasks)59   explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
60     : pending_worker_tasks_(tasks) {}
61 
Start()62   std::unique_ptr<uv_thread_t> Start() {
63     auto start_thread = [](void* data) {
64       static_cast<DelayedTaskScheduler*>(data)->Run();
65     };
66     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
67     uv_sem_init(&ready_, 0);
68     CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
69     uv_sem_wait(&ready_);
70     uv_sem_destroy(&ready_);
71     return t;
72   }
73 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)74   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
75     tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
76                                                delay_in_seconds));
77     uv_async_send(&flush_tasks_);
78   }
79 
Stop()80   void Stop() {
81     tasks_.Push(std::make_unique<StopTask>(this));
82     uv_async_send(&flush_tasks_);
83   }
84 
85  private:
Run()86   void Run() {
87     TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
88                           "WorkerThreadsTaskRunner::DelayedTaskScheduler");
89     loop_.data = this;
90     CHECK_EQ(0, uv_loop_init(&loop_));
91     flush_tasks_.data = this;
92     CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
93     uv_sem_post(&ready_);
94 
95     uv_run(&loop_, UV_RUN_DEFAULT);
96     CheckedUvLoopClose(&loop_);
97   }
98 
FlushTasks(uv_async_t * flush_tasks)99   static void FlushTasks(uv_async_t* flush_tasks) {
100     DelayedTaskScheduler* scheduler =
101         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
102     while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
103       task->Run();
104   }
105 
106   class StopTask : public Task {
107    public:
StopTask(DelayedTaskScheduler * scheduler)108     explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
109 
Run()110     void Run() override {
111       std::vector<uv_timer_t*> timers;
112       for (uv_timer_t* timer : scheduler_->timers_)
113         timers.push_back(timer);
114       for (uv_timer_t* timer : timers)
115         scheduler_->TakeTimerTask(timer);
116       uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
117                [](uv_handle_t* handle) {});
118     }
119 
120    private:
121      DelayedTaskScheduler* scheduler_;
122   };
123 
124   class ScheduleTask : public Task {
125    public:
ScheduleTask(DelayedTaskScheduler * scheduler,std::unique_ptr<Task> task,double delay_in_seconds)126     ScheduleTask(DelayedTaskScheduler* scheduler,
127                  std::unique_ptr<Task> task,
128                  double delay_in_seconds)
129       : scheduler_(scheduler),
130         task_(std::move(task)),
131         delay_in_seconds_(delay_in_seconds) {}
132 
Run()133     void Run() override {
134       uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
135       std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
136       CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
137       timer->data = task_.release();
138       CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
139       scheduler_->timers_.insert(timer.release());
140     }
141 
142    private:
143     DelayedTaskScheduler* scheduler_;
144     std::unique_ptr<Task> task_;
145     double delay_in_seconds_;
146   };
147 
RunTask(uv_timer_t * timer)148   static void RunTask(uv_timer_t* timer) {
149     DelayedTaskScheduler* scheduler =
150         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
151     scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
152   }
153 
TakeTimerTask(uv_timer_t * timer)154   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
155     std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
156     uv_timer_stop(timer);
157     uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
158       delete reinterpret_cast<uv_timer_t*>(handle);
159     });
160     timers_.erase(timer);
161     return task;
162   }
163 
164   uv_sem_t ready_;
165   TaskQueue<Task>* pending_worker_tasks_;
166 
167   TaskQueue<Task> tasks_;
168   uv_loop_t loop_;
169   uv_async_t flush_tasks_;
170   std::unordered_set<uv_timer_t*> timers_;
171 };
172 
WorkerThreadsTaskRunner(int thread_pool_size)173 WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
174   Mutex platform_workers_mutex;
175   ConditionVariable platform_workers_ready;
176 
177   Mutex::ScopedLock lock(platform_workers_mutex);
178   int pending_platform_workers = thread_pool_size;
179 
180   delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
181       &pending_worker_tasks_);
182   threads_.push_back(delayed_task_scheduler_->Start());
183 
184   for (int i = 0; i < thread_pool_size; i++) {
185     PlatformWorkerData* worker_data = new PlatformWorkerData{
186       &pending_worker_tasks_, &platform_workers_mutex,
187       &platform_workers_ready, &pending_platform_workers, i
188     };
189     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
190     if (uv_thread_create(t.get(), PlatformWorkerThread,
191                          worker_data) != 0) {
192       break;
193     }
194     threads_.push_back(std::move(t));
195   }
196 
197   // Wait for platform workers to initialize before continuing with the
198   // bootstrap.
199   while (pending_platform_workers > 0) {
200     platform_workers_ready.Wait(lock);
201   }
202 }
203 
PostTask(std::unique_ptr<Task> task)204 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
205   pending_worker_tasks_.Push(std::move(task));
206 }
207 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)208 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
209                                               double delay_in_seconds) {
210   delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
211 }
212 
BlockingDrain()213 void WorkerThreadsTaskRunner::BlockingDrain() {
214   pending_worker_tasks_.BlockingDrain();
215 }
216 
Shutdown()217 void WorkerThreadsTaskRunner::Shutdown() {
218   pending_worker_tasks_.Stop();
219   delayed_task_scheduler_->Stop();
220   for (size_t i = 0; i < threads_.size(); i++) {
221     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
222   }
223 }
224 
NumberOfWorkerThreads() const225 int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
226   return threads_.size();
227 }
228 
PerIsolatePlatformData(Isolate * isolate,uv_loop_t * loop)229 PerIsolatePlatformData::PerIsolatePlatformData(
230     Isolate* isolate, uv_loop_t* loop)
231   : isolate_(isolate), loop_(loop) {
232   flush_tasks_ = new uv_async_t();
233   CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
234   flush_tasks_->data = static_cast<void*>(this);
235   uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
236 }
237 
238 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner()239 PerIsolatePlatformData::GetForegroundTaskRunner() {
240   return shared_from_this();
241 }
242 
FlushTasks(uv_async_t * handle)243 void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
244   auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
245   platform_data->FlushForegroundTasksInternal();
246 }
247 
PostIdleTask(std::unique_ptr<v8::IdleTask> task)248 void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
249   UNREACHABLE();
250 }
251 
PostTask(std::unique_ptr<Task> task)252 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
253   if (flush_tasks_ == nullptr) {
254     // V8 may post tasks during Isolate disposal. In that case, the only
255     // sensible path forward is to discard the task.
256     return;
257   }
258   foreground_tasks_.Push(std::move(task));
259   uv_async_send(flush_tasks_);
260 }
261 
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)262 void PerIsolatePlatformData::PostDelayedTask(
263     std::unique_ptr<Task> task, double delay_in_seconds) {
264   if (flush_tasks_ == nullptr) {
265     // V8 may post tasks during Isolate disposal. In that case, the only
266     // sensible path forward is to discard the task.
267     return;
268   }
269   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
270   delayed->task = std::move(task);
271   delayed->platform_data = shared_from_this();
272   delayed->timeout = delay_in_seconds;
273   foreground_delayed_tasks_.Push(std::move(delayed));
274   uv_async_send(flush_tasks_);
275 }
276 
PostNonNestableTask(std::unique_ptr<Task> task)277 void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
278   PostTask(std::move(task));
279 }
280 
PostNonNestableDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)281 void PerIsolatePlatformData::PostNonNestableDelayedTask(
282     std::unique_ptr<Task> task,
283     double delay_in_seconds) {
284   PostDelayedTask(std::move(task), delay_in_seconds);
285 }
286 
~PerIsolatePlatformData()287 PerIsolatePlatformData::~PerIsolatePlatformData() {
288   CHECK(!flush_tasks_);
289 }
290 
AddShutdownCallback(void (* callback)(void *),void * data)291 void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
292                                                  void* data) {
293   shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
294 }
295 
Shutdown()296 void PerIsolatePlatformData::Shutdown() {
297   if (flush_tasks_ == nullptr)
298     return;
299 
300   // While there should be no V8 tasks in the queues at this point, it is
301   // possible that Node.js-internal tasks from e.g. the inspector are still
302   // lying around. We clear these queues and ignore the return value,
303   // effectively deleting the tasks instead of running them.
304   foreground_delayed_tasks_.PopAll();
305   foreground_tasks_.PopAll();
306   scheduled_delayed_tasks_.clear();
307 
308   // Both destroying the scheduled_delayed_tasks_ lists and closing
309   // flush_tasks_ handle add tasks to the event loop. We keep a count of all
310   // non-closed handles, and when that reaches zero, we inform any shutdown
311   // callbacks that the platform is done as far as this Isolate is concerned.
312   self_reference_ = shared_from_this();
313   uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
314            [](uv_handle_t* handle) {
315     std::unique_ptr<uv_async_t> flush_tasks {
316         reinterpret_cast<uv_async_t*>(handle) };
317     PerIsolatePlatformData* platform_data =
318         static_cast<PerIsolatePlatformData*>(flush_tasks->data);
319     platform_data->DecreaseHandleCount();
320     platform_data->self_reference_.reset();
321   });
322   flush_tasks_ = nullptr;
323 }
324 
DecreaseHandleCount()325 void PerIsolatePlatformData::DecreaseHandleCount() {
326   CHECK_GE(uv_handle_count_, 1);
327   if (--uv_handle_count_ == 0) {
328     for (const auto& callback : shutdown_callbacks_)
329       callback.cb(callback.data);
330   }
331 }
332 
NodePlatform(int thread_pool_size,v8::TracingController * tracing_controller,v8::PageAllocator * page_allocator)333 NodePlatform::NodePlatform(int thread_pool_size,
334                            v8::TracingController* tracing_controller,
335                            v8::PageAllocator* page_allocator) {
336   if (tracing_controller != nullptr) {
337     tracing_controller_ = tracing_controller;
338   } else {
339     tracing_controller_ = new v8::TracingController();
340   }
341 
342   // V8 will default to its built in allocator if none is provided.
343   page_allocator_ = page_allocator;
344 
345   // TODO(addaleax): It's a bit icky that we use global state here, but we can't
346   // really do anything about it unless V8 starts exposing a way to access the
347   // current v8::Platform instance.
348   SetTracingController(tracing_controller_);
349   DCHECK_EQ(GetTracingController(), tracing_controller_);
350 
351   thread_pool_size = GetActualThreadPoolSize(thread_pool_size);
352   worker_thread_task_runner_ =
353       std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
354 }
355 
~NodePlatform()356 NodePlatform::~NodePlatform() {
357   Shutdown();
358 }
359 
RegisterIsolate(Isolate * isolate,uv_loop_t * loop)360 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
361   Mutex::ScopedLock lock(per_isolate_mutex_);
362   auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop);
363   IsolatePlatformDelegate* ptr = delegate.get();
364   auto insertion = per_isolate_.emplace(
365     isolate,
366     std::make_pair(ptr, std::move(delegate)));
367   CHECK(insertion.second);
368 }
369 
RegisterIsolate(Isolate * isolate,IsolatePlatformDelegate * delegate)370 void NodePlatform::RegisterIsolate(Isolate* isolate,
371                                    IsolatePlatformDelegate* delegate) {
372   Mutex::ScopedLock lock(per_isolate_mutex_);
373   auto insertion = per_isolate_.emplace(
374     isolate,
375     std::make_pair(delegate, std::shared_ptr<PerIsolatePlatformData>{}));
376   CHECK(insertion.second);
377 }
378 
UnregisterIsolate(Isolate * isolate)379 void NodePlatform::UnregisterIsolate(Isolate* isolate) {
380   Mutex::ScopedLock lock(per_isolate_mutex_);
381   auto existing_it = per_isolate_.find(isolate);
382   CHECK_NE(existing_it, per_isolate_.end());
383   auto& existing = existing_it->second;
384   if (existing.second) {
385     existing.second->Shutdown();
386   }
387   per_isolate_.erase(existing_it);
388 }
389 
AddIsolateFinishedCallback(Isolate * isolate,void (* cb)(void *),void * data)390 void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
391                                               void (*cb)(void*), void* data) {
392   Mutex::ScopedLock lock(per_isolate_mutex_);
393   auto it = per_isolate_.find(isolate);
394   if (it == per_isolate_.end()) {
395     cb(data);
396     return;
397   }
398   CHECK(it->second.second);
399   it->second.second->AddShutdownCallback(cb, data);
400 }
401 
Shutdown()402 void NodePlatform::Shutdown() {
403   if (has_shut_down_) return;
404   has_shut_down_ = true;
405   worker_thread_task_runner_->Shutdown();
406 
407   {
408     Mutex::ScopedLock lock(per_isolate_mutex_);
409     per_isolate_.clear();
410   }
411 }
412 
NumberOfWorkerThreads()413 int NodePlatform::NumberOfWorkerThreads() {
414   return worker_thread_task_runner_->NumberOfWorkerThreads();
415 }
416 
RunForegroundTask(std::unique_ptr<Task> task)417 void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
418   if (isolate_->IsExecutionTerminating()) return;
419   DebugSealHandleScope scope(isolate_);
420   Environment* env = Environment::GetCurrent(isolate_);
421   if (env != nullptr) {
422     v8::HandleScope scope(isolate_);
423     InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
424                                    InternalCallbackScope::kNoFlags);
425     task->Run();
426   } else {
427     // The task is moved out of InternalCallbackScope if env is not available.
428     // This is a required else block, and should not be removed.
429     // See comment: https://github.com/nodejs/node/pull/34688#pullrequestreview-463867489
430     task->Run();
431   }
432 }
433 
DeleteFromScheduledTasks(DelayedTask * task)434 void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
435   auto it = std::find_if(scheduled_delayed_tasks_.begin(),
436                          scheduled_delayed_tasks_.end(),
437                          [task](const DelayedTaskPointer& delayed) -> bool {
438           return delayed.get() == task;
439       });
440   CHECK_NE(it, scheduled_delayed_tasks_.end());
441   scheduled_delayed_tasks_.erase(it);
442 }
443 
RunForegroundTask(uv_timer_t * handle)444 void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
445   DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
446   delayed->platform_data->RunForegroundTask(std::move(delayed->task));
447   delayed->platform_data->DeleteFromScheduledTasks(delayed);
448 }
449 
DrainTasks(Isolate * isolate)450 void NodePlatform::DrainTasks(Isolate* isolate) {
451   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
452   if (!per_isolate) return;
453 
454   do {
455     // Worker tasks aren't associated with an Isolate.
456     worker_thread_task_runner_->BlockingDrain();
457   } while (per_isolate->FlushForegroundTasksInternal());
458 }
459 
FlushForegroundTasksInternal()460 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
461   bool did_work = false;
462 
463   while (std::unique_ptr<DelayedTask> delayed =
464       foreground_delayed_tasks_.Pop()) {
465     did_work = true;
466     uint64_t delay_millis = llround(delayed->timeout * 1000);
467 
468     delayed->timer.data = static_cast<void*>(delayed.get());
469     uv_timer_init(loop_, &delayed->timer);
470     // Timers may not guarantee queue ordering of events with the same delay if
471     // the delay is non-zero. This should not be a problem in practice.
472     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
473     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
474     uv_handle_count_++;
475 
476     scheduled_delayed_tasks_.emplace_back(delayed.release(),
477                                           [](DelayedTask* delayed) {
478       uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
479                [](uv_handle_t* handle) {
480         std::unique_ptr<DelayedTask> task {
481             static_cast<DelayedTask*>(handle->data) };
482         task->platform_data->DecreaseHandleCount();
483       });
484     });
485   }
486   // Move all foreground tasks into a separate queue and flush that queue.
487   // This way tasks that are posted while flushing the queue will be run on the
488   // next call of FlushForegroundTasksInternal.
489   std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
490   while (!tasks.empty()) {
491     std::unique_ptr<Task> task = std::move(tasks.front());
492     tasks.pop();
493     did_work = true;
494     RunForegroundTask(std::move(task));
495   }
496   return did_work;
497 }
498 
CallOnWorkerThread(std::unique_ptr<Task> task)499 void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
500   worker_thread_task_runner_->PostTask(std::move(task));
501 }
502 
CallDelayedOnWorkerThread(std::unique_ptr<Task> task,double delay_in_seconds)503 void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
504                                              double delay_in_seconds) {
505   worker_thread_task_runner_->PostDelayedTask(std::move(task),
506                                               delay_in_seconds);
507 }
508 
509 
ForIsolate(Isolate * isolate)510 IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
511   Mutex::ScopedLock lock(per_isolate_mutex_);
512   auto data = per_isolate_[isolate];
513   CHECK_NOT_NULL(data.first);
514   return data.first;
515 }
516 
517 std::shared_ptr<PerIsolatePlatformData>
ForNodeIsolate(Isolate * isolate)518 NodePlatform::ForNodeIsolate(Isolate* isolate) {
519   Mutex::ScopedLock lock(per_isolate_mutex_);
520   auto data = per_isolate_[isolate];
521   CHECK_NOT_NULL(data.first);
522   return data.second;
523 }
524 
FlushForegroundTasks(Isolate * isolate)525 bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
526   std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
527   if (!per_isolate) return false;
528   return per_isolate->FlushForegroundTasksInternal();
529 }
530 
PostJob(v8::TaskPriority priority,std::unique_ptr<v8::JobTask> job_task)531 std::unique_ptr<v8::JobHandle> NodePlatform::PostJob(v8::TaskPriority priority,
532                                        std::unique_ptr<v8::JobTask> job_task) {
533   return v8::platform::NewDefaultJobHandle(
534       this, priority, std::move(job_task), NumberOfWorkerThreads());
535 }
536 
IdleTasksEnabled(Isolate * isolate)537 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
538   return ForIsolate(isolate)->IdleTasksEnabled();
539 }
540 
541 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner(Isolate * isolate)542 NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
543   return ForIsolate(isolate)->GetForegroundTaskRunner();
544 }
545 
MonotonicallyIncreasingTime()546 double NodePlatform::MonotonicallyIncreasingTime() {
547   // Convert nanos to seconds.
548   return uv_hrtime() / 1e9;
549 }
550 
CurrentClockTimeMillis()551 double NodePlatform::CurrentClockTimeMillis() {
552   return SystemClockTimeMillis();
553 }
554 
GetTracingController()555 v8::TracingController* NodePlatform::GetTracingController() {
556   CHECK_NOT_NULL(tracing_controller_);
557   return tracing_controller_;
558 }
559 
GetStackTracePrinter()560 Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
561   return []() {
562     fprintf(stderr, "\n");
563     DumpBacktrace(stderr);
564     fflush(stderr);
565   };
566 }
567 
GetPageAllocator()568 v8::PageAllocator* NodePlatform::GetPageAllocator() {
569   return page_allocator_;
570 }
571 
572 template <class T>
TaskQueue()573 TaskQueue<T>::TaskQueue()
574     : lock_(), tasks_available_(), tasks_drained_(),
575       outstanding_tasks_(0), stopped_(false), task_queue_() { }
576 
577 template <class T>
Push(std::unique_ptr<T> task)578 void TaskQueue<T>::Push(std::unique_ptr<T> task) {
579   Mutex::ScopedLock scoped_lock(lock_);
580   outstanding_tasks_++;
581   task_queue_.push(std::move(task));
582   tasks_available_.Signal(scoped_lock);
583 }
584 
585 template <class T>
Pop()586 std::unique_ptr<T> TaskQueue<T>::Pop() {
587   Mutex::ScopedLock scoped_lock(lock_);
588   if (task_queue_.empty()) {
589     return std::unique_ptr<T>(nullptr);
590   }
591   std::unique_ptr<T> result = std::move(task_queue_.front());
592   task_queue_.pop();
593   return result;
594 }
595 
596 template <class T>
BlockingPop()597 std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
598   Mutex::ScopedLock scoped_lock(lock_);
599   while (task_queue_.empty() && !stopped_) {
600     tasks_available_.Wait(scoped_lock);
601   }
602   if (stopped_) {
603     return std::unique_ptr<T>(nullptr);
604   }
605   std::unique_ptr<T> result = std::move(task_queue_.front());
606   task_queue_.pop();
607   return result;
608 }
609 
610 template <class T>
NotifyOfCompletion()611 void TaskQueue<T>::NotifyOfCompletion() {
612   Mutex::ScopedLock scoped_lock(lock_);
613   if (--outstanding_tasks_ == 0) {
614     tasks_drained_.Broadcast(scoped_lock);
615   }
616 }
617 
618 template <class T>
BlockingDrain()619 void TaskQueue<T>::BlockingDrain() {
620   Mutex::ScopedLock scoped_lock(lock_);
621   while (outstanding_tasks_ > 0) {
622     tasks_drained_.Wait(scoped_lock);
623   }
624 }
625 
626 template <class T>
Stop()627 void TaskQueue<T>::Stop() {
628   Mutex::ScopedLock scoped_lock(lock_);
629   stopped_ = true;
630   tasks_available_.Broadcast(scoped_lock);
631 }
632 
633 template <class T>
PopAll()634 std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
635   Mutex::ScopedLock scoped_lock(lock_);
636   std::queue<std::unique_ptr<T>> result;
637   result.swap(task_queue_);
638   return result;
639 }
640 
641 }  // namespace node
642