• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 //   Might be implemented differently depending on platform.
9 //
10 
11 #include "common/WorkerThread.h"
12 
13 #include "common/angleutils.h"
14 
15 // Controls if our threading code uses std::async or falls back to single-threaded operations.
16 // Note that we can't easily use std::async in UWPs due to UWP threading restrictions.
17 #if !defined(ANGLE_STD_ASYNC_WORKERS) && !defined(ANGLE_ENABLE_WINDOWS_UWP)
18 #    define ANGLE_STD_ASYNC_WORKERS 1
19 #endif  // !defined(ANGLE_STD_ASYNC_WORKERS) && & !defined(ANGLE_ENABLE_WINDOWS_UWP)
20 
21 #if ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
22 #    include <condition_variable>
23 #    include <future>
24 #    include <mutex>
25 #    include <queue>
26 #    include <thread>
27 #endif  // ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
28 
29 namespace angle
30 {
31 
32 WaitableEvent::WaitableEvent()  = default;
33 WaitableEvent::~WaitableEvent() = default;
34 
wait()35 void WaitableEventDone::wait() {}
36 
isReady()37 bool WaitableEventDone::isReady()
38 {
39     return true;
40 }
41 
42 // A waitable event that can be completed asynchronously
43 class AsyncWaitableEvent final : public WaitableEvent
44 {
45   public:
46     AsyncWaitableEvent()           = default;
47     ~AsyncWaitableEvent() override = default;
48 
49     void wait() override;
50     bool isReady() override;
51 
52     void markAsReady();
53 
54   private:
55     // To protect the concurrent accesses from both main thread and background
56     // threads to the member fields.
57     std::mutex mMutex;
58 
59     bool mIsReady = false;
60     std::condition_variable mCondition;
61 };
62 
markAsReady()63 void AsyncWaitableEvent::markAsReady()
64 {
65     std::lock_guard<std::mutex> lock(mMutex);
66     mIsReady = true;
67     mCondition.notify_all();
68 }
69 
wait()70 void AsyncWaitableEvent::wait()
71 {
72     std::unique_lock<std::mutex> lock(mMutex);
73     mCondition.wait(lock, [this] { return mIsReady; });
74 }
75 
isReady()76 bool AsyncWaitableEvent::isReady()
77 {
78     std::lock_guard<std::mutex> lock(mMutex);
79     return mIsReady;
80 }
81 
82 WorkerThreadPool::WorkerThreadPool()  = default;
83 WorkerThreadPool::~WorkerThreadPool() = default;
84 
85 class SingleThreadedWorkerPool final : public WorkerThreadPool
86 {
87   public:
88     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
89     bool isAsync() override;
90 };
91 
92 // SingleThreadedWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)93 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
94     std::shared_ptr<Closure> task)
95 {
96     // Thread safety: This function is thread-safe because the task is run on the calling thread
97     // itself.
98     (*task)();
99     return std::make_shared<WaitableEventDone>();
100 }
101 
isAsync()102 bool SingleThreadedWorkerPool::isAsync()
103 {
104     return false;
105 }
106 
107 #if ANGLE_STD_ASYNC_WORKERS
108 
109 class AsyncWorkerPool final : public WorkerThreadPool
110 {
111   public:
112     AsyncWorkerPool(size_t numThreads);
113 
114     ~AsyncWorkerPool() override;
115 
116     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
117 
118     bool isAsync() override;
119 
120   private:
121     void createThreads();
122 
123     using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
124 
125     // Thread's main loop
126     void threadLoop();
127 
128     bool mTerminated = false;
129     std::mutex mMutex;                 // Protects access to the fields in this class
130     std::condition_variable mCondVar;  // Signals when work is available in the queue
131     std::queue<Task> mTaskQueue;
132     std::deque<std::thread> mThreads;
133     size_t mDesiredThreadCount;
134 };
135 
136 // AsyncWorkerPool implementation.
137 
AsyncWorkerPool(size_t numThreads)138 AsyncWorkerPool::AsyncWorkerPool(size_t numThreads) : mDesiredThreadCount(numThreads)
139 {
140     ASSERT(numThreads != 0);
141 }
142 
~AsyncWorkerPool()143 AsyncWorkerPool::~AsyncWorkerPool()
144 {
145     {
146         std::unique_lock<std::mutex> lock(mMutex);
147         mTerminated = true;
148     }
149     mCondVar.notify_all();
150     for (auto &thread : mThreads)
151     {
152         ASSERT(thread.get_id() != std::this_thread::get_id());
153         thread.join();
154     }
155 }
156 
createThreads()157 void AsyncWorkerPool::createThreads()
158 {
159     if (mDesiredThreadCount == mThreads.size())
160     {
161         return;
162     }
163     ASSERT(mThreads.empty());
164 
165     for (size_t i = 0; i < mDesiredThreadCount; ++i)
166     {
167         mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
168     }
169 }
170 
postWorkerTask(std::shared_ptr<Closure> task)171 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
172 {
173     // Thread safety: This function is thread-safe because access to |mTaskQueue| is protected by
174     // |mMutex|.
175     auto waitable = std::make_shared<AsyncWaitableEvent>();
176     {
177         std::lock_guard<std::mutex> lock(mMutex);
178 
179         // Lazily create the threads on first task
180         createThreads();
181 
182         mTaskQueue.push(std::make_pair(waitable, task));
183     }
184     mCondVar.notify_one();
185     return waitable;
186 }
187 
threadLoop()188 void AsyncWorkerPool::threadLoop()
189 {
190     while (true)
191     {
192         Task task;
193         {
194             std::unique_lock<std::mutex> lock(mMutex);
195             mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
196             if (mTerminated)
197             {
198                 return;
199             }
200             task = mTaskQueue.front();
201             mTaskQueue.pop();
202         }
203 
204         auto &waitable = task.first;
205         auto &closure  = task.second;
206 
207         // Note: always add an ANGLE_TRACE_EVENT* macro in the closure.  Then the job will show up
208         // in traces.
209         (*closure)();
210         waitable->markAsReady();
211     }
212 }
213 
isAsync()214 bool AsyncWorkerPool::isAsync()
215 {
216     return true;
217 }
218 
219 #endif  // ANGLE_STD_ASYNC_WORKERS
220 
221 #if ANGLE_DELEGATE_WORKERS
222 
223 class DelegateWorkerPool final : public WorkerThreadPool
224 {
225   public:
DelegateWorkerPool(PlatformMethods * platform)226     DelegateWorkerPool(PlatformMethods *platform) : mPlatform(platform) {}
227     ~DelegateWorkerPool() override = default;
228 
229     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
230 
231     bool isAsync() override;
232 
233   private:
234     PlatformMethods *mPlatform;
235 };
236 
237 // A function wrapper to execute the closure and to notify the waitable
238 // event after the execution.
239 class DelegateWorkerTask
240 {
241   public:
DelegateWorkerTask(std::shared_ptr<Closure> task,std::shared_ptr<AsyncWaitableEvent> waitable)242     DelegateWorkerTask(std::shared_ptr<Closure> task, std::shared_ptr<AsyncWaitableEvent> waitable)
243         : mTask(task), mWaitable(waitable)
244     {}
245     DelegateWorkerTask()                     = delete;
246     DelegateWorkerTask(DelegateWorkerTask &) = delete;
247 
RunTask(void * userData)248     static void RunTask(void *userData)
249     {
250         DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
251         (*workerTask->mTask)();
252         workerTask->mWaitable->markAsReady();
253 
254         // Delete the task after its execution.
255         delete workerTask;
256     }
257 
258   private:
259     ~DelegateWorkerTask() = default;
260 
261     std::shared_ptr<Closure> mTask;
262     std::shared_ptr<AsyncWaitableEvent> mWaitable;
263 };
264 
265 ANGLE_NO_SANITIZE_CFI_ICALL
postWorkerTask(std::shared_ptr<Closure> task)266 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
267 {
268     // Thread safety: This function is thread-safe because the |postWorkerTask| platform method is
269     // expected to be thread safe.  For Chromium, that forwards the call to the |TaskTracker| class
270     // in base/task/thread_pool/task_tracker.h which is thread-safe.
271     auto waitable = std::make_shared<AsyncWaitableEvent>();
272 
273     // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
274     DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
275     mPlatform->postWorkerTask(mPlatform, DelegateWorkerTask::RunTask, workerTask);
276 
277     return waitable;
278 }
279 
isAsync()280 bool DelegateWorkerPool::isAsync()
281 {
282     return true;
283 }
284 #endif
285 
286 // static
Create(size_t numThreads,PlatformMethods * platform)287 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads,
288                                                            PlatformMethods *platform)
289 {
290     const bool multithreaded = numThreads != 1;
291     std::shared_ptr<WorkerThreadPool> pool(nullptr);
292 
293 #if ANGLE_DELEGATE_WORKERS
294     const bool hasPostWorkerTaskImpl = platform->postWorkerTask != nullptr;
295     if (hasPostWorkerTaskImpl && multithreaded)
296     {
297         pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool(platform));
298     }
299 #endif
300 #if ANGLE_STD_ASYNC_WORKERS
301     if (!pool && multithreaded)
302     {
303         pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
304             numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
305     }
306 #endif
307     if (!pool)
308     {
309         return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
310     }
311     return pool;
312 }
313 }  // namespace angle
314