1 /* 2 * Copyright 2019 The libgav1 Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_ 18 #define LIBGAV1_SRC_UTILS_THREADPOOL_H_ 19 20 #include <functional> 21 #include <memory> 22 23 #if defined(__APPLE__) 24 #include <TargetConditionals.h> 25 #endif 26 27 #if !defined(LIBGAV1_THREADPOOL_USE_STD_MUTEX) 28 #if defined(__ANDROID__) || (defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE) 29 #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1 30 #else 31 #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0 32 #endif 33 #endif 34 35 #if LIBGAV1_THREADPOOL_USE_STD_MUTEX 36 #include <condition_variable> // NOLINT (unapproved c++11 header) 37 #include <mutex> // NOLINT (unapproved c++11 header) 38 #else 39 // absl::Mutex & absl::CondVar are significantly faster than the pthread 40 // variants on platforms other than Android. iOS may deadlock on Shutdown() 41 // using absl, see b/142251739. 42 #include "absl/base/thread_annotations.h" 43 #include "absl/synchronization/mutex.h" 44 #endif 45 46 #include "src/utils/compiler_attributes.h" 47 #include "src/utils/executor.h" 48 #include "src/utils/memory.h" 49 #include "src/utils/unbounded_queue.h" 50 51 namespace libgav1 { 52 53 // An implementation of ThreadPool using POSIX threads (pthreads) or Windows 54 // threads. 55 // 56 // - The pool allocates a fixed number of worker threads on instantiation. 57 // - The worker threads will pick up work jobs as they arrive. 58 // - If all workers are busy, work jobs are queued for later execution. 59 // 60 // The thread pool is shut down when the pool is destroyed. 61 // 62 // Example usage of the thread pool: 63 // { 64 // std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4); 65 // for (int i = 0; i < 100; ++i) { // Dispatch 100 jobs. 66 // pool->Schedule([&my_data]() { MyFunction(&my_data); }); 67 // } 68 // } // ThreadPool gets destroyed only when all jobs are done. 69 class ThreadPool : public Executor, public Allocable { 70 public: 71 // Creates the thread pool with the specified number of worker threads. 72 // If num_threads is 1, the closures are run in FIFO order. 73 static std::unique_ptr<ThreadPool> Create(int num_threads); 74 75 // Like the above factory method, but also sets the name prefix for threads. 76 static std::unique_ptr<ThreadPool> Create(const char name_prefix[], 77 int num_threads); 78 79 // The destructor will shut down the thread pool and all jobs are executed. 80 // Note that after shutdown, the thread pool does not accept further jobs. 81 ~ThreadPool() override; 82 83 // Adds the specified "closure" to the queue for processing. If worker threads 84 // are available, "closure" will run immediately. Otherwise "closure" is 85 // queued for later execution. 86 // 87 // NOTE: If the internal queue is full and cannot be resized because of an 88 // out-of-memory error, the current thread runs "closure" before returning 89 // from Schedule(). For our use cases, this seems better than the 90 // alternatives: 91 // 1. Return a failure status. 92 // 2. Have the current thread wait until the queue is not full. 93 void Schedule(std::function<void()> closure) override; 94 95 int num_threads() const; 96 97 private: 98 class WorkerThread; 99 100 // Creates the thread pool with the specified number of worker threads. 101 // If num_threads is 1, the closures are run in FIFO order. 102 ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads, 103 int num_threads); 104 105 // Starts the worker pool. 106 LIBGAV1_MUST_USE_RESULT bool StartWorkers(); 107 108 void WorkerFunction(); 109 110 // Shuts down the thread pool, i.e. worker threads finish their work and 111 // pick up new jobs until the queue is empty. This call will block until 112 // the shutdown is complete. 113 // 114 // Note: If a worker encounters an empty queue after this call, it will exit. 115 // Other workers might still be running, and if the queue fills up again, the 116 // thread pool will continue to operate with a decreased number of workers. 117 // It is up to the caller to prevent adding new jobs. 118 void Shutdown(); 119 120 #if LIBGAV1_THREADPOOL_USE_STD_MUTEX 121 LockMutex()122 void LockMutex() { queue_mutex_.lock(); } UnlockMutex()123 void UnlockMutex() { queue_mutex_.unlock(); } 124 Wait()125 void Wait() { 126 std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock); 127 condition_.wait(queue_lock); 128 queue_lock.release(); 129 } 130 SignalOne()131 void SignalOne() { condition_.notify_one(); } SignalAll()132 void SignalAll() { condition_.notify_all(); } 133 134 std::condition_variable condition_; 135 std::mutex queue_mutex_; 136 137 #else // !LIBGAV1_THREADPOOL_USE_STD_MUTEX 138 LockMutex()139 void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); } UnlockMutex()140 void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); } Wait()141 void Wait() { condition_.Wait(&queue_mutex_); } SignalOne()142 void SignalOne() { condition_.Signal(); } SignalAll()143 void SignalAll() { condition_.SignalAll(); } 144 145 absl::CondVar condition_; 146 absl::Mutex queue_mutex_; 147 148 #endif // LIBGAV1_THREADPOOL_USE_STD_MUTEX 149 150 UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_); 151 // If not all the worker threads are created, the first entry after the 152 // created worker threads is a null pointer. 153 const std::unique_ptr<WorkerThread*[]> threads_; 154 155 bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false; 156 const int num_threads_ = 0; 157 // name_prefix_ is a C string, whose length is restricted to 16 characters, 158 // including the terminating null byte ('\0'). This restriction comes from 159 // the Linux pthread_setname_np() function. 160 char name_prefix_[16]; 161 }; 162 163 } // namespace libgav1 164 165 #undef LIBGAV1_THREADPOOL_USE_STD_MUTEX 166 167 #endif // LIBGAV1_SRC_UTILS_THREADPOOL_H_ 168