1 #include "node_platform.h"
2 #include "node_internals.h"
3
4 #include "env-inl.h"
5 #include "debug_utils-inl.h"
6 #include <algorithm> // find_if(), find(), move()
7 #include <cmath> // llround()
8 #include <memory> // unique_ptr(), shared_ptr(), make_shared()
9
10 namespace node {
11
12 using v8::Isolate;
13 using v8::Object;
14 using v8::Platform;
15 using v8::Task;
16
17 namespace {
18
19 struct PlatformWorkerData {
20 TaskQueue<Task>* task_queue;
21 Mutex* platform_workers_mutex;
22 ConditionVariable* platform_workers_ready;
23 int* pending_platform_workers;
24 int id;
25 };
26
PlatformWorkerThread(void * data)27 static void PlatformWorkerThread(void* data) {
28 std::unique_ptr<PlatformWorkerData>
29 worker_data(static_cast<PlatformWorkerData*>(data));
30
31 TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
32 TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
33 "PlatformWorkerThread");
34
35 // Notify the main thread that the platform worker is ready.
36 {
37 Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
38 (*worker_data->pending_platform_workers)--;
39 worker_data->platform_workers_ready->Signal(lock);
40 }
41
42 while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43 task->Run();
44 pending_worker_tasks->NotifyOfCompletion();
45 }
46 }
47
48 } // namespace
49
50 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
51 public:
DelayedTaskScheduler(TaskQueue<Task> * tasks)52 explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
53 : pending_worker_tasks_(tasks) {}
54
Start()55 std::unique_ptr<uv_thread_t> Start() {
56 auto start_thread = [](void* data) {
57 static_cast<DelayedTaskScheduler*>(data)->Run();
58 };
59 std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
60 uv_sem_init(&ready_, 0);
61 CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
62 uv_sem_wait(&ready_);
63 uv_sem_destroy(&ready_);
64 return t;
65 }
66
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)67 void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
68 tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
69 delay_in_seconds));
70 uv_async_send(&flush_tasks_);
71 }
72
Stop()73 void Stop() {
74 tasks_.Push(std::make_unique<StopTask>(this));
75 uv_async_send(&flush_tasks_);
76 }
77
78 private:
Run()79 void Run() {
80 TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
81 "WorkerThreadsTaskRunner::DelayedTaskScheduler");
82 loop_.data = this;
83 CHECK_EQ(0, uv_loop_init(&loop_));
84 flush_tasks_.data = this;
85 CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
86 uv_sem_post(&ready_);
87
88 uv_run(&loop_, UV_RUN_DEFAULT);
89 CheckedUvLoopClose(&loop_);
90 }
91
FlushTasks(uv_async_t * flush_tasks)92 static void FlushTasks(uv_async_t* flush_tasks) {
93 DelayedTaskScheduler* scheduler =
94 ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
95 while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
96 task->Run();
97 }
98
99 class StopTask : public Task {
100 public:
StopTask(DelayedTaskScheduler * scheduler)101 explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
102
Run()103 void Run() override {
104 std::vector<uv_timer_t*> timers;
105 for (uv_timer_t* timer : scheduler_->timers_)
106 timers.push_back(timer);
107 for (uv_timer_t* timer : timers)
108 scheduler_->TakeTimerTask(timer);
109 uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
110 [](uv_handle_t* handle) {});
111 }
112
113 private:
114 DelayedTaskScheduler* scheduler_;
115 };
116
117 class ScheduleTask : public Task {
118 public:
ScheduleTask(DelayedTaskScheduler * scheduler,std::unique_ptr<Task> task,double delay_in_seconds)119 ScheduleTask(DelayedTaskScheduler* scheduler,
120 std::unique_ptr<Task> task,
121 double delay_in_seconds)
122 : scheduler_(scheduler),
123 task_(std::move(task)),
124 delay_in_seconds_(delay_in_seconds) {}
125
Run()126 void Run() override {
127 uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
128 std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
129 CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
130 timer->data = task_.release();
131 CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
132 scheduler_->timers_.insert(timer.release());
133 }
134
135 private:
136 DelayedTaskScheduler* scheduler_;
137 std::unique_ptr<Task> task_;
138 double delay_in_seconds_;
139 };
140
RunTask(uv_timer_t * timer)141 static void RunTask(uv_timer_t* timer) {
142 DelayedTaskScheduler* scheduler =
143 ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
144 scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
145 }
146
TakeTimerTask(uv_timer_t * timer)147 std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
148 std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
149 uv_timer_stop(timer);
150 uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
151 delete reinterpret_cast<uv_timer_t*>(handle);
152 });
153 timers_.erase(timer);
154 return task;
155 }
156
157 uv_sem_t ready_;
158 TaskQueue<Task>* pending_worker_tasks_;
159
160 TaskQueue<Task> tasks_;
161 uv_loop_t loop_;
162 uv_async_t flush_tasks_;
163 std::unordered_set<uv_timer_t*> timers_;
164 };
165
WorkerThreadsTaskRunner(int thread_pool_size)166 WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
167 Mutex platform_workers_mutex;
168 ConditionVariable platform_workers_ready;
169
170 Mutex::ScopedLock lock(platform_workers_mutex);
171 int pending_platform_workers = thread_pool_size;
172
173 delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
174 &pending_worker_tasks_);
175 threads_.push_back(delayed_task_scheduler_->Start());
176
177 for (int i = 0; i < thread_pool_size; i++) {
178 PlatformWorkerData* worker_data = new PlatformWorkerData{
179 &pending_worker_tasks_, &platform_workers_mutex,
180 &platform_workers_ready, &pending_platform_workers, i
181 };
182 std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
183 if (uv_thread_create(t.get(), PlatformWorkerThread,
184 worker_data) != 0) {
185 break;
186 }
187 threads_.push_back(std::move(t));
188 }
189
190 // Wait for platform workers to initialize before continuing with the
191 // bootstrap.
192 while (pending_platform_workers > 0) {
193 platform_workers_ready.Wait(lock);
194 }
195 }
196
PostTask(std::unique_ptr<Task> task)197 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
198 pending_worker_tasks_.Push(std::move(task));
199 }
200
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)201 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
202 double delay_in_seconds) {
203 delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
204 }
205
BlockingDrain()206 void WorkerThreadsTaskRunner::BlockingDrain() {
207 pending_worker_tasks_.BlockingDrain();
208 }
209
Shutdown()210 void WorkerThreadsTaskRunner::Shutdown() {
211 pending_worker_tasks_.Stop();
212 delayed_task_scheduler_->Stop();
213 for (size_t i = 0; i < threads_.size(); i++) {
214 CHECK_EQ(0, uv_thread_join(threads_[i].get()));
215 }
216 }
217
NumberOfWorkerThreads() const218 int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
219 return threads_.size();
220 }
221
PerIsolatePlatformData(Isolate * isolate,uv_loop_t * loop)222 PerIsolatePlatformData::PerIsolatePlatformData(
223 Isolate* isolate, uv_loop_t* loop)
224 : isolate_(isolate), loop_(loop) {
225 flush_tasks_ = new uv_async_t();
226 CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
227 flush_tasks_->data = static_cast<void*>(this);
228 uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
229 }
230
FlushTasks(uv_async_t * handle)231 void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
232 auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
233 platform_data->FlushForegroundTasksInternal();
234 }
235
PostIdleTask(std::unique_ptr<v8::IdleTask> task)236 void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
237 UNREACHABLE();
238 }
239
PostTask(std::unique_ptr<Task> task)240 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
241 if (flush_tasks_ == nullptr) {
242 // V8 may post tasks during Isolate disposal. In that case, the only
243 // sensible path forward is to discard the task.
244 return;
245 }
246 foreground_tasks_.Push(std::move(task));
247 uv_async_send(flush_tasks_);
248 }
249
PostDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)250 void PerIsolatePlatformData::PostDelayedTask(
251 std::unique_ptr<Task> task, double delay_in_seconds) {
252 if (flush_tasks_ == nullptr) {
253 // V8 may post tasks during Isolate disposal. In that case, the only
254 // sensible path forward is to discard the task.
255 return;
256 }
257 std::unique_ptr<DelayedTask> delayed(new DelayedTask());
258 delayed->task = std::move(task);
259 delayed->platform_data = shared_from_this();
260 delayed->timeout = delay_in_seconds;
261 foreground_delayed_tasks_.Push(std::move(delayed));
262 uv_async_send(flush_tasks_);
263 }
264
PostNonNestableTask(std::unique_ptr<Task> task)265 void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
266 PostTask(std::move(task));
267 }
268
PostNonNestableDelayedTask(std::unique_ptr<Task> task,double delay_in_seconds)269 void PerIsolatePlatformData::PostNonNestableDelayedTask(
270 std::unique_ptr<Task> task,
271 double delay_in_seconds) {
272 PostDelayedTask(std::move(task), delay_in_seconds);
273 }
274
~PerIsolatePlatformData()275 PerIsolatePlatformData::~PerIsolatePlatformData() {
276 Shutdown();
277 }
278
AddShutdownCallback(void (* callback)(void *),void * data)279 void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
280 void* data) {
281 shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
282 }
283
Shutdown()284 void PerIsolatePlatformData::Shutdown() {
285 if (flush_tasks_ == nullptr)
286 return;
287
288 // While there should be no V8 tasks in the queues at this point, it is
289 // possible that Node.js-internal tasks from e.g. the inspector are still
290 // lying around. We clear these queues and ignore the return value,
291 // effectively deleting the tasks instead of running them.
292 foreground_delayed_tasks_.PopAll();
293 foreground_tasks_.PopAll();
294 scheduled_delayed_tasks_.clear();
295
296 // Both destroying the scheduled_delayed_tasks_ lists and closing
297 // flush_tasks_ handle add tasks to the event loop. We keep a count of all
298 // non-closed handles, and when that reaches zero, we inform any shutdown
299 // callbacks that the platform is done as far as this Isolate is concerned.
300 self_reference_ = shared_from_this();
301 uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
302 [](uv_handle_t* handle) {
303 std::unique_ptr<uv_async_t> flush_tasks {
304 reinterpret_cast<uv_async_t*>(handle) };
305 PerIsolatePlatformData* platform_data =
306 static_cast<PerIsolatePlatformData*>(flush_tasks->data);
307 platform_data->DecreaseHandleCount();
308 platform_data->self_reference_.reset();
309 });
310 flush_tasks_ = nullptr;
311 }
312
DecreaseHandleCount()313 void PerIsolatePlatformData::DecreaseHandleCount() {
314 CHECK_GE(uv_handle_count_, 1);
315 if (--uv_handle_count_ == 0) {
316 for (const auto& callback : shutdown_callbacks_)
317 callback.cb(callback.data);
318 }
319 }
320
NodePlatform(int thread_pool_size,v8::TracingController * tracing_controller)321 NodePlatform::NodePlatform(int thread_pool_size,
322 v8::TracingController* tracing_controller) {
323 if (tracing_controller != nullptr) {
324 tracing_controller_ = tracing_controller;
325 } else {
326 tracing_controller_ = new v8::TracingController();
327 }
328 // TODO(addaleax): It's a bit icky that we use global state here, but we can't
329 // really do anything about it unless V8 starts exposing a way to access the
330 // current v8::Platform instance.
331 SetTracingController(tracing_controller_);
332 DCHECK_EQ(GetTracingController(), tracing_controller_);
333 worker_thread_task_runner_ =
334 std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
335 }
336
~NodePlatform()337 NodePlatform::~NodePlatform() {
338 Shutdown();
339 }
340
RegisterIsolate(Isolate * isolate,uv_loop_t * loop)341 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
342 Mutex::ScopedLock lock(per_isolate_mutex_);
343 std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
344 CHECK(!existing);
345 per_isolate_[isolate] =
346 std::make_shared<PerIsolatePlatformData>(isolate, loop);
347 }
348
UnregisterIsolate(Isolate * isolate)349 void NodePlatform::UnregisterIsolate(Isolate* isolate) {
350 Mutex::ScopedLock lock(per_isolate_mutex_);
351 std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
352 CHECK(existing);
353 existing->Shutdown();
354 per_isolate_.erase(isolate);
355 }
356
AddIsolateFinishedCallback(Isolate * isolate,void (* cb)(void *),void * data)357 void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
358 void (*cb)(void*), void* data) {
359 Mutex::ScopedLock lock(per_isolate_mutex_);
360 auto it = per_isolate_.find(isolate);
361 if (it == per_isolate_.end()) {
362 CHECK(it->second);
363 cb(data);
364 return;
365 }
366 it->second->AddShutdownCallback(cb, data);
367 }
368
Shutdown()369 void NodePlatform::Shutdown() {
370 if (has_shut_down_) return;
371 has_shut_down_ = true;
372 worker_thread_task_runner_->Shutdown();
373
374 {
375 Mutex::ScopedLock lock(per_isolate_mutex_);
376 per_isolate_.clear();
377 }
378 }
379
NumberOfWorkerThreads()380 int NodePlatform::NumberOfWorkerThreads() {
381 return worker_thread_task_runner_->NumberOfWorkerThreads();
382 }
383
RunForegroundTask(std::unique_ptr<Task> task)384 void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
385 DebugSealHandleScope scope(isolate_);
386 Environment* env = Environment::GetCurrent(isolate_);
387 if (env != nullptr) {
388 v8::HandleScope scope(isolate_);
389 InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
390 InternalCallbackScope::kNoFlags);
391 task->Run();
392 } else {
393 task->Run();
394 }
395 }
396
DeleteFromScheduledTasks(DelayedTask * task)397 void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
398 auto it = std::find_if(scheduled_delayed_tasks_.begin(),
399 scheduled_delayed_tasks_.end(),
400 [task](const DelayedTaskPointer& delayed) -> bool {
401 return delayed.get() == task;
402 });
403 CHECK_NE(it, scheduled_delayed_tasks_.end());
404 scheduled_delayed_tasks_.erase(it);
405 }
406
RunForegroundTask(uv_timer_t * handle)407 void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
408 DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
409 delayed->platform_data->RunForegroundTask(std::move(delayed->task));
410 delayed->platform_data->DeleteFromScheduledTasks(delayed);
411 }
412
DrainTasks(Isolate * isolate)413 void NodePlatform::DrainTasks(Isolate* isolate) {
414 std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
415 if (!per_isolate) return;
416
417 do {
418 // Worker tasks aren't associated with an Isolate.
419 worker_thread_task_runner_->BlockingDrain();
420 } while (per_isolate->FlushForegroundTasksInternal());
421 }
422
FlushForegroundTasksInternal()423 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
424 bool did_work = false;
425
426 while (std::unique_ptr<DelayedTask> delayed =
427 foreground_delayed_tasks_.Pop()) {
428 did_work = true;
429 uint64_t delay_millis = llround(delayed->timeout * 1000);
430
431 delayed->timer.data = static_cast<void*>(delayed.get());
432 uv_timer_init(loop_, &delayed->timer);
433 // Timers may not guarantee queue ordering of events with the same delay if
434 // the delay is non-zero. This should not be a problem in practice.
435 uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
436 uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
437 uv_handle_count_++;
438
439 scheduled_delayed_tasks_.emplace_back(delayed.release(),
440 [](DelayedTask* delayed) {
441 uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
442 [](uv_handle_t* handle) {
443 std::unique_ptr<DelayedTask> task {
444 static_cast<DelayedTask*>(handle->data) };
445 task->platform_data->DecreaseHandleCount();
446 });
447 });
448 }
449 // Move all foreground tasks into a separate queue and flush that queue.
450 // This way tasks that are posted while flushing the queue will be run on the
451 // next call of FlushForegroundTasksInternal.
452 std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
453 while (!tasks.empty()) {
454 std::unique_ptr<Task> task = std::move(tasks.front());
455 tasks.pop();
456 did_work = true;
457 RunForegroundTask(std::move(task));
458 }
459 return did_work;
460 }
461
CallOnWorkerThread(std::unique_ptr<Task> task)462 void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
463 worker_thread_task_runner_->PostTask(std::move(task));
464 }
465
CallDelayedOnWorkerThread(std::unique_ptr<Task> task,double delay_in_seconds)466 void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
467 double delay_in_seconds) {
468 worker_thread_task_runner_->PostDelayedTask(std::move(task),
469 delay_in_seconds);
470 }
471
472
473 std::shared_ptr<PerIsolatePlatformData>
ForIsolate(Isolate * isolate)474 NodePlatform::ForIsolate(Isolate* isolate) {
475 Mutex::ScopedLock lock(per_isolate_mutex_);
476 std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
477 CHECK(data);
478 return data;
479 }
480
FlushForegroundTasks(Isolate * isolate)481 bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
482 std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
483 if (!per_isolate) return false;
484 return per_isolate->FlushForegroundTasksInternal();
485 }
486
IdleTasksEnabled(Isolate * isolate)487 bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
488
489 std::shared_ptr<v8::TaskRunner>
GetForegroundTaskRunner(Isolate * isolate)490 NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
491 return ForIsolate(isolate);
492 }
493
MonotonicallyIncreasingTime()494 double NodePlatform::MonotonicallyIncreasingTime() {
495 // Convert nanos to seconds.
496 return uv_hrtime() / 1e9;
497 }
498
CurrentClockTimeMillis()499 double NodePlatform::CurrentClockTimeMillis() {
500 return SystemClockTimeMillis();
501 }
502
GetTracingController()503 v8::TracingController* NodePlatform::GetTracingController() {
504 CHECK_NOT_NULL(tracing_controller_);
505 return tracing_controller_;
506 }
507
GetStackTracePrinter()508 Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
509 return []() {
510 fprintf(stderr, "\n");
511 DumpBacktrace(stderr);
512 fflush(stderr);
513 };
514 }
515
516 template <class T>
TaskQueue()517 TaskQueue<T>::TaskQueue()
518 : lock_(), tasks_available_(), tasks_drained_(),
519 outstanding_tasks_(0), stopped_(false), task_queue_() { }
520
521 template <class T>
Push(std::unique_ptr<T> task)522 void TaskQueue<T>::Push(std::unique_ptr<T> task) {
523 Mutex::ScopedLock scoped_lock(lock_);
524 outstanding_tasks_++;
525 task_queue_.push(std::move(task));
526 tasks_available_.Signal(scoped_lock);
527 }
528
529 template <class T>
Pop()530 std::unique_ptr<T> TaskQueue<T>::Pop() {
531 Mutex::ScopedLock scoped_lock(lock_);
532 if (task_queue_.empty()) {
533 return std::unique_ptr<T>(nullptr);
534 }
535 std::unique_ptr<T> result = std::move(task_queue_.front());
536 task_queue_.pop();
537 return result;
538 }
539
540 template <class T>
BlockingPop()541 std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
542 Mutex::ScopedLock scoped_lock(lock_);
543 while (task_queue_.empty() && !stopped_) {
544 tasks_available_.Wait(scoped_lock);
545 }
546 if (stopped_) {
547 return std::unique_ptr<T>(nullptr);
548 }
549 std::unique_ptr<T> result = std::move(task_queue_.front());
550 task_queue_.pop();
551 return result;
552 }
553
554 template <class T>
NotifyOfCompletion()555 void TaskQueue<T>::NotifyOfCompletion() {
556 Mutex::ScopedLock scoped_lock(lock_);
557 if (--outstanding_tasks_ == 0) {
558 tasks_drained_.Broadcast(scoped_lock);
559 }
560 }
561
562 template <class T>
BlockingDrain()563 void TaskQueue<T>::BlockingDrain() {
564 Mutex::ScopedLock scoped_lock(lock_);
565 while (outstanding_tasks_ > 0) {
566 tasks_drained_.Wait(scoped_lock);
567 }
568 }
569
570 template <class T>
Stop()571 void TaskQueue<T>::Stop() {
572 Mutex::ScopedLock scoped_lock(lock_);
573 stopped_ = true;
574 tasks_available_.Broadcast(scoped_lock);
575 }
576
577 template <class T>
PopAll()578 std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
579 Mutex::ScopedLock scoped_lock(lock_);
580 std::queue<std::unique_ptr<T>> result;
581 result.swap(task_queue_);
582 return result;
583 }
584
CancelPendingDelayedTasks(Isolate * isolate)585 void MultiIsolatePlatform::CancelPendingDelayedTasks(Isolate* isolate) {}
586
587 } // namespace node
588