// // Copyright 2016 The ANGLE Project Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // // WorkerThread: // Task running thread for ANGLE, similar to a TaskRunner in Chromium. // Might be implemented differently depending on platform. // #include "libANGLE/WorkerThread.h" #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) # include # include # include # include # include #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) namespace angle { WaitableEvent::WaitableEvent() = default; WaitableEvent::~WaitableEvent() = default; void WaitableEventDone::wait() {} bool WaitableEventDone::isReady() { return true; } WorkerThreadPool::WorkerThreadPool() = default; WorkerThreadPool::~WorkerThreadPool() = default; class SingleThreadedWaitableEvent final : public WaitableEvent { public: SingleThreadedWaitableEvent() = default; ~SingleThreadedWaitableEvent() override = default; void wait() override; bool isReady() override; }; void SingleThreadedWaitableEvent::wait() {} bool SingleThreadedWaitableEvent::isReady() { return true; } class SingleThreadedWorkerPool final : public WorkerThreadPool { public: std::shared_ptr postWorkerTask(std::shared_ptr task) override; void setMaxThreads(size_t maxThreads) override; bool isAsync() override; }; // SingleThreadedWorkerPool implementation. std::shared_ptr SingleThreadedWorkerPool::postWorkerTask( std::shared_ptr task) { (*task)(); return std::make_shared(); } void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {} bool SingleThreadedWorkerPool::isAsync() { return false; } #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) class AsyncWaitableEvent final : public WaitableEvent { public: AsyncWaitableEvent() : mIsPending(true) {} ~AsyncWaitableEvent() override = default; void wait() override; bool isReady() override; private: friend class AsyncWorkerPool; void setFuture(std::future &&future); // To block wait() when the task is stil in queue to be run. // Also to protect the concurrent accesses from both main thread and // background threads to the member fields. std::mutex mMutex; bool mIsPending; std::condition_variable mCondition; std::future mFuture; }; void AsyncWaitableEvent::setFuture(std::future &&future) { mFuture = std::move(future); } void AsyncWaitableEvent::wait() { { std::unique_lock lock(mMutex); mCondition.wait(lock, [this] { return !mIsPending; }); } ASSERT(mFuture.valid()); mFuture.wait(); } bool AsyncWaitableEvent::isReady() { std::lock_guard lock(mMutex); if (mIsPending) { return false; } ASSERT(mFuture.valid()); return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } class AsyncWorkerPool final : public WorkerThreadPool { public: AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {} ~AsyncWorkerPool() override = default; std::shared_ptr postWorkerTask(std::shared_ptr task) override; void setMaxThreads(size_t maxThreads) override; bool isAsync() override; private: void checkToRunPendingTasks(); // To protect the concurrent accesses from both main thread and background // threads to the member fields. std::mutex mMutex; size_t mMaxThreads; size_t mRunningThreads; std::queue, std::shared_ptr>> mTaskQueue; }; // AsyncWorkerPool implementation. std::shared_ptr AsyncWorkerPool::postWorkerTask(std::shared_ptr task) { ASSERT(mMaxThreads > 0); auto waitable = std::make_shared(); { std::lock_guard lock(mMutex); mTaskQueue.push(std::make_pair(waitable, task)); } checkToRunPendingTasks(); return waitable; } void AsyncWorkerPool::setMaxThreads(size_t maxThreads) { { std::lock_guard lock(mMutex); mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads); } checkToRunPendingTasks(); } bool AsyncWorkerPool::isAsync() { return true; } void AsyncWorkerPool::checkToRunPendingTasks() { std::lock_guard lock(mMutex); while (mRunningThreads < mMaxThreads && !mTaskQueue.empty()) { auto task = mTaskQueue.front(); mTaskQueue.pop(); auto waitable = task.first; auto closure = task.second; auto future = std::async(std::launch::async, [closure, this] { (*closure)(); { std::lock_guard lock(mMutex); ASSERT(mRunningThreads != 0); --mRunningThreads; } checkToRunPendingTasks(); }); ++mRunningThreads; { std::lock_guard waitableLock(waitable->mMutex); waitable->mIsPending = false; waitable->setFuture(std::move(future)); } waitable->mCondition.notify_all(); } } #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) // static std::shared_ptr WorkerThreadPool::Create(bool multithreaded) { std::shared_ptr pool(nullptr); #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) if (multithreaded) { pool = std::shared_ptr(static_cast( new AsyncWorkerPool(std::thread::hardware_concurrency()))); } #endif if (!pool) { return std::shared_ptr( static_cast(new SingleThreadedWorkerPool())); } return pool; } // static std::shared_ptr WorkerThreadPool::PostWorkerTask( std::shared_ptr pool, std::shared_ptr task) { std::shared_ptr event = pool->postWorkerTask(task); if (event.get()) { event->setWorkerThreadPool(pool); } return event; } } // namespace angle