1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/worker_pool_posix.h"
6
7 #include <set>
8
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/macros.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/synchronization/lock.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/threading/platform_thread.h"
16 #include "testing/gtest/include/gtest/gtest.h"
17
18 namespace base {
19
20 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
21 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
22 public:
PosixDynamicThreadPoolPeer(PosixDynamicThreadPool * pool)23 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
24 : pool_(pool) {}
25
lock()26 Lock* lock() { return &pool_->lock_; }
pending_tasks_available_cv()27 ConditionVariable* pending_tasks_available_cv() {
28 return &pool_->pending_tasks_available_cv_;
29 }
pending_tasks() const30 const std::queue<PendingTask>& pending_tasks() const {
31 return pool_->pending_tasks_;
32 }
num_idle_threads() const33 int num_idle_threads() const { return pool_->num_idle_threads_; }
num_idle_threads_cv()34 ConditionVariable* num_idle_threads_cv() {
35 return pool_->num_idle_threads_cv_.get();
36 }
set_num_idle_threads_cv(ConditionVariable * cv)37 void set_num_idle_threads_cv(ConditionVariable* cv) {
38 pool_->num_idle_threads_cv_.reset(cv);
39 }
40
41 private:
42 PosixDynamicThreadPool* pool_;
43
44 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
45 };
46
47 namespace {
48
49 // IncrementingTask's main purpose is to increment a counter. It also updates a
50 // set of unique thread ids, and signals a ConditionVariable on completion.
51 // Note that since it does not block, there is no way to control the number of
52 // threads used if more than one IncrementingTask is consecutively posted to the
53 // thread pool, since the first one might finish executing before the subsequent
54 // PostTask() calls get invoked.
IncrementingTask(Lock * counter_lock,int * counter,Lock * unique_threads_lock,std::set<PlatformThreadId> * unique_threads)55 void IncrementingTask(Lock* counter_lock,
56 int* counter,
57 Lock* unique_threads_lock,
58 std::set<PlatformThreadId>* unique_threads) {
59 {
60 base::AutoLock locked(*unique_threads_lock);
61 unique_threads->insert(PlatformThread::CurrentId());
62 }
63 base::AutoLock locked(*counter_lock);
64 (*counter)++;
65 }
66
67 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
68 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
69 struct BlockingIncrementingTaskArgs {
70 Lock* counter_lock;
71 int* counter;
72 Lock* unique_threads_lock;
73 std::set<PlatformThreadId>* unique_threads;
74 Lock* num_waiting_to_start_lock;
75 int* num_waiting_to_start;
76 ConditionVariable* num_waiting_to_start_cv;
77 base::WaitableEvent* start;
78 };
79
BlockingIncrementingTask(const BlockingIncrementingTaskArgs & args)80 void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
81 {
82 base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
83 (*args.num_waiting_to_start)++;
84 }
85 args.num_waiting_to_start_cv->Signal();
86 args.start->Wait();
87 IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
88 args.unique_threads);
89 }
90
91 class PosixDynamicThreadPoolTest : public testing::Test {
92 protected:
PosixDynamicThreadPoolTest()93 PosixDynamicThreadPoolTest()
94 : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60 * 60)),
95 peer_(pool_.get()),
96 counter_(0),
97 num_waiting_to_start_(0),
98 num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
99 start_(WaitableEvent::ResetPolicy::MANUAL,
100 WaitableEvent::InitialState::NOT_SIGNALED) {}
101
SetUp()102 void SetUp() override {
103 peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
104 }
105
TearDown()106 void TearDown() override {
107 // Wake up the idle threads so they can terminate.
108 if (pool_.get())
109 pool_->Terminate();
110 }
111
WaitForTasksToStart(int num_tasks)112 void WaitForTasksToStart(int num_tasks) {
113 base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
114 while (num_waiting_to_start_ < num_tasks) {
115 num_waiting_to_start_cv_.Wait();
116 }
117 }
118
WaitForIdleThreads(int num_idle_threads)119 void WaitForIdleThreads(int num_idle_threads) {
120 base::AutoLock pool_locked(*peer_.lock());
121 while (peer_.num_idle_threads() < num_idle_threads) {
122 peer_.num_idle_threads_cv()->Wait();
123 }
124 }
125
CreateNewIncrementingTaskCallback()126 base::Closure CreateNewIncrementingTaskCallback() {
127 return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
128 &unique_threads_lock_, &unique_threads_);
129 }
130
CreateNewBlockingIncrementingTaskCallback()131 base::Closure CreateNewBlockingIncrementingTaskCallback() {
132 BlockingIncrementingTaskArgs args = {
133 &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
134 &num_waiting_to_start_lock_, &num_waiting_to_start_,
135 &num_waiting_to_start_cv_, &start_
136 };
137 return base::Bind(&BlockingIncrementingTask, args);
138 }
139
140 scoped_refptr<base::PosixDynamicThreadPool> pool_;
141 base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
142 Lock counter_lock_;
143 int counter_;
144 Lock unique_threads_lock_;
145 std::set<PlatformThreadId> unique_threads_;
146 Lock num_waiting_to_start_lock_;
147 int num_waiting_to_start_;
148 ConditionVariable num_waiting_to_start_cv_;
149 base::WaitableEvent start_;
150 };
151
152 } // namespace
153
TEST_F(PosixDynamicThreadPoolTest,Basic)154 TEST_F(PosixDynamicThreadPoolTest, Basic) {
155 EXPECT_EQ(0, peer_.num_idle_threads());
156 EXPECT_EQ(0U, unique_threads_.size());
157 EXPECT_EQ(0U, peer_.pending_tasks().size());
158
159 // Add one task and wait for it to be completed.
160 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
161
162 WaitForIdleThreads(1);
163
164 EXPECT_EQ(1U, unique_threads_.size()) <<
165 "There should be only one thread allocated for one task.";
166 EXPECT_EQ(1, counter_);
167 }
168
TEST_F(PosixDynamicThreadPoolTest,ReuseIdle)169 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
170 // Add one task and wait for it to be completed.
171 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
172
173 WaitForIdleThreads(1);
174
175 // Add another 2 tasks. One should reuse the existing worker thread.
176 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
177 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
178
179 WaitForTasksToStart(2);
180 start_.Signal();
181 WaitForIdleThreads(2);
182
183 EXPECT_EQ(2U, unique_threads_.size());
184 EXPECT_EQ(2, peer_.num_idle_threads());
185 EXPECT_EQ(3, counter_);
186 }
187
TEST_F(PosixDynamicThreadPoolTest,TwoActiveTasks)188 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
189 // Add two blocking tasks.
190 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
191 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
192
193 EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
194
195 WaitForTasksToStart(2);
196 start_.Signal();
197 WaitForIdleThreads(2);
198
199 EXPECT_EQ(2U, unique_threads_.size());
200 EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
201 EXPECT_EQ(2, counter_);
202 }
203
TEST_F(PosixDynamicThreadPoolTest,Complex)204 TEST_F(PosixDynamicThreadPoolTest, Complex) {
205 // Add two non blocking tasks and wait for them to finish.
206 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
207
208 WaitForIdleThreads(1);
209
210 // Add two blocking tasks, start them simultaneously, and wait for them to
211 // finish.
212 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
213 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
214
215 WaitForTasksToStart(2);
216 start_.Signal();
217 WaitForIdleThreads(2);
218
219 EXPECT_EQ(3, counter_);
220 EXPECT_EQ(2, peer_.num_idle_threads());
221 EXPECT_EQ(2U, unique_threads_.size());
222
223 // Wake up all idle threads so they can exit.
224 {
225 base::AutoLock locked(*peer_.lock());
226 while (peer_.num_idle_threads() > 0) {
227 peer_.pending_tasks_available_cv()->Signal();
228 peer_.num_idle_threads_cv()->Wait();
229 }
230 }
231
232 // Add another non blocking task. There are no threads to reuse.
233 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
234 WaitForIdleThreads(1);
235
236 // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self()
237 // which is not guaranteed to be unique after a thread joins. The OS X
238 // implemntation of pthread_self() returns the address of the pthread_t, which
239 // is merely a malloc()ed pointer stored in the first TLS slot. When a thread
240 // joins and that structure is freed, the block of memory can be put on the
241 // OS free list, meaning the same address could be reused in a subsequent
242 // allocation. This in fact happens when allocating in a loop as this test
243 // does.
244 //
245 // Because there are two concurrent threads, there's at least the guarantee
246 // of having two unique thread IDs in the set. But after those two threads are
247 // joined, the next-created thread can get a re-used ID if the allocation of
248 // the pthread_t structure is taken from the free list. Therefore, there can
249 // be either 2 or 3 unique thread IDs in the set at this stage in the test.
250 EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
251 << "unique_threads_.size() = " << unique_threads_.size();
252 EXPECT_EQ(1, peer_.num_idle_threads());
253 EXPECT_EQ(4, counter_);
254 }
255
256 } // namespace base
257