1 // 2 // Copyright 2016 The ANGLE Project Authors. All rights reserved. 3 // Use of this source code is governed by a BSD-style license that can be 4 // found in the LICENSE file. 5 // 6 // WorkerThread: 7 // Task running thread for ANGLE, similar to a TaskRunner in Chromium. 8 // Might be implemented differently depending on platform. 9 // 10 11 #include "libANGLE/WorkerThread.h" 12 13 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 14 # include <condition_variable> 15 # include <future> 16 # include <mutex> 17 # include <queue> 18 # include <thread> 19 #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 20 21 namespace angle 22 { 23 24 WaitableEvent::WaitableEvent() = default; 25 WaitableEvent::~WaitableEvent() = default; 26 wait()27void WaitableEventDone::wait() {} 28 isReady()29bool WaitableEventDone::isReady() 30 { 31 return true; 32 } 33 34 WorkerThreadPool::WorkerThreadPool() = default; 35 WorkerThreadPool::~WorkerThreadPool() = default; 36 37 class SingleThreadedWaitableEvent final : public WaitableEvent 38 { 39 public: 40 SingleThreadedWaitableEvent() = default; 41 ~SingleThreadedWaitableEvent() override = default; 42 43 void wait() override; 44 bool isReady() override; 45 }; 46 wait()47void SingleThreadedWaitableEvent::wait() {} 48 isReady()49bool SingleThreadedWaitableEvent::isReady() 50 { 51 return true; 52 } 53 54 class SingleThreadedWorkerPool final : public WorkerThreadPool 55 { 56 public: 57 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override; 58 void setMaxThreads(size_t maxThreads) override; 59 bool isAsync() override; 60 }; 61 62 // SingleThreadedWorkerPool implementation. postWorkerTask(std::shared_ptr<Closure> task)63std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask( 64 std::shared_ptr<Closure> task) 65 { 66 (*task)(); 67 return std::make_shared<SingleThreadedWaitableEvent>(); 68 } 69 setMaxThreads(size_t maxThreads)70void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {} 71 isAsync()72bool SingleThreadedWorkerPool::isAsync() 73 { 74 return false; 75 } 76 77 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 78 class AsyncWaitableEvent final : public WaitableEvent 79 { 80 public: AsyncWaitableEvent()81 AsyncWaitableEvent() : mIsPending(true) {} 82 ~AsyncWaitableEvent() override = default; 83 84 void wait() override; 85 bool isReady() override; 86 87 private: 88 friend class AsyncWorkerPool; 89 void setFuture(std::future<void> &&future); 90 91 // To block wait() when the task is stil in queue to be run. 92 // Also to protect the concurrent accesses from both main thread and 93 // background threads to the member fields. 94 std::mutex mMutex; 95 96 bool mIsPending; 97 std::condition_variable mCondition; 98 std::future<void> mFuture; 99 }; 100 setFuture(std::future<void> && future)101void AsyncWaitableEvent::setFuture(std::future<void> &&future) 102 { 103 mFuture = std::move(future); 104 } 105 wait()106void AsyncWaitableEvent::wait() 107 { 108 { 109 std::unique_lock<std::mutex> lock(mMutex); 110 mCondition.wait(lock, [this] { return !mIsPending; }); 111 } 112 113 ASSERT(mFuture.valid()); 114 mFuture.wait(); 115 } 116 isReady()117bool AsyncWaitableEvent::isReady() 118 { 119 std::lock_guard<std::mutex> lock(mMutex); 120 if (mIsPending) 121 { 122 return false; 123 } 124 ASSERT(mFuture.valid()); 125 return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready; 126 } 127 128 class AsyncWorkerPool final : public WorkerThreadPool 129 { 130 public: AsyncWorkerPool(size_t maxThreads)131 AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {} 132 ~AsyncWorkerPool() override = default; 133 134 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override; 135 void setMaxThreads(size_t maxThreads) override; 136 bool isAsync() override; 137 138 private: 139 void checkToRunPendingTasks(); 140 141 // To protect the concurrent accesses from both main thread and background 142 // threads to the member fields. 143 std::mutex mMutex; 144 145 size_t mMaxThreads; 146 size_t mRunningThreads; 147 std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue; 148 }; 149 150 // AsyncWorkerPool implementation. postWorkerTask(std::shared_ptr<Closure> task)151std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task) 152 { 153 ASSERT(mMaxThreads > 0); 154 155 auto waitable = std::make_shared<AsyncWaitableEvent>(); 156 { 157 std::lock_guard<std::mutex> lock(mMutex); 158 mTaskQueue.push(std::make_pair(waitable, task)); 159 } 160 checkToRunPendingTasks(); 161 return waitable; 162 } 163 setMaxThreads(size_t maxThreads)164void AsyncWorkerPool::setMaxThreads(size_t maxThreads) 165 { 166 { 167 std::lock_guard<std::mutex> lock(mMutex); 168 mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads); 169 } 170 checkToRunPendingTasks(); 171 } 172 isAsync()173bool AsyncWorkerPool::isAsync() 174 { 175 return true; 176 } 177 checkToRunPendingTasks()178void AsyncWorkerPool::checkToRunPendingTasks() 179 { 180 std::lock_guard<std::mutex> lock(mMutex); 181 while (mRunningThreads < mMaxThreads && !mTaskQueue.empty()) 182 { 183 auto task = mTaskQueue.front(); 184 mTaskQueue.pop(); 185 auto waitable = task.first; 186 auto closure = task.second; 187 188 auto future = std::async(std::launch::async, [closure, this] { 189 (*closure)(); 190 { 191 std::lock_guard<std::mutex> lock(mMutex); 192 ASSERT(mRunningThreads != 0); 193 --mRunningThreads; 194 } 195 checkToRunPendingTasks(); 196 }); 197 198 ++mRunningThreads; 199 200 { 201 std::lock_guard<std::mutex> waitableLock(waitable->mMutex); 202 waitable->mIsPending = false; 203 waitable->setFuture(std::move(future)); 204 } 205 waitable->mCondition.notify_all(); 206 } 207 } 208 #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 209 210 // static Create(bool multithreaded)211std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded) 212 { 213 std::shared_ptr<WorkerThreadPool> pool(nullptr); 214 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 215 if (multithreaded) 216 { 217 pool = std::shared_ptr<WorkerThreadPool>(static_cast<WorkerThreadPool *>( 218 new AsyncWorkerPool(std::thread::hardware_concurrency()))); 219 } 220 #endif 221 if (!pool) 222 { 223 return std::shared_ptr<WorkerThreadPool>( 224 static_cast<WorkerThreadPool *>(new SingleThreadedWorkerPool())); 225 } 226 return pool; 227 } 228 229 // static PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,std::shared_ptr<Closure> task)230std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask( 231 std::shared_ptr<WorkerThreadPool> pool, 232 std::shared_ptr<Closure> task) 233 { 234 std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task); 235 if (event.get()) 236 { 237 event->setWorkerThreadPool(pool); 238 } 239 return event; 240 } 241 242 } // namespace angle 243