// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/worker_pool_posix.h"

#include <set>

#include "base/bind.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {

// Peer class to provide passthrough access to PosixDynamicThreadPool internals.
class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
 public:
  explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
      : pool_(pool) {}

  Lock* lock() { return &pool_->lock_; }
  ConditionVariable* pending_tasks_available_cv() {
    return &pool_->pending_tasks_available_cv_;
  }
  const std::queue<PendingTask>& pending_tasks() const {
    return pool_->pending_tasks_;
  }
  int num_idle_threads() const { return pool_->num_idle_threads_; }
  ConditionVariable* num_idle_threads_cv() {
    return pool_->num_idle_threads_cv_.get();
  }
  void set_num_idle_threads_cv(ConditionVariable* cv) {
    pool_->num_idle_threads_cv_.reset(cv);
  }

 private:
  PosixDynamicThreadPool* pool_;

  DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
};

namespace {

// IncrementingTask's main purpose is to increment a counter.  It also records
// the id of the thread it runs on in a set of unique thread ids.
// Note that since it does not block, there is no way to control the number of
// threads used when more than one IncrementingTask is posted to the thread
// pool in a row: the first one might finish executing before the subsequent
// PostTask() calls are made.
void IncrementingTask(Lock* counter_lock,
                      int* counter,
                      Lock* unique_threads_lock,
                      std::set<PlatformThreadId>* unique_threads) {
  {
    base::AutoLock locked(*unique_threads_lock);
    unique_threads->insert(PlatformThread::CurrentId());
  }
  base::AutoLock locked(*counter_lock);
  (*counter)++;
}

// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
struct BlockingIncrementingTaskArgs {
  Lock* counter_lock;
  int* counter;
  Lock* unique_threads_lock;
  std::set<PlatformThreadId>* unique_threads;
  Lock* num_waiting_to_start_lock;
  int* num_waiting_to_start;
  ConditionVariable* num_waiting_to_start_cv;
  base::WaitableEvent* start;
};

void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
  {
    base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
    (*args.num_waiting_to_start)++;
  }
  args.num_waiting_to_start_cv->Signal();
  args.start->Wait();
  IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
                   args.unique_threads);
}

class PosixDynamicThreadPoolTest : public testing::Test {
 protected:
  PosixDynamicThreadPoolTest()
      : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60 * 60)),
        peer_(pool_.get()),
        counter_(0),
        num_waiting_to_start_(0),
        num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
        start_(WaitableEvent::ResetPolicy::MANUAL,
               WaitableEvent::InitialState::NOT_SIGNALED) {}

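  // Install a ConditionVariable that shares the pool's lock so the
  // WaitForIdleThreads() helper below can block until workers go idle.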
  void SetUp() override {
    peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
  }

  void TearDown() override {
    // Wake up the idle threads so they can terminate.
    if (pool_.get())
      pool_->Terminate();
  }

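  // Blocks until |num_tasks| BlockingIncrementingTasks have signalled that
  // they have started and are waiting on |start_|.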
  void WaitForTasksToStart(int num_tasks) {
    base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
    while (num_waiting_to_start_ < num_tasks) {
      num_waiting_to_start_cv_.Wait();
    }
  }

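  // Blocks until the pool has at least |num_idle_threads| idle worker threads.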
  void WaitForIdleThreads(int num_idle_threads) {
    base::AutoLock pool_locked(*peer_.lock());
    while (peer_.num_idle_threads() < num_idle_threads) {
      peer_.num_idle_threads_cv()->Wait();
    }
  }

  base::Closure CreateNewIncrementingTaskCallback() {
    return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
                      &unique_threads_lock_, &unique_threads_);
  }

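  // Packs the shared test state into a single BlockingIncrementingTaskArgs
  // struct so it can be bound as one callback argument.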
  base::Closure CreateNewBlockingIncrementingTaskCallback() {
    BlockingIncrementingTaskArgs args = {
        &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
        &num_waiting_to_start_lock_, &num_waiting_to_start_,
        &num_waiting_to_start_cv_, &start_
    };
    return base::Bind(&BlockingIncrementingTask, args);
  }

  scoped_refptr<base::PosixDynamicThreadPool> pool_;
  base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
  Lock counter_lock_;
  int counter_;
  Lock unique_threads_lock_;
  std::set<PlatformThreadId> unique_threads_;
  Lock num_waiting_to_start_lock_;
  int num_waiting_to_start_;
  ConditionVariable num_waiting_to_start_cv_;
  base::WaitableEvent start_;
};

}  // namespace

TEST_F(PosixDynamicThreadPoolTest, Basic) {
  EXPECT_EQ(0, peer_.num_idle_threads());
  EXPECT_EQ(0U, unique_threads_.size());
  EXPECT_EQ(0U, peer_.pending_tasks().size());

  // Add one task and wait for it to be completed.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());

  WaitForIdleThreads(1);

  EXPECT_EQ(1U, unique_threads_.size()) <<
      "There should be only one thread allocated for one task.";
  EXPECT_EQ(1, counter_);
}

TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
  // Add one task and wait for it to be completed.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());

  WaitForIdleThreads(1);

  // Add another 2 tasks.  One should reuse the existing worker thread.
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  WaitForTasksToStart(2);
  start_.Signal();
  WaitForIdleThreads(2);

  EXPECT_EQ(2U, unique_threads_.size());
  EXPECT_EQ(2, peer_.num_idle_threads());
  EXPECT_EQ(3, counter_);
}

TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
  // Add two blocking tasks.
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";

  WaitForTasksToStart(2);
  start_.Signal();
  WaitForIdleThreads(2);

  EXPECT_EQ(2U, unique_threads_.size());
  EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
  EXPECT_EQ(2, counter_);
}

TEST_F(PosixDynamicThreadPoolTest, Complex) {
  // Add one non-blocking task and wait for it to finish.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());

  WaitForIdleThreads(1);

  // Add two blocking tasks, start them simultaneously, and wait for them to
  // finish.
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  WaitForTasksToStart(2);
  start_.Signal();
  WaitForIdleThreads(2);

  EXPECT_EQ(3, counter_);
  EXPECT_EQ(2, peer_.num_idle_threads());
  EXPECT_EQ(2U, unique_threads_.size());

  // Wake up all idle threads so they can exit.
  {
    base::AutoLock locked(*peer_.lock());
    while (peer_.num_idle_threads() > 0) {
      peer_.pending_tasks_available_cv()->Signal();
      peer_.num_idle_threads_cv()->Wait();
    }
  }

  // Add another non-blocking task.  There are no threads to reuse.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
  WaitForIdleThreads(1);

  // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self()
  // which is not guaranteed to be unique after a thread joins. The OS X
  // implementation of pthread_self() returns the address of the pthread_t, which
  // is merely a malloc()ed pointer stored in the first TLS slot. When a thread
  // joins and that structure is freed, the block of memory can be put on the
  // OS free list, meaning the same address could be reused in a subsequent
  // allocation. This in fact happens when allocating in a loop as this test
  // does.
  //
  // Because there are two concurrent threads, there's at least the guarantee
  // of having two unique thread IDs in the set. But after those two threads are
  // joined, the next-created thread can get a re-used ID if the allocation of
  // the pthread_t structure is taken from the free list. Therefore, there can
  // be either 2 or 3 unique thread IDs in the set at this stage in the test.
  EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
      << "unique_threads_.size() = " << unique_threads_.size();
  EXPECT_EQ(1, peer_.num_idle_threads());
  EXPECT_EQ(4, counter_);
}

}  // namespace base