• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The libgav1 Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/utils/threadpool.h"
16 
17 #if defined(_MSC_VER)
18 #include <process.h>
19 #include <windows.h>
20 #else  // defined(_MSC_VER)
21 #include <pthread.h>
22 #endif  // defined(_MSC_VER)
23 #if defined(__ANDROID__) || defined(__GLIBC__)
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
27 #include <algorithm>
28 #include <cassert>
29 #include <cinttypes>
30 #include <cstddef>
31 #include <cstdint>
32 #include <cstdio>
33 #include <cstring>
34 #include <functional>
35 #include <memory>
36 #include <new>
37 #include <utility>
38 
39 #if defined(__ANDROID__)
40 #include <chrono>  // NOLINT (unapproved c++11 header)
41 #endif
42 
// GetTid() wraps the Linux gettid() system call. It is only defined when
// building for Android or against glibc.
#if defined(__ANDROID__)
static pid_t GetTid() {
  return gettid();
}
#elif defined(__GLIBC__)
// glibc gained a gettid() wrapper in version 2.30. For anything older, issue
// the system call directly.
#if __GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 30)
#include <sys/syscall.h>

static pid_t GetTid() {
  const long tid = syscall(SYS_gettid);
  return static_cast<pid_t>(tid);
}
#else  // glibc 2.30 or later.
static pid_t GetTid() {
  return gettid();
}
#endif  // Older than glibc 2.30
#endif  // defined(__GLIBC__)
58 
59 namespace libgav1 {
60 
#if defined(__ANDROID__)
namespace {

// Clock and duration types used to time the busy wait performed by
// ThreadPool::WorkerFunction() on Android.
using Clock = std::chrono::steady_clock;
using Duration = Clock::duration;

// Length of the busy wait, first as fractional seconds (2e-3 s) and then
// converted to the clock's native duration type.
constexpr std::chrono::duration<double> kBusyWaitSeconds(2e-3);
constexpr Duration kBusyWaitDuration =
    std::chrono::duration_cast<Duration>(kBusyWaitSeconds);

}  // namespace
#endif  // defined(__ANDROID__)
71 
72 // static
Create(int num_threads)73 std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) {
74   return Create(/*name_prefix=*/"", num_threads);
75 }
76 
77 // static
Create(const char name_prefix[],int num_threads)78 std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[],
79                                                int num_threads) {
80   if (name_prefix == nullptr || num_threads <= 0) return nullptr;
81   std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow)
82                                                WorkerThread*[num_threads]);
83   if (threads == nullptr) return nullptr;
84   std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool(
85       name_prefix, std::move(threads), num_threads));
86   if (pool != nullptr && !pool->StartWorkers()) {
87     pool = nullptr;
88   }
89   return pool;
90 }
91 
ThreadPool(const char name_prefix[],std::unique_ptr<WorkerThread * []> threads,int num_threads)92 ThreadPool::ThreadPool(const char name_prefix[],
93                        std::unique_ptr<WorkerThread*[]> threads,
94                        int num_threads)
95     : threads_(std::move(threads)), num_threads_(num_threads) {
96   threads_[0] = nullptr;
97   assert(name_prefix != nullptr);
98   const size_t name_prefix_len =
99       std::min(strlen(name_prefix), sizeof(name_prefix_) - 1);
100   memcpy(name_prefix_, name_prefix, name_prefix_len);
101   name_prefix_[name_prefix_len] = '\0';
102 }
103 
~ThreadPool()104 ThreadPool::~ThreadPool() { Shutdown(); }
105 
Schedule(std::function<void ()> closure)106 void ThreadPool::Schedule(std::function<void()> closure) {
107   LockMutex();
108   if (!queue_.GrowIfNeeded()) {
109     // queue_ is full and we can't grow it. Run |closure| directly.
110     UnlockMutex();
111     closure();
112     return;
113   }
114   queue_.Push(std::move(closure));
115   UnlockMutex();
116   SignalOne();
117 }
118 
num_threads() const119 int ThreadPool::num_threads() const { return num_threads_; }
120 
121 // A simple implementation that mirrors the non-portable Thread.  We may
122 // choose to expand this in the future as a portable implementation of
123 // Thread, or replace it at such a time as one is implemented.
124 class ThreadPool::WorkerThread : public Allocable {
125  public:
126   // Creates and starts a thread that runs pool->WorkerFunction().
127   explicit WorkerThread(ThreadPool* pool);
128 
129   // Not copyable or movable.
130   WorkerThread(const WorkerThread&) = delete;
131   WorkerThread& operator=(const WorkerThread&) = delete;
132 
133   // REQUIRES: Join() must have been called if Start() was called and
134   // succeeded.
135   ~WorkerThread() = default;
136 
137   LIBGAV1_MUST_USE_RESULT bool Start();
138 
139   // Joins with the running thread.
140   void Join();
141 
142  private:
143 #if defined(_MSC_VER)
144   static unsigned int __stdcall ThreadBody(void* arg);
145 #else
146   static void* ThreadBody(void* arg);
147 #endif
148 
149   void SetupName();
150   void Run();
151 
152   ThreadPool* pool_;
153 #if defined(_MSC_VER)
154   HANDLE handle_;
155 #else
156   pthread_t thread_;
157 #endif
158 };
159 
WorkerThread(ThreadPool * pool)160 ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool) : pool_(pool) {}
161 
162 #if defined(_MSC_VER)
163 
Start()164 bool ThreadPool::WorkerThread::Start() {
165   // Since our code calls the C run-time library (CRT), use _beginthreadex
166   // rather than CreateThread. Microsoft documentation says "If a thread
167   // created using CreateThread calls the CRT, the CRT may terminate the
168   // process in low-memory conditions."
169   uintptr_t handle = _beginthreadex(
170       /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this,
171       /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr);
172   if (handle == 0) return false;
173   handle_ = reinterpret_cast<HANDLE>(handle);
174   ResumeThread(handle_);
175   return true;
176 }
177 
Join()178 void ThreadPool::WorkerThread::Join() {
179   WaitForSingleObject(handle_, INFINITE);
180   CloseHandle(handle_);
181 }
182 
ThreadBody(void * arg)183 unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) {
184   auto* thread = static_cast<WorkerThread*>(arg);
185   thread->Run();
186   return 0;
187 }
188 
SetupName()189 void ThreadPool::WorkerThread::SetupName() {
190   // Not currently supported on Windows.
191 }
192 
193 #else  // defined(_MSC_VER)
194 
Start()195 bool ThreadPool::WorkerThread::Start() {
196   return pthread_create(&thread_, nullptr, ThreadBody, this) == 0;
197 }
198 
Join()199 void ThreadPool::WorkerThread::Join() { pthread_join(thread_, nullptr); }
200 
ThreadBody(void * arg)201 void* ThreadPool::WorkerThread::ThreadBody(void* arg) {
202   auto* thread = static_cast<WorkerThread*>(arg);
203   thread->Run();
204   return nullptr;
205 }
206 
SetupName()207 void ThreadPool::WorkerThread::SetupName() {
208   if (pool_->name_prefix_[0] != '\0') {
209 #if defined(__APPLE__)
210     // Apple's version of pthread_setname_np takes one argument and operates on
211     // the current thread only. Also, pthread_mach_thread_np is Apple-specific.
212     // The maximum size of the |name| buffer was noted in the Chromium source
213     // code and was confirmed by experiments.
214     char name[64];
215     mach_port_t id = pthread_mach_thread_np(pthread_self());
216     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
217                       static_cast<int64_t>(id));
218     assert(rv >= 0);
219     rv = pthread_setname_np(name);
220     assert(rv == 0);
221     static_cast<void>(rv);
222 #elif defined(__ANDROID__) || (defined(__GLIBC__) && !defined(__GNU__))
223     // If the |name| buffer is longer than 16 bytes, pthread_setname_np fails
224     // with error 34 (ERANGE) on Android.
225     char name[16];
226     pid_t id = GetTid();
227     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
228                       static_cast<int64_t>(id));
229     assert(rv >= 0);
230     rv = pthread_setname_np(pthread_self(), name);
231     assert(rv == 0);
232     static_cast<void>(rv);
233 #endif
234   }
235 }
236 
237 #endif  // defined(_MSC_VER)
238 
Run()239 void ThreadPool::WorkerThread::Run() {
240   SetupName();
241   pool_->WorkerFunction();
242 }
243 
StartWorkers()244 bool ThreadPool::StartWorkers() {
245   if (!queue_.Init()) return false;
246   for (int i = 0; i < num_threads_; ++i) {
247     threads_[i] = new (std::nothrow) WorkerThread(this);
248     if (threads_[i] == nullptr) return false;
249     if (!threads_[i]->Start()) {
250       delete threads_[i];
251       threads_[i] = nullptr;
252       return false;
253     }
254   }
255   return true;
256 }
257 
WorkerFunction()258 void ThreadPool::WorkerFunction() {
259   LockMutex();
260   while (true) {
261     if (queue_.Empty()) {
262       if (exit_threads_) {
263         break;  // Queue is empty and exit was requested.
264       }
265 #if defined(__ANDROID__)
266       // On android, if we go to a conditional wait right away, the CPU governor
267       // kicks in and starts shutting the cores down. So we do a very small busy
268       // wait to see if we get our next job within that period. This
269       // significantly improves the performance of common cases of tile parallel
270       // decoding. If we don't receive a job in the busy wait time, we then go
271       // to an actual conditional wait as usual.
272       UnlockMutex();
273       bool found_job = false;
274       const auto wait_start = Clock::now();
275       while (Clock::now() - wait_start < kBusyWaitDuration) {
276         LockMutex();
277         if (!queue_.Empty()) {
278           found_job = true;
279           break;
280         }
281         UnlockMutex();
282       }
283       // If |found_job| is true, we simply continue since we already hold the
284       // mutex and we know for sure that the |queue_| is not empty.
285       if (found_job) continue;
286       // Since |found_job_| was false, the mutex is not being held at this
287       // point.
288       LockMutex();
289       // Ensure that the queue is still empty.
290       if (!queue_.Empty()) continue;
291       if (exit_threads_) {
292         break;  // Queue is empty and exit was requested.
293       }
294 #endif  // defined(__ANDROID__)
295       // Queue is still empty, wait for signal or broadcast.
296       Wait();
297     } else {
298       // Take a job from the queue.
299       std::function<void()> job = std::move(queue_.Front());
300       queue_.Pop();
301 
302       UnlockMutex();
303       // Note that it is good practice to surround this with a try/catch so
304       // the thread pool doesn't go to hell if the job throws an exception.
305       // This is omitted here because Google3 doesn't like exceptions.
306       std::move(job)();
307       job = nullptr;
308 
309       LockMutex();
310     }
311   }
312   UnlockMutex();
313 }
314 
Shutdown()315 void ThreadPool::Shutdown() {
316   // Tell worker threads how to exit.
317   LockMutex();
318   exit_threads_ = true;
319   UnlockMutex();
320   SignalAll();
321 
322   // Join all workers. This will block.
323   for (int i = 0; i < num_threads_; ++i) {
324     if (threads_[i] == nullptr) break;
325     threads_[i]->Join();
326     delete threads_[i];
327   }
328 }
329 
330 }  // namespace libgav1
331