• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The libgav1 Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_
18 #define LIBGAV1_SRC_UTILS_THREADPOOL_H_
19 
20 #include <functional>
21 #include <memory>
22 
23 #if defined(__APPLE__)
24 #include <TargetConditionals.h>
25 #endif
26 
// Chooses the synchronization primitives used by ThreadPool unless the build
// has already defined LIBGAV1_THREADPOOL_USE_STD_MUTEX:
//   1 - std::mutex / std::condition_variable (Android and iPhone targets).
//   0 - absl::Mutex / absl::CondVar (everything else).
#ifndef LIBGAV1_THREADPOOL_USE_STD_MUTEX
#if defined(__ANDROID__)
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
#elif defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
#else
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0
#endif
#endif  // LIBGAV1_THREADPOOL_USE_STD_MUTEX
34 
35 #if LIBGAV1_THREADPOOL_USE_STD_MUTEX
36 #include <condition_variable>  // NOLINT (unapproved c++11 header)
37 #include <mutex>               // NOLINT (unapproved c++11 header)
38 #else
39 // absl::Mutex & absl::CondVar are significantly faster than the pthread
40 // variants on platforms other than Android. iOS may deadlock on Shutdown()
41 // using absl, see b/142251739.
42 #include "absl/base/thread_annotations.h"
43 #include "absl/synchronization/mutex.h"
44 #endif
45 
46 #include "src/utils/compiler_attributes.h"
47 #include "src/utils/executor.h"
48 #include "src/utils/memory.h"
49 #include "src/utils/unbounded_queue.h"
50 
51 namespace libgav1 {
52 
53 // An implementation of ThreadPool using POSIX threads (pthreads) or Windows
54 // threads.
55 //
56 // - The pool allocates a fixed number of worker threads on instantiation.
57 // - The worker threads will pick up work jobs as they arrive.
58 // - If all workers are busy, work jobs are queued for later execution.
59 //
60 // The thread pool is shut down when the pool is destroyed.
61 //
62 // Example usage of the thread pool:
63 //   {
64 //     std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4);
65 //     for (int i = 0; i < 100; ++i) {  // Dispatch 100 jobs.
66 //       pool->Schedule([&my_data]() { MyFunction(&my_data); });
67 //     }
68 //   } // ThreadPool gets destroyed only when all jobs are done.
69 class ThreadPool : public Executor, public Allocable {
70  public:
71   // Creates the thread pool with the specified number of worker threads.
72   // If num_threads is 1, the closures are run in FIFO order.
73   static std::unique_ptr<ThreadPool> Create(int num_threads);
74 
75   // Like the above factory method, but also sets the name prefix for threads.
76   static std::unique_ptr<ThreadPool> Create(const char name_prefix[],
77                                             int num_threads);
78 
79   // The destructor will shut down the thread pool and all jobs are executed.
80   // Note that after shutdown, the thread pool does not accept further jobs.
81   ~ThreadPool() override;
82 
83   // Adds the specified "closure" to the queue for processing. If worker threads
84   // are available, "closure" will run immediately. Otherwise "closure" is
85   // queued for later execution.
86   //
87   // NOTE: If the internal queue is full and cannot be resized because of an
88   // out-of-memory error, the current thread runs "closure" before returning
89   // from Schedule(). For our use cases, this seems better than the
90   // alternatives:
91   //   1. Return a failure status.
92   //   2. Have the current thread wait until the queue is not full.
93   void Schedule(std::function<void()> closure) override;
94 
95   int num_threads() const;
96 
97  private:
98   class WorkerThread;
99 
100   // Creates the thread pool with the specified number of worker threads.
101   // If num_threads is 1, the closures are run in FIFO order.
102   ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads,
103              int num_threads);
104 
105   // Starts the worker pool.
106   LIBGAV1_MUST_USE_RESULT bool StartWorkers();
107 
108   void WorkerFunction();
109 
110   // Shuts down the thread pool, i.e. worker threads finish their work and
111   // pick up new jobs until the queue is empty. This call will block until
112   // the shutdown is complete.
113   //
114   // Note: If a worker encounters an empty queue after this call, it will exit.
115   // Other workers might still be running, and if the queue fills up again, the
116   // thread pool will continue to operate with a decreased number of workers.
117   // It is up to the caller to prevent adding new jobs.
118   void Shutdown();
119 
120 #if LIBGAV1_THREADPOOL_USE_STD_MUTEX
121 
LockMutex()122   void LockMutex() { queue_mutex_.lock(); }
UnlockMutex()123   void UnlockMutex() { queue_mutex_.unlock(); }
124 
Wait()125   void Wait() {
126     std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock);
127     condition_.wait(queue_lock);
128     queue_lock.release();
129   }
130 
SignalOne()131   void SignalOne() { condition_.notify_one(); }
SignalAll()132   void SignalAll() { condition_.notify_all(); }
133 
134   std::condition_variable condition_;
135   std::mutex queue_mutex_;
136 
137 #else  // !LIBGAV1_THREADPOOL_USE_STD_MUTEX
138 
LockMutex()139   void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); }
UnlockMutex()140   void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); }
Wait()141   void Wait() { condition_.Wait(&queue_mutex_); }
SignalOne()142   void SignalOne() { condition_.Signal(); }
SignalAll()143   void SignalAll() { condition_.SignalAll(); }
144 
145   absl::CondVar condition_;
146   absl::Mutex queue_mutex_;
147 
148 #endif  // LIBGAV1_THREADPOOL_USE_STD_MUTEX
149 
150   UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_);
151   // If not all the worker threads are created, the first entry after the
152   // created worker threads is a null pointer.
153   const std::unique_ptr<WorkerThread*[]> threads_;
154 
155   bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false;
156   const int num_threads_ = 0;
157   // name_prefix_ is a C string, whose length is restricted to 16 characters,
158   // including the terminating null byte ('\0'). This restriction comes from
159   // the Linux pthread_setname_np() function.
160   char name_prefix_[16];
161 };
162 
163 }  // namespace libgav1
164 
165 #undef LIBGAV1_THREADPOOL_USE_STD_MUTEX
166 
167 #endif  // LIBGAV1_SRC_UTILS_THREADPOOL_H_
168