• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 //   Might be implemented differently depending on platform.
9 //
10 
11 #include "libANGLE/WorkerThread.h"
12 
13 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
14 #    include <condition_variable>
15 #    include <future>
16 #    include <mutex>
17 #    include <queue>
18 #    include <thread>
19 #endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
20 
21 namespace angle
22 {
23 
24 WaitableEvent::WaitableEvent()  = default;
25 WaitableEvent::~WaitableEvent() = default;
26 
wait()27 void WaitableEventDone::wait() {}
28 
isReady()29 bool WaitableEventDone::isReady()
30 {
31     return true;
32 }
33 
34 WorkerThreadPool::WorkerThreadPool()  = default;
35 WorkerThreadPool::~WorkerThreadPool() = default;
36 
37 class SingleThreadedWaitableEvent final : public WaitableEvent
38 {
39   public:
40     SingleThreadedWaitableEvent()           = default;
41     ~SingleThreadedWaitableEvent() override = default;
42 
43     void wait() override;
44     bool isReady() override;
45 };
46 
wait()47 void SingleThreadedWaitableEvent::wait() {}
48 
isReady()49 bool SingleThreadedWaitableEvent::isReady()
50 {
51     return true;
52 }
53 
54 class SingleThreadedWorkerPool final : public WorkerThreadPool
55 {
56   public:
57     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
58     void setMaxThreads(size_t maxThreads) override;
59     bool isAsync() override;
60 };
61 
62 // SingleThreadedWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)63 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
64     std::shared_ptr<Closure> task)
65 {
66     (*task)();
67     return std::make_shared<SingleThreadedWaitableEvent>();
68 }
69 
setMaxThreads(size_t maxThreads)70 void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
71 
isAsync()72 bool SingleThreadedWorkerPool::isAsync()
73 {
74     return false;
75 }
76 
77 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
78 class AsyncWaitableEvent final : public WaitableEvent
79 {
80   public:
AsyncWaitableEvent()81     AsyncWaitableEvent() : mIsPending(true) {}
82     ~AsyncWaitableEvent() override = default;
83 
84     void wait() override;
85     bool isReady() override;
86 
87   private:
88     friend class AsyncWorkerPool;
89     void setFuture(std::future<void> &&future);
90 
91     // To block wait() when the task is stil in queue to be run.
92     // Also to protect the concurrent accesses from both main thread and
93     // background threads to the member fields.
94     std::mutex mMutex;
95 
96     bool mIsPending;
97     std::condition_variable mCondition;
98     std::future<void> mFuture;
99 };
100 
setFuture(std::future<void> && future)101 void AsyncWaitableEvent::setFuture(std::future<void> &&future)
102 {
103     mFuture = std::move(future);
104 }
105 
wait()106 void AsyncWaitableEvent::wait()
107 {
108     {
109         std::unique_lock<std::mutex> lock(mMutex);
110         mCondition.wait(lock, [this] { return !mIsPending; });
111     }
112 
113     ASSERT(mFuture.valid());
114     mFuture.wait();
115 }
116 
isReady()117 bool AsyncWaitableEvent::isReady()
118 {
119     std::lock_guard<std::mutex> lock(mMutex);
120     if (mIsPending)
121     {
122         return false;
123     }
124     ASSERT(mFuture.valid());
125     return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
126 }
127 
128 class AsyncWorkerPool final : public WorkerThreadPool
129 {
130   public:
AsyncWorkerPool(size_t maxThreads)131     AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
132     ~AsyncWorkerPool() override = default;
133 
134     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
135     void setMaxThreads(size_t maxThreads) override;
136     bool isAsync() override;
137 
138   private:
139     void checkToRunPendingTasks();
140 
141     // To protect the concurrent accesses from both main thread and background
142     // threads to the member fields.
143     std::mutex mMutex;
144 
145     size_t mMaxThreads;
146     size_t mRunningThreads;
147     std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
148 };
149 
150 // AsyncWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)151 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
152 {
153     ASSERT(mMaxThreads > 0);
154 
155     auto waitable = std::make_shared<AsyncWaitableEvent>();
156     {
157         std::lock_guard<std::mutex> lock(mMutex);
158         mTaskQueue.push(std::make_pair(waitable, task));
159     }
160     checkToRunPendingTasks();
161     return waitable;
162 }
163 
setMaxThreads(size_t maxThreads)164 void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
165 {
166     {
167         std::lock_guard<std::mutex> lock(mMutex);
168         mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
169     }
170     checkToRunPendingTasks();
171 }
172 
isAsync()173 bool AsyncWorkerPool::isAsync()
174 {
175     return true;
176 }
177 
checkToRunPendingTasks()178 void AsyncWorkerPool::checkToRunPendingTasks()
179 {
180     std::lock_guard<std::mutex> lock(mMutex);
181     while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
182     {
183         auto task = mTaskQueue.front();
184         mTaskQueue.pop();
185         auto waitable = task.first;
186         auto closure  = task.second;
187 
188         auto future = std::async(std::launch::async, [closure, this] {
189             (*closure)();
190             {
191                 std::lock_guard<std::mutex> lock(mMutex);
192                 ASSERT(mRunningThreads != 0);
193                 --mRunningThreads;
194             }
195             checkToRunPendingTasks();
196         });
197 
198         ++mRunningThreads;
199 
200         {
201             std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
202             waitable->mIsPending = false;
203             waitable->setFuture(std::move(future));
204         }
205         waitable->mCondition.notify_all();
206     }
207 }
208 #endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
209 
210 // static
Create(bool multithreaded)211 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
212 {
213     std::shared_ptr<WorkerThreadPool> pool(nullptr);
214 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
215     if (multithreaded)
216     {
217         pool = std::shared_ptr<WorkerThreadPool>(static_cast<WorkerThreadPool *>(
218             new AsyncWorkerPool(std::thread::hardware_concurrency())));
219     }
220 #endif
221     if (!pool)
222     {
223         return std::shared_ptr<WorkerThreadPool>(
224             static_cast<WorkerThreadPool *>(new SingleThreadedWorkerPool()));
225     }
226     return pool;
227 }
228 
229 // static
PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,std::shared_ptr<Closure> task)230 std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
231     std::shared_ptr<WorkerThreadPool> pool,
232     std::shared_ptr<Closure> task)
233 {
234     std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
235     if (event.get())
236     {
237         event->setWorkerThreadPool(pool);
238     }
239     return event;
240 }
241 
242 }  // namespace angle
243