• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "thread_pool.h"
18 
19 #include <string>
20 
21 #include "base/atomic.h"
22 #include "common_runtime_test.h"
23 #include "scoped_thread_state_change-inl.h"
24 #include "thread-inl.h"
25 
26 namespace art HIDDEN {
27 
28 class CountTask : public Task {
29  public:
CountTask(AtomicInteger * count)30   explicit CountTask(AtomicInteger* count) : count_(count), verbose_(false) {}
31 
Run(Thread * self)32   void Run(Thread* self) override {
33     if (verbose_) {
34       LOG(INFO) << "Running: " << *self;
35     }
36     // Simulate doing some work.
37     usleep(100);
38     // Increment the counter which keeps track of work completed.
39     ++*count_;
40   }
41 
Finalize()42   void Finalize() override {
43     if (verbose_) {
44       LOG(INFO) << "Finalizing: " << *Thread::Current();
45     }
46     delete this;
47   }
48 
49  private:
50   AtomicInteger* const count_;
51   const bool verbose_;
52 };
53 
54 class ThreadPoolTest : public CommonRuntimeTest {
55  public:
56   static int32_t num_threads;
57 };
58 
59 int32_t ThreadPoolTest::num_threads = 4;
60 
61 // Check that the thread pool actually runs tasks that you assign it.
TEST_F(ThreadPoolTest,CheckRun)62 TEST_F(ThreadPoolTest, CheckRun) {
63   Thread* self = Thread::Current();
64   std::unique_ptr<ThreadPool> thread_pool(
65       ThreadPool::Create("Thread pool test thread pool", num_threads));
66   AtomicInteger count(0);
67   static const int32_t num_tasks = num_threads * 4;
68   for (int32_t i = 0; i < num_tasks; ++i) {
69     thread_pool->AddTask(self, new CountTask(&count));
70   }
71   thread_pool->StartWorkers(self);
72   // Wait for tasks to complete.
73   thread_pool->Wait(self, true, false);
74   // Make sure that we finished all the work.
75   EXPECT_EQ(num_tasks, count.load(std::memory_order_seq_cst));
76 }
77 
TEST_F(ThreadPoolTest,StopStart)78 TEST_F(ThreadPoolTest, StopStart) {
79   Thread* self = Thread::Current();
80   std::unique_ptr<ThreadPool> thread_pool(
81       ThreadPool::Create("Thread pool test thread pool", num_threads));
82   AtomicInteger count(0);
83   static const int32_t num_tasks = num_threads * 4;
84   for (int32_t i = 0; i < num_tasks; ++i) {
85     thread_pool->AddTask(self, new CountTask(&count));
86   }
87   usleep(200);
88   // Check that no threads started prematurely.
89   EXPECT_EQ(0, count.load(std::memory_order_seq_cst));
90   // Signal the threads to start processing tasks.
91   thread_pool->StartWorkers(self);
92   usleep(200);
93   thread_pool->StopWorkers(self);
94   AtomicInteger bad_count(0);
95   thread_pool->AddTask(self, new CountTask(&bad_count));
96   usleep(200);
97   // Ensure that the task added after the workers were stopped doesn't get run.
98   EXPECT_EQ(0, bad_count.load(std::memory_order_seq_cst));
99   // Allow tasks to finish up and delete themselves.
100   thread_pool->StartWorkers(self);
101   thread_pool->Wait(self, false, false);
102 }
103 
TEST_F(ThreadPoolTest,StopWait)104 TEST_F(ThreadPoolTest, StopWait) {
105   Thread* self = Thread::Current();
106   std::unique_ptr<ThreadPool> thread_pool(
107       ThreadPool::Create("Thread pool test thread pool", num_threads));
108 
109   AtomicInteger count(0);
110   static const int32_t num_tasks = num_threads * 100;
111   for (int32_t i = 0; i < num_tasks; ++i) {
112     thread_pool->AddTask(self, new CountTask(&count));
113   }
114 
115   // Signal the threads to start processing tasks.
116   thread_pool->StartWorkers(self);
117   usleep(200);
118   thread_pool->StopWorkers(self);
119 
120   thread_pool->Wait(self, false, false);  // We should not deadlock here.
121 
122   // Drain the task list. Note: we have to restart here, as no tasks will be finished when
123   // the pool is stopped.
124   thread_pool->StartWorkers(self);
125   thread_pool->Wait(self, /* do_work= */ true, false);
126 }
127 
128 class TreeTask : public Task {
129  public:
TreeTask(ThreadPool * const thread_pool,AtomicInteger * count,int depth)130   TreeTask(ThreadPool* const thread_pool, AtomicInteger* count, int depth)
131       : thread_pool_(thread_pool),
132         count_(count),
133         depth_(depth) {}
134 
Run(Thread * self)135   void Run(Thread* self) override {
136     if (depth_ > 1) {
137       thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
138       thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
139     }
140     // Increment the counter which keeps track of work completed.
141     ++*count_;
142   }
143 
Finalize()144   void Finalize() override {
145     delete this;
146   }
147 
148  private:
149   ThreadPool* const thread_pool_;
150   AtomicInteger* const count_;
151   const int depth_;
152 };
153 
154 // Test that adding new tasks from within a task works.
TEST_F(ThreadPoolTest,RecursiveTest)155 TEST_F(ThreadPoolTest, RecursiveTest) {
156   Thread* self = Thread::Current();
157   std::unique_ptr<ThreadPool> thread_pool(
158       ThreadPool::Create("Thread pool test thread pool", num_threads));
159   AtomicInteger count(0);
160   static const int depth = 8;
161   thread_pool->AddTask(self, new TreeTask(thread_pool.get(), &count, depth));
162   thread_pool->StartWorkers(self);
163   thread_pool->Wait(self, true, false);
164   EXPECT_EQ((1 << depth) - 1, count.load(std::memory_order_seq_cst));
165 }
166 
167 class PeerTask : public Task {
168  public:
PeerTask()169   PeerTask() {}
170 
Run(Thread * self)171   void Run(Thread* self) override {
172     ScopedObjectAccess soa(self);
173     CHECK(self->GetPeer() != nullptr);
174   }
175 
Finalize()176   void Finalize() override {
177     delete this;
178   }
179 };
180 
181 class NoPeerTask : public Task {
182  public:
NoPeerTask()183   NoPeerTask() {}
184 
Run(Thread * self)185   void Run(Thread* self) override {
186     ScopedObjectAccess soa(self);
187     CHECK(self->GetPeer() == nullptr);
188   }
189 
Finalize()190   void Finalize() override {
191     delete this;
192   }
193 };
194 
195 // Tests for create_peer functionality.
TEST_F(ThreadPoolTest,PeerTest)196 TEST_F(ThreadPoolTest, PeerTest) {
197   Thread* self = Thread::Current();
198   {
199     std::unique_ptr<ThreadPool> thread_pool(
200         ThreadPool::Create("Thread pool test thread pool", 1));
201     thread_pool->AddTask(self, new NoPeerTask());
202     thread_pool->StartWorkers(self);
203     thread_pool->Wait(self, false, false);
204   }
205 
206   {
207     // To create peers, the runtime needs to be started.
208     self->TransitionFromSuspendedToRunnable();
209     bool started = runtime_->Start();
210     ASSERT_TRUE(started);
211 
212     std::unique_ptr<ThreadPool> thread_pool(
213         ThreadPool::Create("Thread pool test thread pool", 1, true));
214     thread_pool->AddTask(self, new PeerTask());
215     thread_pool->StartWorkers(self);
216     thread_pool->Wait(self, false, false);
217   }
218 }
219 
220 }  // namespace art
221