1 #include "node_platform.h"
2 #include "node_internals.h"
3
4 #include "env-inl.h"
5 #include "debug_utils-inl.h"
6 #include <algorithm> // find_if(), find(), move()
7 #include <cmath> // llround()
8 #include <memory> // unique_ptr(), shared_ptr(), make_shared()
9
10 namespace node {
11
12 using v8::Isolate;
13 using v8::Object;
14 using v8::Platform;
15 using v8::Task;
16
17 namespace {
18
19 struct PlatformWorkerData {
20 TaskQueue<Task>* task_queue;
21 Mutex* platform_workers_mutex;
22 ConditionVariable* platform_workers_ready;
23 int* pending_platform_workers;
24 int id;
25 };
26
PlatformWorkerThread(void * data)27 static void PlatformWorkerThread(void* data) {
28 std::unique_ptr<PlatformWorkerData>
29 worker_data(static_cast<PlatformWorkerData*>(data));
30
31 TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
32 TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
33 "PlatformWorkerThread");
34
35 // Notify the main thread that the platform worker is ready.
36 {
37 Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
38 (*worker_data->pending_platform_workers)--;
39 worker_data->platform_workers_ready->Signal(lock);
40 }
41
42 while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43 task->Run();
44 pending_worker_tasks->NotifyOfCompletion();
45 }
46 }
47
48 } // namespace
49
50 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
51 public:
DelayedTaskScheduler(TaskQueue<Task> * tasks)52 explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
53 : pending_worker_tasks_(tasks) {}
54
Start()55 std::unique_ptr<uv_thread_t> Start() {
56 auto start_thread = [](void* data) {
57 static_cast<DelayedTaskScheduler*>(data)->Run();
58 };
59 std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
60 uv_sem_init(&ready_, 0);
61 CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
62 uv_sem_wait(&ready_);
63 uv_sem_destroy(&ready_);
64 return t;
65 }
66
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)67 void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
68 tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
69 delay_in_seconds));
70 uv_async_send(&flush_tasks_);
71 }
72
Stop()73 void Stop() {
74 tasks_.Push(std::make_unique<StopTask>(this));
75 uv_async_send(&flush_tasks_);
76 }
77
78 private:
Run()79 void Run() {
80 TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
81 "WorkerThreadsTaskRunner::DelayedTaskScheduler");
82 loop_.data = this;
83 CHECK_EQ(0, uv_loop_init(&loop_));
84 flush_tasks_.data = this;
85 CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
86 uv_sem_post(&ready_);
87
88 uv_run(&loop_, UV_RUN_DEFAULT);
89 CheckedUvLoopClose(&loop_);
90 }
91
FlushTasks(uv_async_t * flush_tasks)92 static void FlushTasks(uv_async_t* flush_tasks) {
93 DelayedTaskScheduler* scheduler =
94 ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
95 while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
96 task->Run();
97 }
98
99 class StopTask : public Task {
100 public:
StopTask(DelayedTaskScheduler * scheduler)101 explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
102
Run()103 void Run() override {
104 std::vector<uv_timer_t*> timers;
105 for (uv_timer_t* timer : scheduler_->timers_)
106 timers.push_back(timer);
107 for (uv_timer_t* timer : timers)
108 scheduler_->TakeTimerTask(timer);
109 uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
110 [](uv_handle_t* handle) {});
111 }
112
113 private:
114 DelayedTaskScheduler* scheduler_;
115 };
116
117 class ScheduleTask : public Task {
118 public:
ScheduleTask(DelayedTaskScheduler * scheduler,std::unique_ptr<Task> task,double delay_in_seconds)119 ScheduleTask(DelayedTaskScheduler* scheduler,
120 std::unique_ptr<Task> task,
121 double delay_in_seconds)
122 : scheduler_(scheduler),
123 task_(std::move(task)),
124 delay_in_seconds_(delay_in_seconds) {}
125
Run()126 void Run() override {
127 uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
128 std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
129 CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
130 timer->data = task_.release();
131 CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
132 scheduler_->timers_.insert(timer.release());
133 }
134
135 private:
136 DelayedTaskScheduler* scheduler_;
137 std::unique_ptr<Task> task_;
138 double delay_in_seconds_;
139 };
140
RunTask(uv_timer_t * timer)141 static void RunTask(uv_timer_t* timer) {
142 DelayedTaskScheduler* scheduler =
143 ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
144 scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
145 }
146
TakeTimerTask(uv_timer_t * timer)147 std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
148 std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
149 uv_timer_stop(timer);
150 uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
151 delete reinterpret_cast<uv_timer_t*>(handle);
152 });
153 timers_.erase(timer);
154 return task;
155 }
156
157 uv_sem_t ready_;
158 TaskQueue<Task>* pending_worker_tasks_;
159
160 TaskQueue<Task> tasks_;
161 uv_loop_t loop_;
162 uv_async_t flush_tasks_;
163 std::unordered_set<uv_timer_t*> timers_;
164 };
165
WorkerThreadsTaskRunner(int thread_pool_size)166 WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
167 Mutex platform_workers_mutex;
168 ConditionVariable platform_workers_ready;
169
170 Mutex::ScopedLock lock(platform_workers_mutex);
171 int pending_platform_workers = thread_pool_size;
172
173 delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
174 &pending_worker_tasks_);
175 threads_.push_back(delayed_task_scheduler_->Start());
176
177 for (int i = 0; i < thread_pool_size; i++) {
178 PlatformWorkerData* worker_data = new PlatformWorkerData{
179 &pending_worker_tasks_, &platform_workers_mutex,
180 &platform_workers_ready, &pending_platform_workers, i
181 };
182 std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
183 if (uv_thread_create(t.get(), PlatformWorkerThread,
184 worker_data) != 0) {
185 break;
186 }
187 threads_.push_back(std::move(t));
188 }
189
190 // Wait for platform workers to initialize before continuing with the
191 // bootstrap.
192 while (pending_platform_workers > 0) {
193 platform_workers_ready.Wait(lock);
194 }
195 }
196
PostTask(std::unique_ptr<Task> task)197 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
198 pending_worker_tasks_.Push(std::move(task));
199 }
200
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)201 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
202 double delay_in_seconds) {
203 delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
204 }
205
BlockingDrain()206 void WorkerThreadsTaskRunner::BlockingDrain() {
207 pending_worker_tasks_.BlockingDrain();
208 }
209
Shutdown()210 void WorkerThreadsTaskRunner::Shutdown() {
211 pending_worker_tasks_.Stop();
212 delayed_task_scheduler_->Stop();
213 for (size_t i = 0; i < threads_.size(); i++) {
214 CHECK_EQ(0, uv_thread_join(threads_[i].get()));
215 }
216 }
217
NumberOfWorkerThreads() const218 int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
219 return threads_.size();
220 }
221
PerIsolatePlatformData(Isolate * isolate,uv_loop_t * loop)222 PerIsolatePlatformData::PerIsolatePlatformData(
223 Isolate* isolate, uv_loop_t* loop)
224 : isolate_(isolate), loop_(loop) {
225 flush_tasks_ = new uv_async_t();
226 CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
227 flush_tasks_->data = static_cast<void*>(this);
228 uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
229 }
230
231 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner()232 PerIsolatePlatformData::GetForegroundTaskRunner() {
233 return shared_from_this();
234 }
235
FlushTasks(uv_async_t * handle)236 void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
237 auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
238 platform_data->FlushForegroundTasksInternal();
239 }
240
PostIdleTask(std::unique_ptr<v8::IdleTask> task)241 void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
242 UNREACHABLE();
243 }
244
PostTask(std::unique_ptr<Task> task)245 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
246 if (flush_tasks_ == nullptr) {
247 // V8 may post tasks during Isolate disposal. In that case, the only
248 // sensible path forward is to discard the task.
249 return;
250 }
251 foreground_tasks_.Push(std::move(task));
252 uv_async_send(flush_tasks_);
253 }
254
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)255 void PerIsolatePlatformData::PostDelayedTask(
256 std::unique_ptr<Task> task, double delay_in_seconds) {
257 if (flush_tasks_ == nullptr) {
258 // V8 may post tasks during Isolate disposal. In that case, the only
259 // sensible path forward is to discard the task.
260 return;
261 }
262 std::unique_ptr<DelayedTask> delayed(new DelayedTask());
263 delayed->task = std::move(task);
264 delayed->platform_data = shared_from_this();
265 delayed->timeout = delay_in_seconds;
266 foreground_delayed_tasks_.Push(std::move(delayed));
267 uv_async_send(flush_tasks_);
268 }
269
PostNonNestableTask(std::unique_ptr<Task> task)270 void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
271 PostTask(std::move(task));
272 }
273
PostNonNestableDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)274 void PerIsolatePlatformData::PostNonNestableDelayedTask(
275 std::unique_ptr<Task> task,
276 double delay_in_seconds) {
277 PostDelayedTask(std::move(task), delay_in_seconds);
278 }
279
~PerIsolatePlatformData()280 PerIsolatePlatformData::~PerIsolatePlatformData() {
281 CHECK(!flush_tasks_);
282 }
283
AddShutdownCallback(void (* callback)(void *),void * data)284 void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
285 void* data) {
286 shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
287 }
288
Shutdown()289 void PerIsolatePlatformData::Shutdown() {
290 if (flush_tasks_ == nullptr)
291 return;
292
293 // While there should be no V8 tasks in the queues at this point, it is
294 // possible that Node.js-internal tasks from e.g. the inspector are still
295 // lying around. We clear these queues and ignore the return value,
296 // effectively deleting the tasks instead of running them.
297 foreground_delayed_tasks_.PopAll();
298 foreground_tasks_.PopAll();
299 scheduled_delayed_tasks_.clear();
300
301 // Both destroying the scheduled_delayed_tasks_ lists and closing
302 // flush_tasks_ handle add tasks to the event loop. We keep a count of all
303 // non-closed handles, and when that reaches zero, we inform any shutdown
304 // callbacks that the platform is done as far as this Isolate is concerned.
305 self_reference_ = shared_from_this();
306 uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
307 [](uv_handle_t* handle) {
308 std::unique_ptr<uv_async_t> flush_tasks {
309 reinterpret_cast<uv_async_t*>(handle) };
310 PerIsolatePlatformData* platform_data =
311 static_cast<PerIsolatePlatformData*>(flush_tasks->data);
312 platform_data->DecreaseHandleCount();
313 platform_data->self_reference_.reset();
314 });
315 flush_tasks_ = nullptr;
316 }
317
DecreaseHandleCount()318 void PerIsolatePlatformData::DecreaseHandleCount() {
319 CHECK_GE(uv_handle_count_, 1);
320 if (--uv_handle_count_ == 0) {
321 for (const auto& callback : shutdown_callbacks_)
322 callback.cb(callback.data);
323 }
324 }
325
NodePlatform(int thread_pool_size,v8::TracingController * tracing_controller)326 NodePlatform::NodePlatform(int thread_pool_size,
327 v8::TracingController* tracing_controller) {
328 if (tracing_controller != nullptr) {
329 tracing_controller_ = tracing_controller;
330 } else {
331 tracing_controller_ = new v8::TracingController();
332 }
333 // TODO(addaleax): It's a bit icky that we use global state here, but we can't
334 // really do anything about it unless V8 starts exposing a way to access the
335 // current v8::Platform instance.
336 SetTracingController(tracing_controller_);
337 DCHECK_EQ(GetTracingController(), tracing_controller_);
338 worker_thread_task_runner_ =
339 std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
340 }
341
~NodePlatform()342 NodePlatform::~NodePlatform() {
343 Shutdown();
344 }
345
RegisterIsolate(Isolate * isolate,uv_loop_t * loop)346 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
347 Mutex::ScopedLock lock(per_isolate_mutex_);
348 auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop);
349 IsolatePlatformDelegate* ptr = delegate.get();
350 auto insertion = per_isolate_.emplace(
351 isolate,
352 std::make_pair(ptr, std::move(delegate)));
353 CHECK(insertion.second);
354 }
355
RegisterIsolate(Isolate * isolate,IsolatePlatformDelegate * delegate)356 void NodePlatform::RegisterIsolate(Isolate* isolate,
357 IsolatePlatformDelegate* delegate) {
358 Mutex::ScopedLock lock(per_isolate_mutex_);
359 auto insertion = per_isolate_.emplace(
360 isolate,
361 std::make_pair(delegate, std::shared_ptr<PerIsolatePlatformData>{}));
362 CHECK(insertion.second);
363 }
364
UnregisterIsolate(Isolate * isolate)365 void NodePlatform::UnregisterIsolate(Isolate* isolate) {
366 Mutex::ScopedLock lock(per_isolate_mutex_);
367 auto existing_it = per_isolate_.find(isolate);
368 CHECK_NE(existing_it, per_isolate_.end());
369 auto& existing = existing_it->second;
370 if (existing.second) {
371 existing.second->Shutdown();
372 }
373 per_isolate_.erase(existing_it);
374 }
375
AddIsolateFinishedCallback(Isolate * isolate,void (* cb)(void *),void * data)376 void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
377 void (*cb)(void*), void* data) {
378 Mutex::ScopedLock lock(per_isolate_mutex_);
379 auto it = per_isolate_.find(isolate);
380 if (it == per_isolate_.end()) {
381 cb(data);
382 return;
383 }
384 CHECK(it->second.second);
385 it->second.second->AddShutdownCallback(cb, data);
386 }
387
Shutdown()388 void NodePlatform::Shutdown() {
389 if (has_shut_down_) return;
390 has_shut_down_ = true;
391 worker_thread_task_runner_->Shutdown();
392
393 {
394 Mutex::ScopedLock lock(per_isolate_mutex_);
395 per_isolate_.clear();
396 }
397 }
398
NumberOfWorkerThreads()399 int NodePlatform::NumberOfWorkerThreads() {
400 return worker_thread_task_runner_->NumberOfWorkerThreads();
401 }
402
RunForegroundTask(std::unique_ptr<Task> task)403 void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
404 DebugSealHandleScope scope(isolate_);
405 Environment* env = Environment::GetCurrent(isolate_);
406 if (env != nullptr) {
407 v8::HandleScope scope(isolate_);
408 InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
409 InternalCallbackScope::kNoFlags);
410 task->Run();
411 } else {
412 // The task is moved out of InternalCallbackScope if env is not available.
413 // This is a required else block, and should not be removed.
414 // See comment: https://github.com/nodejs/node/pull/34688#pullrequestreview-463867489
415 task->Run();
416 }
417 }
418
DeleteFromScheduledTasks(DelayedTask * task)419 void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
420 auto it = std::find_if(scheduled_delayed_tasks_.begin(),
421 scheduled_delayed_tasks_.end(),
422 [task](const DelayedTaskPointer& delayed) -> bool {
423 return delayed.get() == task;
424 });
425 CHECK_NE(it, scheduled_delayed_tasks_.end());
426 scheduled_delayed_tasks_.erase(it);
427 }
428
RunForegroundTask(uv_timer_t * handle)429 void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
430 DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
431 delayed->platform_data->RunForegroundTask(std::move(delayed->task));
432 delayed->platform_data->DeleteFromScheduledTasks(delayed);
433 }
434
DrainTasks(Isolate * isolate)435 void NodePlatform::DrainTasks(Isolate* isolate) {
436 std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
437 if (!per_isolate) return;
438
439 do {
440 // Worker tasks aren't associated with an Isolate.
441 worker_thread_task_runner_->BlockingDrain();
442 } while (per_isolate->FlushForegroundTasksInternal());
443 }
444
FlushForegroundTasksInternal()445 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
446 bool did_work = false;
447
448 while (std::unique_ptr<DelayedTask> delayed =
449 foreground_delayed_tasks_.Pop()) {
450 did_work = true;
451 uint64_t delay_millis = llround(delayed->timeout * 1000);
452
453 delayed->timer.data = static_cast<void*>(delayed.get());
454 uv_timer_init(loop_, &delayed->timer);
455 // Timers may not guarantee queue ordering of events with the same delay if
456 // the delay is non-zero. This should not be a problem in practice.
457 uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
458 uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
459 uv_handle_count_++;
460
461 scheduled_delayed_tasks_.emplace_back(delayed.release(),
462 [](DelayedTask* delayed) {
463 uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
464 [](uv_handle_t* handle) {
465 std::unique_ptr<DelayedTask> task {
466 static_cast<DelayedTask*>(handle->data) };
467 task->platform_data->DecreaseHandleCount();
468 });
469 });
470 }
471 // Move all foreground tasks into a separate queue and flush that queue.
472 // This way tasks that are posted while flushing the queue will be run on the
473 // next call of FlushForegroundTasksInternal.
474 std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
475 while (!tasks.empty()) {
476 std::unique_ptr<Task> task = std::move(tasks.front());
477 tasks.pop();
478 did_work = true;
479 RunForegroundTask(std::move(task));
480 }
481 return did_work;
482 }
483
CallOnWorkerThread(std::unique_ptr<Task> task)484 void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
485 worker_thread_task_runner_->PostTask(std::move(task));
486 }
487
CallDelayedOnWorkerThread(std::unique_ptr<Task> task,double delay_in_seconds)488 void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
489 double delay_in_seconds) {
490 worker_thread_task_runner_->PostDelayedTask(std::move(task),
491 delay_in_seconds);
492 }
493
494
ForIsolate(Isolate * isolate)495 IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
496 Mutex::ScopedLock lock(per_isolate_mutex_);
497 auto data = per_isolate_[isolate];
498 CHECK_NOT_NULL(data.first);
499 return data.first;
500 }
501
502 std::shared_ptr<PerIsolatePlatformData>
ForNodeIsolate(Isolate * isolate)503 NodePlatform::ForNodeIsolate(Isolate* isolate) {
504 Mutex::ScopedLock lock(per_isolate_mutex_);
505 auto data = per_isolate_[isolate];
506 CHECK_NOT_NULL(data.first);
507 return data.second;
508 }
509
FlushForegroundTasks(Isolate * isolate)510 bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
511 std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
512 if (!per_isolate) return false;
513 return per_isolate->FlushForegroundTasksInternal();
514 }
515
IdleTasksEnabled(Isolate * isolate)516 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
517 return ForIsolate(isolate)->IdleTasksEnabled();
518 }
519
520 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner(Isolate * isolate)521 NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
522 return ForIsolate(isolate)->GetForegroundTaskRunner();
523 }
524
MonotonicallyIncreasingTime()525 double NodePlatform::MonotonicallyIncreasingTime() {
526 // Convert nanos to seconds.
527 return uv_hrtime() / 1e9;
528 }
529
CurrentClockTimeMillis()530 double NodePlatform::CurrentClockTimeMillis() {
531 return SystemClockTimeMillis();
532 }
533
GetTracingController()534 v8::TracingController* NodePlatform::GetTracingController() {
535 CHECK_NOT_NULL(tracing_controller_);
536 return tracing_controller_;
537 }
538
GetStackTracePrinter()539 Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
540 return []() {
541 fprintf(stderr, "\n");
542 DumpBacktrace(stderr);
543 fflush(stderr);
544 };
545 }
546
547 template <class T>
TaskQueue()548 TaskQueue<T>::TaskQueue()
549 : lock_(), tasks_available_(), tasks_drained_(),
550 outstanding_tasks_(0), stopped_(false), task_queue_() { }
551
552 template <class T>
Push(std::unique_ptr<T> task)553 void TaskQueue<T>::Push(std::unique_ptr<T> task) {
554 Mutex::ScopedLock scoped_lock(lock_);
555 outstanding_tasks_++;
556 task_queue_.push(std::move(task));
557 tasks_available_.Signal(scoped_lock);
558 }
559
560 template <class T>
Pop()561 std::unique_ptr<T> TaskQueue<T>::Pop() {
562 Mutex::ScopedLock scoped_lock(lock_);
563 if (task_queue_.empty()) {
564 return std::unique_ptr<T>(nullptr);
565 }
566 std::unique_ptr<T> result = std::move(task_queue_.front());
567 task_queue_.pop();
568 return result;
569 }
570
571 template <class T>
BlockingPop()572 std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
573 Mutex::ScopedLock scoped_lock(lock_);
574 while (task_queue_.empty() && !stopped_) {
575 tasks_available_.Wait(scoped_lock);
576 }
577 if (stopped_) {
578 return std::unique_ptr<T>(nullptr);
579 }
580 std::unique_ptr<T> result = std::move(task_queue_.front());
581 task_queue_.pop();
582 return result;
583 }
584
585 template <class T>
NotifyOfCompletion()586 void TaskQueue<T>::NotifyOfCompletion() {
587 Mutex::ScopedLock scoped_lock(lock_);
588 if (--outstanding_tasks_ == 0) {
589 tasks_drained_.Broadcast(scoped_lock);
590 }
591 }
592
593 template <class T>
BlockingDrain()594 void TaskQueue<T>::BlockingDrain() {
595 Mutex::ScopedLock scoped_lock(lock_);
596 while (outstanding_tasks_ > 0) {
597 tasks_drained_.Wait(scoped_lock);
598 }
599 }
600
601 template <class T>
Stop()602 void TaskQueue<T>::Stop() {
603 Mutex::ScopedLock scoped_lock(lock_);
604 stopped_ = true;
605 tasks_available_.Broadcast(scoped_lock);
606 }
607
608 template <class T>
PopAll()609 std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
610 Mutex::ScopedLock scoped_lock(lock_);
611 std::queue<std::unique_ptr<T>> result;
612 result.swap(task_queue_);
613 return result;
614 }
615
CancelPendingDelayedTasks(Isolate * isolate)616 void MultiIsolatePlatform::CancelPendingDelayedTasks(Isolate* isolate) {}
617
618 } // namespace node
619