1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 // Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 // Might be implemented differently depending on platform.
9 //
10
11 #include "libANGLE/WorkerThread.h"
12
13 #include "libANGLE/trace.h"
14
15 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
16 # include <condition_variable>
17 # include <future>
18 # include <mutex>
19 # include <queue>
20 # include <thread>
21 #endif // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
22
23 namespace angle
24 {
25
26 WaitableEvent::WaitableEvent() = default;
27 WaitableEvent::~WaitableEvent() = default;
28
wait()29 void WaitableEventDone::wait() {}
30
isReady()31 bool WaitableEventDone::isReady()
32 {
33 return true;
34 }
35
36 WorkerThreadPool::WorkerThreadPool() = default;
37 WorkerThreadPool::~WorkerThreadPool() = default;
38
39 class SingleThreadedWaitableEvent final : public WaitableEvent
40 {
41 public:
42 SingleThreadedWaitableEvent() = default;
43 ~SingleThreadedWaitableEvent() override = default;
44
45 void wait() override;
46 bool isReady() override;
47 };
48
wait()49 void SingleThreadedWaitableEvent::wait() {}
50
isReady()51 bool SingleThreadedWaitableEvent::isReady()
52 {
53 return true;
54 }
55
56 class SingleThreadedWorkerPool final : public WorkerThreadPool
57 {
58 public:
59 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
60 void setMaxThreads(size_t maxThreads) override;
61 bool isAsync() override;
62 };
63
64 // SingleThreadedWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)65 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
66 std::shared_ptr<Closure> task)
67 {
68 (*task)();
69 return std::make_shared<SingleThreadedWaitableEvent>();
70 }
71
setMaxThreads(size_t maxThreads)72 void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
73
isAsync()74 bool SingleThreadedWorkerPool::isAsync()
75 {
76 return false;
77 }
78
79 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
80 class AsyncWaitableEvent final : public WaitableEvent
81 {
82 public:
AsyncWaitableEvent()83 AsyncWaitableEvent() : mIsPending(true) {}
84 ~AsyncWaitableEvent() override = default;
85
86 void wait() override;
87 bool isReady() override;
88
89 private:
90 friend class AsyncWorkerPool;
91 void setFuture(std::future<void> &&future);
92
93 // To block wait() when the task is still in queue to be run.
94 // Also to protect the concurrent accesses from both main thread and
95 // background threads to the member fields.
96 std::mutex mMutex;
97
98 bool mIsPending;
99 std::condition_variable mCondition;
100 std::future<void> mFuture;
101 };
102
setFuture(std::future<void> && future)103 void AsyncWaitableEvent::setFuture(std::future<void> &&future)
104 {
105 mFuture = std::move(future);
106 }
107
wait()108 void AsyncWaitableEvent::wait()
109 {
110 ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
111 {
112 std::unique_lock<std::mutex> lock(mMutex);
113 mCondition.wait(lock, [this] { return !mIsPending; });
114 }
115
116 ASSERT(mFuture.valid());
117 mFuture.wait();
118 }
119
isReady()120 bool AsyncWaitableEvent::isReady()
121 {
122 std::lock_guard<std::mutex> lock(mMutex);
123 if (mIsPending)
124 {
125 return false;
126 }
127 ASSERT(mFuture.valid());
128 return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
129 }
130
131 class AsyncWorkerPool final : public WorkerThreadPool
132 {
133 public:
AsyncWorkerPool(size_t maxThreads)134 AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
135 ~AsyncWorkerPool() override = default;
136
137 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
138 void setMaxThreads(size_t maxThreads) override;
139 bool isAsync() override;
140
141 private:
142 void checkToRunPendingTasks();
143
144 // To protect the concurrent accesses from both main thread and background
145 // threads to the member fields.
146 std::mutex mMutex;
147
148 size_t mMaxThreads;
149 size_t mRunningThreads;
150 std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
151 };
152
153 // AsyncWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)154 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
155 {
156 ASSERT(mMaxThreads > 0);
157
158 auto waitable = std::make_shared<AsyncWaitableEvent>();
159 {
160 std::lock_guard<std::mutex> lock(mMutex);
161 mTaskQueue.push(std::make_pair(waitable, task));
162 }
163 checkToRunPendingTasks();
164 return std::move(waitable);
165 }
166
setMaxThreads(size_t maxThreads)167 void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
168 {
169 {
170 std::lock_guard<std::mutex> lock(mMutex);
171 mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
172 }
173 checkToRunPendingTasks();
174 }
175
isAsync()176 bool AsyncWorkerPool::isAsync()
177 {
178 return true;
179 }
180
checkToRunPendingTasks()181 void AsyncWorkerPool::checkToRunPendingTasks()
182 {
183 std::lock_guard<std::mutex> lock(mMutex);
184 while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
185 {
186 auto task = mTaskQueue.front();
187 mTaskQueue.pop();
188 auto waitable = task.first;
189 auto closure = task.second;
190
191 auto future = std::async(std::launch::async, [closure, this] {
192 {
193 ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
194 (*closure)();
195 }
196 {
197 std::lock_guard<std::mutex> lock(mMutex);
198 ASSERT(mRunningThreads != 0);
199 --mRunningThreads;
200 }
201 checkToRunPendingTasks();
202 });
203
204 ++mRunningThreads;
205
206 {
207 std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
208 waitable->mIsPending = false;
209 waitable->setFuture(std::move(future));
210 }
211 waitable->mCondition.notify_all();
212 }
213 }
214 #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
215
216 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
217 class DelegateWaitableEvent final : public WaitableEvent
218 {
219 public:
220 DelegateWaitableEvent() = default;
221 ~DelegateWaitableEvent() override = default;
222
223 void wait() override;
224 bool isReady() override;
225
226 void markAsReady();
227
228 private:
229 // To protect the concurrent accesses from both main thread and background
230 // threads to the member fields.
231 std::mutex mMutex;
232
233 bool mIsReady = false;
234 std::condition_variable mCondition;
235 };
236
markAsReady()237 void DelegateWaitableEvent::markAsReady()
238 {
239 std::lock_guard<std::mutex> lock(mMutex);
240 mIsReady = true;
241 mCondition.notify_all();
242 }
243
wait()244 void DelegateWaitableEvent::wait()
245 {
246 std::unique_lock<std::mutex> lock(mMutex);
247 mCondition.wait(lock, [this] { return mIsReady; });
248 }
249
isReady()250 bool DelegateWaitableEvent::isReady()
251 {
252 std::lock_guard<std::mutex> lock(mMutex);
253 return mIsReady;
254 }
255
256 class DelegateWorkerPool final : public WorkerThreadPool
257 {
258 public:
259 DelegateWorkerPool() = default;
260 ~DelegateWorkerPool() override = default;
261
262 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
263
264 void setMaxThreads(size_t maxThreads) override;
265 bool isAsync() override;
266 };
267
268 // A function wrapper to execute the closure and to notify the waitable
269 // event after the execution.
270 class DelegateWorkerTask
271 {
272 public:
DelegateWorkerTask(std::shared_ptr<Closure> task,std::shared_ptr<DelegateWaitableEvent> waitable)273 DelegateWorkerTask(std::shared_ptr<Closure> task,
274 std::shared_ptr<DelegateWaitableEvent> waitable)
275 : mTask(task), mWaitable(waitable)
276 {}
277 DelegateWorkerTask() = delete;
278 DelegateWorkerTask(DelegateWorkerTask &) = delete;
279
RunTask(void * userData)280 static void RunTask(void *userData)
281 {
282 DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
283 (*workerTask->mTask)();
284 workerTask->mWaitable->markAsReady();
285
286 // Delete the task after its execution.
287 delete workerTask;
288 }
289
290 private:
291 ~DelegateWorkerTask() = default;
292
293 std::shared_ptr<Closure> mTask;
294 std::shared_ptr<DelegateWaitableEvent> mWaitable;
295 };
296
postWorkerTask(std::shared_ptr<Closure> task)297 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
298 {
299 auto waitable = std::make_shared<DelegateWaitableEvent>();
300
301 // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
302 DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
303 auto *platform = ANGLEPlatformCurrent();
304 platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
305
306 return std::move(waitable);
307 }
308
setMaxThreads(size_t maxThreads)309 void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
310
isAsync()311 bool DelegateWorkerPool::isAsync()
312 {
313 return true;
314 }
315 #endif
316
317 // static
Create(bool multithreaded)318 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
319 {
320 std::shared_ptr<WorkerThreadPool> pool(nullptr);
321
322 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
323 const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask;
324 if (hasPostWorkerTaskImpl && multithreaded)
325 {
326 pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
327 }
328 #endif
329 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
330 if (!pool && multithreaded)
331 {
332 pool = std::shared_ptr<WorkerThreadPool>(
333 new AsyncWorkerPool(std::thread::hardware_concurrency()));
334 }
335 #endif
336 if (!pool)
337 {
338 return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
339 }
340 return pool;
341 }
342
343 // static
PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,std::shared_ptr<Closure> task)344 std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
345 std::shared_ptr<WorkerThreadPool> pool,
346 std::shared_ptr<Closure> task)
347 {
348 std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
349 if (event.get())
350 {
351 event->setWorkerThreadPool(pool);
352 }
353 return event;
354 }
355
356 } // namespace angle
357