1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "thread_pool.h"
18
19 #include <string>
20
21 #include "base/atomic.h"
22 #include "common_runtime_test.h"
23 #include "scoped_thread_state_change-inl.h"
24 #include "thread-inl.h"
25
26 namespace art HIDDEN {
27
28 class CountTask : public Task {
29 public:
CountTask(AtomicInteger * count)30 explicit CountTask(AtomicInteger* count) : count_(count), verbose_(false) {}
31
Run(Thread * self)32 void Run(Thread* self) override {
33 if (verbose_) {
34 LOG(INFO) << "Running: " << *self;
35 }
36 // Simulate doing some work.
37 usleep(100);
38 // Increment the counter which keeps track of work completed.
39 ++*count_;
40 }
41
Finalize()42 void Finalize() override {
43 if (verbose_) {
44 LOG(INFO) << "Finalizing: " << *Thread::Current();
45 }
46 delete this;
47 }
48
49 private:
50 AtomicInteger* const count_;
51 const bool verbose_;
52 };
53
54 class ThreadPoolTest : public CommonRuntimeTest {
55 public:
56 static int32_t num_threads;
57 };
58
59 int32_t ThreadPoolTest::num_threads = 4;
60
61 // Check that the thread pool actually runs tasks that you assign it.
TEST_F(ThreadPoolTest,CheckRun)62 TEST_F(ThreadPoolTest, CheckRun) {
63 Thread* self = Thread::Current();
64 std::unique_ptr<ThreadPool> thread_pool(
65 ThreadPool::Create("Thread pool test thread pool", num_threads));
66 AtomicInteger count(0);
67 static const int32_t num_tasks = num_threads * 4;
68 for (int32_t i = 0; i < num_tasks; ++i) {
69 thread_pool->AddTask(self, new CountTask(&count));
70 }
71 thread_pool->StartWorkers(self);
72 // Wait for tasks to complete.
73 thread_pool->Wait(self, true, false);
74 // Make sure that we finished all the work.
75 EXPECT_EQ(num_tasks, count.load(std::memory_order_seq_cst));
76 }
77
// Checks that no task runs before the workers are started, and that a task queued while the
// workers are stopped is not run while they remain stopped.
TEST_F(ThreadPoolTest, StopStart) {
  Thread* const self = Thread::Current();
  std::unique_ptr<ThreadPool> pool{
      ThreadPool::Create("Thread pool test thread pool", num_threads)};
  AtomicInteger count(0);
  static const int32_t num_tasks = num_threads * 4;
  int32_t queued = 0;
  while (queued < num_tasks) {
    pool->AddTask(self, new CountTask(&count));
    ++queued;
  }
  usleep(200);
  // Check that no threads started prematurely.
  EXPECT_EQ(0, count.load(std::memory_order_seq_cst));
  // Signal the threads to start processing tasks.
  pool->StartWorkers(self);
  usleep(200);
  pool->StopWorkers(self);
  // This task uses its own counter so that it can be told apart from the earlier batch.
  AtomicInteger bad_count(0);
  pool->AddTask(self, new CountTask(&bad_count));
  usleep(200);
  // Ensure that the task added after the workers were stopped doesn't get run.
  EXPECT_EQ(0, bad_count.load(std::memory_order_seq_cst));
  // Allow tasks to finish up and delete themselves.
  pool->StartWorkers(self);
  pool->Wait(self, /* do_work= */ false, false);
}
103
// Checks that waiting on a stopped pool that still has queued tasks does not deadlock.
TEST_F(ThreadPoolTest, StopWait) {
  Thread* const self = Thread::Current();
  std::unique_ptr<ThreadPool> pool{
      ThreadPool::Create("Thread pool test thread pool", num_threads)};

  AtomicInteger count(0);
  static const int32_t num_tasks = num_threads * 100;
  int32_t queued = 0;
  while (queued < num_tasks) {
    pool->AddTask(self, new CountTask(&count));
    ++queued;
  }

  // Signal the threads to start processing tasks.
  pool->StartWorkers(self);
  usleep(200);
  pool->StopWorkers(self);

  // We should not deadlock here.
  pool->Wait(self, /* do_work= */ false, false);

  // Drain the task list. Note: we have to restart here, as no tasks will be finished when
  // the pool is stopped.
  pool->StartWorkers(self);
  pool->Wait(self, /* do_work= */ true, false);
}
127
128 class TreeTask : public Task {
129 public:
TreeTask(ThreadPool * const thread_pool,AtomicInteger * count,int depth)130 TreeTask(ThreadPool* const thread_pool, AtomicInteger* count, int depth)
131 : thread_pool_(thread_pool),
132 count_(count),
133 depth_(depth) {}
134
Run(Thread * self)135 void Run(Thread* self) override {
136 if (depth_ > 1) {
137 thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
138 thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
139 }
140 // Increment the counter which keeps track of work completed.
141 ++*count_;
142 }
143
Finalize()144 void Finalize() override {
145 delete this;
146 }
147
148 private:
149 ThreadPool* const thread_pool_;
150 AtomicInteger* const count_;
151 const int depth_;
152 };
153
154 // Test that adding new tasks from within a task works.
TEST_F(ThreadPoolTest,RecursiveTest)155 TEST_F(ThreadPoolTest, RecursiveTest) {
156 Thread* self = Thread::Current();
157 std::unique_ptr<ThreadPool> thread_pool(
158 ThreadPool::Create("Thread pool test thread pool", num_threads));
159 AtomicInteger count(0);
160 static const int depth = 8;
161 thread_pool->AddTask(self, new TreeTask(thread_pool.get(), &count, depth));
162 thread_pool->StartWorkers(self);
163 thread_pool->Wait(self, true, false);
164 EXPECT_EQ((1 << depth) - 1, count.load(std::memory_order_seq_cst));
165 }
166
167 class PeerTask : public Task {
168 public:
PeerTask()169 PeerTask() {}
170
Run(Thread * self)171 void Run(Thread* self) override {
172 ScopedObjectAccess soa(self);
173 CHECK(self->GetPeer() != nullptr);
174 }
175
Finalize()176 void Finalize() override {
177 delete this;
178 }
179 };
180
181 class NoPeerTask : public Task {
182 public:
NoPeerTask()183 NoPeerTask() {}
184
Run(Thread * self)185 void Run(Thread* self) override {
186 ScopedObjectAccess soa(self);
187 CHECK(self->GetPeer() == nullptr);
188 }
189
Finalize()190 void Finalize() override {
191 delete this;
192 }
193 };
194
195 // Tests for create_peer functionality.
TEST_F(ThreadPoolTest,PeerTest)196 TEST_F(ThreadPoolTest, PeerTest) {
197 Thread* self = Thread::Current();
198 {
199 std::unique_ptr<ThreadPool> thread_pool(
200 ThreadPool::Create("Thread pool test thread pool", 1));
201 thread_pool->AddTask(self, new NoPeerTask());
202 thread_pool->StartWorkers(self);
203 thread_pool->Wait(self, false, false);
204 }
205
206 {
207 // To create peers, the runtime needs to be started.
208 self->TransitionFromSuspendedToRunnable();
209 bool started = runtime_->Start();
210 ASSERT_TRUE(started);
211
212 std::unique_ptr<ThreadPool> thread_pool(
213 ThreadPool::Create("Thread pool test thread pool", 1, true));
214 thread_pool->AddTask(self, new PeerTask());
215 thread_pool->StartWorkers(self);
216 thread_pool->Wait(self, false, false);
217 }
218 }
219
220 } // namespace art
221