• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_NODE_PLATFORM_H_
2 #define SRC_NODE_PLATFORM_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include <queue>
7 #include <unordered_map>
8 #include <vector>
9 #include <functional>
10 
11 #include "libplatform/libplatform.h"
12 #include "node.h"
13 #include "node_mutex.h"
14 #include "uv.h"
15 
16 namespace node {
17 
// Forward declarations for types that are referenced in this header before
// (or without) their full definitions.
class IsolateData;
class NodePlatform;
class PerIsolatePlatformData;
21 
22 template <class T>
23 class TaskQueue {
24  public:
25   TaskQueue();
26   ~TaskQueue() = default;
27 
28   void Push(std::unique_ptr<T> task);
29   std::unique_ptr<T> Pop();
30   std::unique_ptr<T> BlockingPop();
31   std::queue<std::unique_ptr<T>> PopAll();
32   void NotifyOfCompletion();
33   void BlockingDrain();
34   void Stop();
35 
36  private:
37   Mutex lock_;
38   ConditionVariable tasks_available_;
39   ConditionVariable tasks_drained_;
40   int outstanding_tasks_;
41   bool stopped_;
42   std::queue<std::unique_ptr<T>> task_queue_;
43 };
44 
45 struct DelayedTask {
46   std::unique_ptr<v8::Task> task;
47   uv_timer_t timer;
48   double timeout;
49   std::shared_ptr<PerIsolatePlatformData> platform_data;
50 };
51 
52 // This acts as the foreground task runner for a given Isolate.
53 class PerIsolatePlatformData :
54     public v8::TaskRunner,
55     public std::enable_shared_from_this<PerIsolatePlatformData> {
56  public:
57   PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
58   ~PerIsolatePlatformData() override;
59 
60   void PostTask(std::unique_ptr<v8::Task> task) override;
61   void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
62   void PostDelayedTask(std::unique_ptr<v8::Task> task,
63                        double delay_in_seconds) override;
IdleTasksEnabled()64   bool IdleTasksEnabled() override { return false; }
65 
66   // Non-nestable tasks are treated like regular tasks.
NonNestableTasksEnabled()67   bool NonNestableTasksEnabled() const override { return true; }
NonNestableDelayedTasksEnabled()68   bool NonNestableDelayedTasksEnabled() const override { return true; }
69   void PostNonNestableTask(std::unique_ptr<v8::Task> task) override;
70   void PostNonNestableDelayedTask(std::unique_ptr<v8::Task> task,
71                                   double delay_in_seconds) override;
72 
73   void AddShutdownCallback(void (*callback)(void*), void* data);
74   void Shutdown();
75 
76   // Returns true if work was dispatched or executed. New tasks that are
77   // posted during flushing of the queue are postponed until the next
78   // flushing.
79   bool FlushForegroundTasksInternal();
80 
event_loop()81   const uv_loop_t* event_loop() const { return loop_; }
82 
83  private:
84   void DeleteFromScheduledTasks(DelayedTask* task);
85   void DecreaseHandleCount();
86 
87   static void FlushTasks(uv_async_t* handle);
88   void RunForegroundTask(std::unique_ptr<v8::Task> task);
89   static void RunForegroundTask(uv_timer_t* timer);
90 
91   struct ShutdownCallback {
92     void (*cb)(void*);
93     void* data;
94   };
95   typedef std::vector<ShutdownCallback> ShutdownCbList;
96   ShutdownCbList shutdown_callbacks_;
97   // shared_ptr to self to keep this object alive during shutdown.
98   std::shared_ptr<PerIsolatePlatformData> self_reference_;
99   uint32_t uv_handle_count_ = 1;  // 1 = flush_tasks_
100 
101   v8::Isolate* const isolate_;
102   uv_loop_t* const loop_;
103   uv_async_t* flush_tasks_ = nullptr;
104   TaskQueue<v8::Task> foreground_tasks_;
105   TaskQueue<DelayedTask> foreground_delayed_tasks_;
106 
107   // Use a custom deleter because libuv needs to close the handle first.
108   typedef std::unique_ptr<DelayedTask, void(*)(DelayedTask*)>
109       DelayedTaskPointer;
110   std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
111 };
112 
113 // This acts as the single worker thread task runner for all Isolates.
114 class WorkerThreadsTaskRunner {
115  public:
116   explicit WorkerThreadsTaskRunner(int thread_pool_size);
117 
118   void PostTask(std::unique_ptr<v8::Task> task);
119   void PostDelayedTask(std::unique_ptr<v8::Task> task,
120                        double delay_in_seconds);
121 
122   void BlockingDrain();
123   void Shutdown();
124 
125   int NumberOfWorkerThreads() const;
126 
127  private:
128   TaskQueue<v8::Task> pending_worker_tasks_;
129 
130   class DelayedTaskScheduler;
131   std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;
132 
133   std::vector<std::unique_ptr<uv_thread_t>> threads_;
134 };
135 
136 class NodePlatform : public MultiIsolatePlatform {
137  public:
138   NodePlatform(int thread_pool_size,
139                v8::TracingController* tracing_controller);
140   ~NodePlatform() override;
141 
142   void DrainTasks(v8::Isolate* isolate) override;
143   void Shutdown();
144 
145   // v8::Platform implementation.
146   int NumberOfWorkerThreads() override;
147   void CallOnWorkerThread(std::unique_ptr<v8::Task> task) override;
148   void CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
149                                  double delay_in_seconds) override;
CallOnForegroundThread(v8::Isolate * isolate,v8::Task * task)150   void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override {
151     UNREACHABLE();
152   }
CallDelayedOnForegroundThread(v8::Isolate * isolate,v8::Task * task,double delay_in_seconds)153   void CallDelayedOnForegroundThread(v8::Isolate* isolate,
154                                      v8::Task* task,
155                                      double delay_in_seconds) override {
156     UNREACHABLE();
157   }
158   bool IdleTasksEnabled(v8::Isolate* isolate) override;
159   double MonotonicallyIncreasingTime() override;
160   double CurrentClockTimeMillis() override;
161   v8::TracingController* GetTracingController() override;
162   bool FlushForegroundTasks(v8::Isolate* isolate) override;
163 
164   void RegisterIsolate(v8::Isolate* isolate, uv_loop_t* loop) override;
165   void UnregisterIsolate(v8::Isolate* isolate) override;
166   void AddIsolateFinishedCallback(v8::Isolate* isolate,
167                                   void (*callback)(void*), void* data) override;
168 
169   std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
170       v8::Isolate* isolate) override;
171 
172   Platform::StackTracePrinter GetStackTracePrinter() override;
173 
174  private:
175   std::shared_ptr<PerIsolatePlatformData> ForIsolate(v8::Isolate* isolate);
176 
177   Mutex per_isolate_mutex_;
178   std::unordered_map<v8::Isolate*,
179                      std::shared_ptr<PerIsolatePlatformData>> per_isolate_;
180 
181   v8::TracingController* tracing_controller_;
182   std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
183   bool has_shut_down_ = false;
184 };
185 
186 }  // namespace node
187 
188 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
189 
190 #endif  // SRC_NODE_PLATFORM_H_
191