#include "node_platform.h"
#include "node_internals.h"

#include "env-inl.h"
#include "debug_utils-inl.h"
#include <algorithm>  // find_if(), find(), move()
#include <cmath>  // llround()
#include <memory>  // unique_ptr(), shared_ptr(), make_shared()

namespace node {

using v8::Isolate;
using v8::Object;
using v8::Platform;
using v8::Task;

namespace {

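// State handed to each platform worker thread at startup: the shared task
// queue plus the synchronization primitives used to tell the main thread
// that the worker is ready.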
struct PlatformWorkerData {
  TaskQueue<Task>* task_queue;
  Mutex* platform_workers_mutex;
  ConditionVariable* platform_workers_ready;
  int* pending_platform_workers;
  int id;
};

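// Entry point of every worker thread in the pool: signal readiness to the
// main thread, then run tasks from the shared queue until it is stopped.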
static void PlatformWorkerThread(void* data) {
  std::unique_ptr<PlatformWorkerData>
      worker_data(static_cast<PlatformWorkerData*>(data));

  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
                        "PlatformWorkerThread");

  // Notify the main thread that the platform worker is ready.
  {
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
    (*worker_data->pending_platform_workers)--;
    worker_data->platform_workers_ready->Signal(lock);
  }

  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
    task->Run();
    pending_worker_tasks->NotifyOfCompletion();
  }
}

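// A requested size below 1 means "choose a default": use the available
// parallelism minus one, but always keep at least one worker thread.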
static int GetActualThreadPoolSize(int thread_pool_size) {
  if (thread_pool_size < 1) {
    thread_pool_size = uv_available_parallelism() - 1;
  }
  return std::max(thread_pool_size, 1);
}

}  // namespace

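// Owns a separate thread with its own libuv event loop whose only job is to
// hold timers for delayed worker tasks; once a timer fires, the associated
// task is pushed onto the worker threads' task queue.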
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
 public:
  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
      : pending_worker_tasks_(tasks) {}

  std::unique_ptr<uv_thread_t> Start() {
    auto start_thread = [](void* data) {
      static_cast<DelayedTaskScheduler*>(data)->Run();
    };
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
    uv_sem_init(&ready_, 0);
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
    uv_sem_wait(&ready_);
    uv_sem_destroy(&ready_);
    return t;
  }

  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
    tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
                                               delay_in_seconds));
    uv_async_send(&flush_tasks_);
  }

  void Stop() {
    tasks_.Push(std::make_unique<StopTask>(this));
    uv_async_send(&flush_tasks_);
  }

 private:
  void Run() {
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
    loop_.data = this;
    CHECK_EQ(0, uv_loop_init(&loop_));
    flush_tasks_.data = this;
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
    uv_sem_post(&ready_);

    uv_run(&loop_, UV_RUN_DEFAULT);
    CheckedUvLoopClose(&loop_);
  }

  static void FlushTasks(uv_async_t* flush_tasks) {
    DelayedTaskScheduler* scheduler =
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
      task->Run();
  }

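  // Posted by Stop(): drops the tasks of all pending timers and closes the
  // flush handle, which lets uv_run() in Run() return once every handle has
  // been closed.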
  class StopTask : public Task {
   public:
    explicit StopTask(DelayedTaskScheduler* scheduler)
        : scheduler_(scheduler) {}

    void Run() override {
      std::vector<uv_timer_t*> timers;
      for (uv_timer_t* timer : scheduler_->timers_)
        timers.push_back(timer);
      for (uv_timer_t* timer : timers)
        scheduler_->TakeTimerTask(timer);
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
               [](uv_handle_t* handle) {});
    }

   private:
    DelayedTaskScheduler* scheduler_;
  };

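  // Posted by PostDelayedTask(): runs on the scheduler thread and arms a
  // libuv timer that releases the wrapped task to the worker queue after
  // the requested delay.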
  class ScheduleTask : public Task {
   public:
    ScheduleTask(DelayedTaskScheduler* scheduler,
                 std::unique_ptr<Task> task,
                 double delay_in_seconds)
        : scheduler_(scheduler),
          task_(std::move(task)),
          delay_in_seconds_(delay_in_seconds) {}

    void Run() override {
      uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
      timer->data = task_.release();
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
      scheduler_->timers_.insert(timer.release());
    }

   private:
    DelayedTaskScheduler* scheduler_;
    std::unique_ptr<Task> task_;
    double delay_in_seconds_;
  };

  static void RunTask(uv_timer_t* timer) {
    DelayedTaskScheduler* scheduler =
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
  }

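  // Detaches the stored task from a timer, stops and closes the timer
  // handle (freeing it in the close callback), and forgets the timer.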
  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
    uv_timer_stop(timer);
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
      delete reinterpret_cast<uv_timer_t*>(handle);
    });
    timers_.erase(timer);
    return task;
  }

  uv_sem_t ready_;
  TaskQueue<Task>* pending_worker_tasks_;

  TaskQueue<Task> tasks_;
  uv_loop_t loop_;
  uv_async_t flush_tasks_;
  std::unordered_set<uv_timer_t*> timers_;
};

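// Starts the delayed task scheduler thread and `thread_pool_size` platform
// worker threads, then blocks until the workers have signalled readiness.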
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
  Mutex platform_workers_mutex;
  ConditionVariable platform_workers_ready;

  Mutex::ScopedLock lock(platform_workers_mutex);
  int pending_platform_workers = thread_pool_size;

  delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
      &pending_worker_tasks_);
  threads_.push_back(delayed_task_scheduler_->Start());

  for (int i = 0; i < thread_pool_size; i++) {
    PlatformWorkerData* worker_data = new PlatformWorkerData{
        &pending_worker_tasks_, &platform_workers_mutex,
        &platform_workers_ready, &pending_platform_workers, i
    };
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
    if (uv_thread_create(t.get(), PlatformWorkerThread,
                         worker_data) != 0) {
      break;
    }
    threads_.push_back(std::move(t));
  }

  // Wait for platform workers to initialize before continuing with the
  // bootstrap.
  while (pending_platform_workers > 0) {
    platform_workers_ready.Wait(lock);
  }
}

void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
  pending_worker_tasks_.Push(std::move(task));
}

void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
                                              double delay_in_seconds) {
  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
}

void WorkerThreadsTaskRunner::BlockingDrain() {
  pending_worker_tasks_.BlockingDrain();
}

void WorkerThreadsTaskRunner::Shutdown() {
  pending_worker_tasks_.Stop();
  delayed_task_scheduler_->Stop();
  for (size_t i = 0; i < threads_.size(); i++) {
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
  }
}

int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
  return threads_.size();
}

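// Per-Isolate foreground task support: tasks can be posted from any thread
// and are executed on the Isolate's event loop thread. The unref'ed
// uv_async handle wakes that loop up without keeping it alive on its own.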
PerIsolatePlatformData::PerIsolatePlatformData(
    Isolate* isolate, uv_loop_t* loop)
    : isolate_(isolate), loop_(loop) {
  flush_tasks_ = new uv_async_t();
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
  flush_tasks_->data = static_cast<void*>(this);
  uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
}

std::shared_ptr<v8::TaskRunner>
PerIsolatePlatformData::GetForegroundTaskRunner() {
  return shared_from_this();
}

void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
  auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
  platform_data->FlushForegroundTasksInternal();
}

void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
  UNREACHABLE();
}

void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
  if (flush_tasks_ == nullptr) {
    // V8 may post tasks during Isolate disposal. In that case, the only
    // sensible path forward is to discard the task.
    return;
  }
  foreground_tasks_.Push(std::move(task));
  uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::PostDelayedTask(
    std::unique_ptr<Task> task, double delay_in_seconds) {
  if (flush_tasks_ == nullptr) {
    // V8 may post tasks during Isolate disposal. In that case, the only
    // sensible path forward is to discard the task.
    return;
  }
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
  delayed->task = std::move(task);
  delayed->platform_data = shared_from_this();
  delayed->timeout = delay_in_seconds;
  foreground_delayed_tasks_.Push(std::move(delayed));
  uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
  PostTask(std::move(task));
}

void PerIsolatePlatformData::PostNonNestableDelayedTask(
    std::unique_ptr<Task> task,
    double delay_in_seconds) {
  PostDelayedTask(std::move(task), delay_in_seconds);
}

PerIsolatePlatformData::~PerIsolatePlatformData() {
  CHECK(!flush_tasks_);
}

void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
                                                 void* data) {
  shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
}

void PerIsolatePlatformData::Shutdown() {
  if (flush_tasks_ == nullptr)
    return;

  // While there should be no V8 tasks in the queues at this point, it is
  // possible that Node.js-internal tasks from e.g. the inspector are still
  // lying around. We clear these queues and ignore the return value,
  // effectively deleting the tasks instead of running them.
  foreground_delayed_tasks_.PopAll();
  foreground_tasks_.PopAll();
  scheduled_delayed_tasks_.clear();

  // Both destroying the scheduled_delayed_tasks_ list and closing the
  // flush_tasks_ handle add tasks to the event loop. We keep a count of all
  // non-closed handles, and when that reaches zero, we inform any shutdown
  // callbacks that the platform is done as far as this Isolate is concerned.
  self_reference_ = shared_from_this();
  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
           [](uv_handle_t* handle) {
    std::unique_ptr<uv_async_t> flush_tasks {
        reinterpret_cast<uv_async_t*>(handle) };
    PerIsolatePlatformData* platform_data =
        static_cast<PerIsolatePlatformData*>(flush_tasks->data);
    platform_data->DecreaseHandleCount();
    platform_data->self_reference_.reset();
  });
  flush_tasks_ = nullptr;
}

void PerIsolatePlatformData::DecreaseHandleCount() {
  CHECK_GE(uv_handle_count_, 1);
  if (--uv_handle_count_ == 0) {
    for (const auto& callback : shutdown_callbacks_)
      callback.cb(callback.data);
  }
}

NodePlatform::NodePlatform(int thread_pool_size,
                           v8::TracingController* tracing_controller,
                           v8::PageAllocator* page_allocator) {
  if (tracing_controller != nullptr) {
    tracing_controller_ = tracing_controller;
  } else {
    tracing_controller_ = new v8::TracingController();
  }

  // V8 will default to its built-in allocator if none is provided.
  page_allocator_ = page_allocator;

  // TODO(addaleax): It's a bit icky that we use global state here, but we
  // can't really do anything about it unless V8 starts exposing a way to
  // access the current v8::Platform instance.
  SetTracingController(tracing_controller_);
  DCHECK_EQ(GetTracingController(), tracing_controller_);

  thread_pool_size = GetActualThreadPoolSize(thread_pool_size);
  worker_thread_task_runner_ =
      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
}

NodePlatform::~NodePlatform() {
  Shutdown();
}

void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop);
  IsolatePlatformDelegate* ptr = delegate.get();
  auto insertion = per_isolate_.emplace(
      isolate,
      std::make_pair(ptr, std::move(delegate)));
  CHECK(insertion.second);
}

void NodePlatform::RegisterIsolate(Isolate* isolate,
                                   IsolatePlatformDelegate* delegate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto insertion = per_isolate_.emplace(
      isolate,
      std::make_pair(delegate, std::shared_ptr<PerIsolatePlatformData>{}));
  CHECK(insertion.second);
}

void NodePlatform::UnregisterIsolate(Isolate* isolate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto existing_it = per_isolate_.find(isolate);
  CHECK_NE(existing_it, per_isolate_.end());
  auto& existing = existing_it->second;
  if (existing.second) {
    existing.second->Shutdown();
  }
  per_isolate_.erase(existing_it);
}

void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
                                              void (*cb)(void*), void* data) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto it = per_isolate_.find(isolate);
  if (it == per_isolate_.end()) {
    cb(data);
    return;
  }
  CHECK(it->second.second);
  it->second.second->AddShutdownCallback(cb, data);
}

void NodePlatform::Shutdown() {
  if (has_shut_down_) return;
  has_shut_down_ = true;
  worker_thread_task_runner_->Shutdown();

  {
    Mutex::ScopedLock lock(per_isolate_mutex_);
    per_isolate_.clear();
  }
}

int NodePlatform::NumberOfWorkerThreads() {
  return worker_thread_task_runner_->NumberOfWorkerThreads();
}

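// Runs a single foreground task. If a Node.js Environment is associated
// with the Isolate, the task runs inside an InternalCallbackScope so that
// Node.js-internal callback processing happens after the task completes.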
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
  if (isolate_->IsExecutionTerminating()) return;
  DebugSealHandleScope scope(isolate_);
  Environment* env = Environment::GetCurrent(isolate_);
  if (env != nullptr) {
    v8::HandleScope scope(isolate_);
    InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
                                   InternalCallbackScope::kNoFlags);
    task->Run();
  } else {
    // The task is run without an InternalCallbackScope if no Environment is
    // available. This is a required else block, and should not be removed.
    // See comment: https://github.com/nodejs/node/pull/34688#pullrequestreview-463867489
    task->Run();
  }
}

void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
  auto it = std::find_if(scheduled_delayed_tasks_.begin(),
                         scheduled_delayed_tasks_.end(),
                         [task](const DelayedTaskPointer& delayed) -> bool {
                           return delayed.get() == task;
                         });
  CHECK_NE(it, scheduled_delayed_tasks_.end());
  scheduled_delayed_tasks_.erase(it);
}

void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
  DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
  delayed->platform_data->RunForegroundTask(std::move(delayed->task));
  delayed->platform_data->DeleteFromScheduledTasks(delayed);
}

void NodePlatform::DrainTasks(Isolate* isolate) {
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
  if (!per_isolate) return;

  do {
    // Worker tasks aren't associated with an Isolate.
    worker_thread_task_runner_->BlockingDrain();
  } while (per_isolate->FlushForegroundTasksInternal());
}

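// Flushes one round of foreground work: arms a uv timer on the Isolate's
// loop for each queued delayed task, then runs the currently queued
// immediate tasks. Returns whether any work was performed.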
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
  bool did_work = false;

  while (std::unique_ptr<DelayedTask> delayed =
      foreground_delayed_tasks_.Pop()) {
    did_work = true;
    uint64_t delay_millis = llround(delayed->timeout * 1000);

    delayed->timer.data = static_cast<void*>(delayed.get());
    uv_timer_init(loop_, &delayed->timer);
    // Timers may not guarantee queue ordering of events with the same delay
    // if the delay is non-zero. This should not be a problem in practice.
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
    uv_handle_count_++;

    scheduled_delayed_tasks_.emplace_back(delayed.release(),
                                          [](DelayedTask* delayed) {
      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
               [](uv_handle_t* handle) {
        std::unique_ptr<DelayedTask> task {
            static_cast<DelayedTask*>(handle->data) };
        task->platform_data->DecreaseHandleCount();
      });
    });
  }
  // Move all foreground tasks into a separate queue and flush that queue.
  // This way tasks that are posted while flushing the queue will be run on
  // the next call of FlushForegroundTasksInternal.
  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
  while (!tasks.empty()) {
    std::unique_ptr<Task> task = std::move(tasks.front());
    tasks.pop();
    did_work = true;
    RunForegroundTask(std::move(task));
  }
  return did_work;
}

void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
  worker_thread_task_runner_->PostTask(std::move(task));
}

void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
                                             double delay_in_seconds) {
  worker_thread_task_runner_->PostDelayedTask(std::move(task),
                                              delay_in_seconds);
}

IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto data = per_isolate_[isolate];
  CHECK_NOT_NULL(data.first);
  return data.first;
}

std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForNodeIsolate(Isolate* isolate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto data = per_isolate_[isolate];
  CHECK_NOT_NULL(data.first);
  return data.second;
}

bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
  if (!per_isolate) return false;
  return per_isolate->FlushForegroundTasksInternal();
}

std::unique_ptr<v8::JobHandle> NodePlatform::PostJob(
    v8::TaskPriority priority, std::unique_ptr<v8::JobTask> job_task) {
  return v8::platform::NewDefaultJobHandle(
      this, priority, std::move(job_task), NumberOfWorkerThreads());
}

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
  return ForIsolate(isolate)->IdleTasksEnabled();
}

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
  return ForIsolate(isolate)->GetForegroundTaskRunner();
}

double NodePlatform::MonotonicallyIncreasingTime() {
  // Convert nanoseconds to seconds.
  return uv_hrtime() / 1e9;
}

double NodePlatform::CurrentClockTimeMillis() {
  return SystemClockTimeMillis();
}

v8::TracingController* NodePlatform::GetTracingController() {
  CHECK_NOT_NULL(tracing_controller_);
  return tracing_controller_;
}

Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
  return []() {
    fprintf(stderr, "\n");
    DumpBacktrace(stderr);
    fflush(stderr);
  };
}

v8::PageAllocator* NodePlatform::GetPageAllocator() {
  return page_allocator_;
}

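// TaskQueue<T> is a simple mutex-guarded multi-producer/multi-consumer
// queue. outstanding_tasks_ counts tasks that have been pushed but not yet
// reported finished via NotifyOfCompletion(), which is what allows
// BlockingDrain() to wait until all previously pushed work has completed.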
template <class T>
TaskQueue<T>::TaskQueue()
    : lock_(), tasks_available_(), tasks_drained_(),
      outstanding_tasks_(0), stopped_(false), task_queue_() { }

template <class T>
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
  Mutex::ScopedLock scoped_lock(lock_);
  outstanding_tasks_++;
  task_queue_.push(std::move(task));
  tasks_available_.Signal(scoped_lock);
}

template <class T>
std::unique_ptr<T> TaskQueue<T>::Pop() {
  Mutex::ScopedLock scoped_lock(lock_);
  if (task_queue_.empty()) {
    return std::unique_ptr<T>(nullptr);
  }
  std::unique_ptr<T> result = std::move(task_queue_.front());
  task_queue_.pop();
  return result;
}

template <class T>
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
  Mutex::ScopedLock scoped_lock(lock_);
  while (task_queue_.empty() && !stopped_) {
    tasks_available_.Wait(scoped_lock);
  }
  if (stopped_) {
    return std::unique_ptr<T>(nullptr);
  }
  std::unique_ptr<T> result = std::move(task_queue_.front());
  task_queue_.pop();
  return result;
}

template <class T>
void TaskQueue<T>::NotifyOfCompletion() {
  Mutex::ScopedLock scoped_lock(lock_);
  if (--outstanding_tasks_ == 0) {
    tasks_drained_.Broadcast(scoped_lock);
  }
}

template <class T>
void TaskQueue<T>::BlockingDrain() {
  Mutex::ScopedLock scoped_lock(lock_);
  while (outstanding_tasks_ > 0) {
    tasks_drained_.Wait(scoped_lock);
  }
}

template <class T>
void TaskQueue<T>::Stop() {
  Mutex::ScopedLock scoped_lock(lock_);
  stopped_ = true;
  tasks_available_.Broadcast(scoped_lock);
}

template <class T>
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
  Mutex::ScopedLock scoped_lock(lock_);
  std::queue<std::unique_ptr<T>> result;
  result.swap(task_queue_);
  return result;
}

}  // namespace node