1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 // Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 // Might be implemented differently depending on platform.
9 //
10
11 #include "common/WorkerThread.h"
12
13 #include "common/angleutils.h"
14
15 // Controls if our threading code uses std::async or falls back to single-threaded operations.
16 // Note that we can't easily use std::async in UWPs due to UWP threading restrictions.
17 #if !defined(ANGLE_STD_ASYNC_WORKERS) && !defined(ANGLE_ENABLE_WINDOWS_UWP)
18 # define ANGLE_STD_ASYNC_WORKERS 1
19 #endif // !defined(ANGLE_STD_ASYNC_WORKERS) && & !defined(ANGLE_ENABLE_WINDOWS_UWP)
20
21 #if ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
22 # include <condition_variable>
23 # include <future>
24 # include <mutex>
25 # include <queue>
26 # include <thread>
27 #endif // ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
28
29 namespace angle
30 {
31
32 WaitableEvent::WaitableEvent() = default;
33 WaitableEvent::~WaitableEvent() = default;
34
wait()35 void WaitableEventDone::wait() {}
36
isReady()37 bool WaitableEventDone::isReady()
38 {
39 return true;
40 }
41
42 // A waitable event that can be completed asynchronously
43 class AsyncWaitableEvent final : public WaitableEvent
44 {
45 public:
46 AsyncWaitableEvent() = default;
47 ~AsyncWaitableEvent() override = default;
48
49 void wait() override;
50 bool isReady() override;
51
52 void markAsReady();
53
54 private:
55 // To protect the concurrent accesses from both main thread and background
56 // threads to the member fields.
57 std::mutex mMutex;
58
59 bool mIsReady = false;
60 std::condition_variable mCondition;
61 };
62
markAsReady()63 void AsyncWaitableEvent::markAsReady()
64 {
65 std::lock_guard<std::mutex> lock(mMutex);
66 mIsReady = true;
67 mCondition.notify_all();
68 }
69
wait()70 void AsyncWaitableEvent::wait()
71 {
72 std::unique_lock<std::mutex> lock(mMutex);
73 mCondition.wait(lock, [this] { return mIsReady; });
74 }
75
isReady()76 bool AsyncWaitableEvent::isReady()
77 {
78 std::lock_guard<std::mutex> lock(mMutex);
79 return mIsReady;
80 }
81
82 WorkerThreadPool::WorkerThreadPool() = default;
83 WorkerThreadPool::~WorkerThreadPool() = default;
84
85 class SingleThreadedWorkerPool final : public WorkerThreadPool
86 {
87 public:
88 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
89 bool isAsync() override;
90 };
91
92 // SingleThreadedWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)93 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
94 std::shared_ptr<Closure> task)
95 {
96 // Thread safety: This function is thread-safe because the task is run on the calling thread
97 // itself.
98 (*task)();
99 return std::make_shared<WaitableEventDone>();
100 }
101
isAsync()102 bool SingleThreadedWorkerPool::isAsync()
103 {
104 return false;
105 }
106
107 #if ANGLE_STD_ASYNC_WORKERS
108
109 class AsyncWorkerPool final : public WorkerThreadPool
110 {
111 public:
112 AsyncWorkerPool(size_t numThreads);
113
114 ~AsyncWorkerPool() override;
115
116 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
117
118 bool isAsync() override;
119
120 private:
121 void createThreads();
122
123 using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
124
125 // Thread's main loop
126 void threadLoop();
127
128 bool mTerminated = false;
129 std::mutex mMutex; // Protects access to the fields in this class
130 std::condition_variable mCondVar; // Signals when work is available in the queue
131 std::queue<Task> mTaskQueue;
132 std::deque<std::thread> mThreads;
133 size_t mDesiredThreadCount;
134 };
135
136 // AsyncWorkerPool implementation.
137
AsyncWorkerPool(size_t numThreads)138 AsyncWorkerPool::AsyncWorkerPool(size_t numThreads) : mDesiredThreadCount(numThreads)
139 {
140 ASSERT(numThreads != 0);
141 }
142
~AsyncWorkerPool()143 AsyncWorkerPool::~AsyncWorkerPool()
144 {
145 {
146 std::unique_lock<std::mutex> lock(mMutex);
147 mTerminated = true;
148 }
149 mCondVar.notify_all();
150 for (auto &thread : mThreads)
151 {
152 ASSERT(thread.get_id() != std::this_thread::get_id());
153 thread.join();
154 }
155 }
156
createThreads()157 void AsyncWorkerPool::createThreads()
158 {
159 if (mDesiredThreadCount == mThreads.size())
160 {
161 return;
162 }
163 ASSERT(mThreads.empty());
164
165 for (size_t i = 0; i < mDesiredThreadCount; ++i)
166 {
167 mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
168 }
169 }
170
postWorkerTask(std::shared_ptr<Closure> task)171 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
172 {
173 // Thread safety: This function is thread-safe because access to |mTaskQueue| is protected by
174 // |mMutex|.
175 auto waitable = std::make_shared<AsyncWaitableEvent>();
176 {
177 std::lock_guard<std::mutex> lock(mMutex);
178
179 // Lazily create the threads on first task
180 createThreads();
181
182 mTaskQueue.push(std::make_pair(waitable, task));
183 }
184 mCondVar.notify_one();
185 return waitable;
186 }
187
threadLoop()188 void AsyncWorkerPool::threadLoop()
189 {
190 while (true)
191 {
192 Task task;
193 {
194 std::unique_lock<std::mutex> lock(mMutex);
195 mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
196 if (mTerminated)
197 {
198 return;
199 }
200 task = mTaskQueue.front();
201 mTaskQueue.pop();
202 }
203
204 auto &waitable = task.first;
205 auto &closure = task.second;
206
207 // Note: always add an ANGLE_TRACE_EVENT* macro in the closure. Then the job will show up
208 // in traces.
209 (*closure)();
210 waitable->markAsReady();
211 }
212 }
213
isAsync()214 bool AsyncWorkerPool::isAsync()
215 {
216 return true;
217 }
218
219 #endif // ANGLE_STD_ASYNC_WORKERS
220
221 #if ANGLE_DELEGATE_WORKERS
222
223 class DelegateWorkerPool final : public WorkerThreadPool
224 {
225 public:
DelegateWorkerPool(PlatformMethods * platform)226 DelegateWorkerPool(PlatformMethods *platform) : mPlatform(platform) {}
227 ~DelegateWorkerPool() override = default;
228
229 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
230
231 bool isAsync() override;
232
233 private:
234 PlatformMethods *mPlatform;
235 };
236
237 // A function wrapper to execute the closure and to notify the waitable
238 // event after the execution.
239 class DelegateWorkerTask
240 {
241 public:
DelegateWorkerTask(std::shared_ptr<Closure> task,std::shared_ptr<AsyncWaitableEvent> waitable)242 DelegateWorkerTask(std::shared_ptr<Closure> task, std::shared_ptr<AsyncWaitableEvent> waitable)
243 : mTask(task), mWaitable(waitable)
244 {}
245 DelegateWorkerTask() = delete;
246 DelegateWorkerTask(DelegateWorkerTask &) = delete;
247
RunTask(void * userData)248 static void RunTask(void *userData)
249 {
250 DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
251 (*workerTask->mTask)();
252 workerTask->mWaitable->markAsReady();
253
254 // Delete the task after its execution.
255 delete workerTask;
256 }
257
258 private:
259 ~DelegateWorkerTask() = default;
260
261 std::shared_ptr<Closure> mTask;
262 std::shared_ptr<AsyncWaitableEvent> mWaitable;
263 };
264
265 ANGLE_NO_SANITIZE_CFI_ICALL
postWorkerTask(std::shared_ptr<Closure> task)266 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
267 {
268 // Thread safety: This function is thread-safe because the |postWorkerTask| platform method is
269 // expected to be thread safe. For Chromium, that forwards the call to the |TaskTracker| class
270 // in base/task/thread_pool/task_tracker.h which is thread-safe.
271 auto waitable = std::make_shared<AsyncWaitableEvent>();
272
273 // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
274 DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
275 mPlatform->postWorkerTask(mPlatform, DelegateWorkerTask::RunTask, workerTask);
276
277 return waitable;
278 }
279
isAsync()280 bool DelegateWorkerPool::isAsync()
281 {
282 return true;
283 }
284 #endif
285
286 // static
Create(size_t numThreads,PlatformMethods * platform)287 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads,
288 PlatformMethods *platform)
289 {
290 const bool multithreaded = numThreads != 1;
291 std::shared_ptr<WorkerThreadPool> pool(nullptr);
292
293 #if ANGLE_DELEGATE_WORKERS
294 const bool hasPostWorkerTaskImpl = platform->postWorkerTask != nullptr;
295 if (hasPostWorkerTaskImpl && multithreaded)
296 {
297 pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool(platform));
298 }
299 #endif
300 #if ANGLE_STD_ASYNC_WORKERS
301 if (!pool && multithreaded)
302 {
303 pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
304 numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
305 }
306 #endif
307 if (!pool)
308 {
309 return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
310 }
311 return pool;
312 }
313 } // namespace angle
314