1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <memory>
6 #include <vector>
7
8 #include "base/atomicops.h"
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/macros.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/run_loop.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/platform_thread.h"
19 #include "base/threading/sequenced_task_runner_handle.h"
20 #include "base/time/time.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22 #include "testing/perf/perf_test.h"
23
24 namespace base {
25
26 namespace {
27
28 // A thread that waits for the caller to signal an event before proceeding to
29 // call Action::Run().
30 class PostingThread {
31 public:
32 class Action {
33 public:
34 virtual ~Action() = default;
35
36 // Called after the thread is started and |start_event_| is signalled.
37 virtual void Run() = 0;
38
39 protected:
40 Action() = default;
41
42 private:
43 DISALLOW_COPY_AND_ASSIGN(Action);
44 };
45
46 // Creates a PostingThread where the thread waits on |start_event| before
47 // calling action->Run(). If a thread is returned, the thread is guaranteed to
48 // be allocated and running and the caller must call Join() before destroying
49 // the PostingThread.
Create(WaitableEvent * start_event,std::unique_ptr<Action> action)50 static std::unique_ptr<PostingThread> Create(WaitableEvent* start_event,
51 std::unique_ptr<Action> action) {
52 auto posting_thread =
53 WrapUnique(new PostingThread(start_event, std::move(action)));
54
55 if (!posting_thread->Start())
56 return nullptr;
57
58 return posting_thread;
59 }
60
~PostingThread()61 ~PostingThread() { DCHECK_EQ(!thread_handle_.is_null(), join_called_); }
62
Join()63 void Join() {
64 PlatformThread::Join(thread_handle_);
65 join_called_ = true;
66 }
67
68 private:
69 class Delegate final : public PlatformThread::Delegate {
70 public:
Delegate(PostingThread * outer,std::unique_ptr<Action> action)71 Delegate(PostingThread* outer, std::unique_ptr<Action> action)
72 : outer_(outer), action_(std::move(action)) {
73 DCHECK(outer_);
74 DCHECK(action_);
75 }
76
77 ~Delegate() override = default;
78
79 private:
ThreadMain()80 void ThreadMain() override {
81 outer_->thread_started_.Signal();
82 outer_->start_event_->Wait();
83 action_->Run();
84 }
85
86 PostingThread* const outer_;
87 const std::unique_ptr<Action> action_;
88
89 DISALLOW_COPY_AND_ASSIGN(Delegate);
90 };
91
PostingThread(WaitableEvent * start_event,std::unique_ptr<Action> delegate)92 PostingThread(WaitableEvent* start_event, std::unique_ptr<Action> delegate)
93 : start_event_(start_event),
94 thread_started_(WaitableEvent::ResetPolicy::MANUAL,
95 WaitableEvent::InitialState::NOT_SIGNALED),
96 delegate_(this, std::move(delegate)) {
97 DCHECK(start_event_);
98 }
99
Start()100 bool Start() {
101 bool thread_created =
102 PlatformThread::Create(0, &delegate_, &thread_handle_);
103 if (thread_created)
104 thread_started_.Wait();
105
106 return thread_created;
107 }
108
109 bool join_called_ = false;
110 WaitableEvent* const start_event_;
111 WaitableEvent thread_started_;
112 Delegate delegate_;
113
114 PlatformThreadHandle thread_handle_;
115
116 DISALLOW_COPY_AND_ASSIGN(PostingThread);
117 };
118
119 class MessageLoopPerfTest : public ::testing::TestWithParam<int> {
120 public:
MessageLoopPerfTest()121 MessageLoopPerfTest()
122 : message_loop_task_runner_(SequencedTaskRunnerHandle::Get()),
123 run_posting_threads_(WaitableEvent::ResetPolicy::MANUAL,
124 WaitableEvent::InitialState::NOT_SIGNALED) {}
125
ParamInfoToString(::testing::TestParamInfo<int> param_info)126 static std::string ParamInfoToString(
127 ::testing::TestParamInfo<int> param_info) {
128 return PostingThreadCountToString(param_info.param);
129 }
130
PostingThreadCountToString(int posting_threads)131 static std::string PostingThreadCountToString(int posting_threads) {
132 // Special case 1 thread for thread vs threads.
133 if (posting_threads == 1)
134 return "1_Posting_Thread";
135
136 return StringPrintf("%d_Posting_Threads", posting_threads);
137 }
138
139 protected:
140 class ContinuouslyPostTasks final : public PostingThread::Action {
141 public:
ContinuouslyPostTasks(MessageLoopPerfTest * outer)142 ContinuouslyPostTasks(MessageLoopPerfTest* outer) : outer_(outer) {
143 DCHECK(outer_);
144 }
145 ~ContinuouslyPostTasks() override = default;
146
147 private:
Run()148 void Run() override {
149 RepeatingClosure task_to_run =
150 BindRepeating([](size_t* num_tasks_run) { ++*num_tasks_run; },
151 &outer_->num_tasks_run_);
152 while (!outer_->stop_posting_threads_.IsSet()) {
153 outer_->message_loop_task_runner_->PostTask(FROM_HERE, task_to_run);
154 subtle::NoBarrier_AtomicIncrement(&outer_->num_tasks_posted_, 1);
155 }
156 }
157
158 MessageLoopPerfTest* const outer_;
159
160 DISALLOW_COPY_AND_ASSIGN(ContinuouslyPostTasks);
161 };
162
SetUp()163 void SetUp() override {
164 // This check is here because we can't ASSERT_TRUE in the constructor.
165 ASSERT_TRUE(message_loop_task_runner_);
166 }
167
168 // Runs ActionType::Run() on |num_posting_threads| and requests test
169 // termination around |duration|.
170 template <typename ActionType>
RunTest(const int num_posting_threads,TimeDelta duration)171 void RunTest(const int num_posting_threads, TimeDelta duration) {
172 std::vector<std::unique_ptr<PostingThread>> threads;
173 for (int i = 0; i < num_posting_threads; ++i) {
174 threads.emplace_back(PostingThread::Create(
175 &run_posting_threads_, std::make_unique<ActionType>(this)));
176 // Don't assert here to simplify the code that requires a Join() call for
177 // every created PostingThread.
178 EXPECT_TRUE(threads[i]);
179 }
180
181 RunLoop run_loop;
182 message_loop_task_runner_->PostDelayedTask(
183 FROM_HERE,
184 BindOnce(
185 [](RunLoop* run_loop, AtomicFlag* stop_posting_threads) {
186 stop_posting_threads->Set();
187 run_loop->Quit();
188 },
189 &run_loop, &stop_posting_threads_),
190 duration);
191
192 TimeTicks post_task_start = TimeTicks::Now();
193 run_posting_threads_.Signal();
194
195 TimeTicks run_loop_start = TimeTicks::Now();
196 run_loop.Run();
197 tasks_run_duration_ = TimeTicks::Now() - run_loop_start;
198
199 for (auto& thread : threads)
200 thread->Join();
201
202 tasks_posted_duration_ = TimeTicks::Now() - post_task_start;
203 }
204
num_tasks_posted() const205 size_t num_tasks_posted() const {
206 return subtle::NoBarrier_Load(&num_tasks_posted_);
207 }
208
tasks_posted_duration() const209 TimeDelta tasks_posted_duration() const { return tasks_posted_duration_; }
210
num_tasks_run() const211 size_t num_tasks_run() const { return num_tasks_run_; }
212
tasks_run_duration() const213 TimeDelta tasks_run_duration() const { return tasks_run_duration_; }
214
215 private:
216 MessageLoop message_loop_;
217
218 // Accessed on multiple threads, thread-safe or constant:
219 const scoped_refptr<SequencedTaskRunner> message_loop_task_runner_;
220 WaitableEvent run_posting_threads_;
221 AtomicFlag stop_posting_threads_;
222 subtle::AtomicWord num_tasks_posted_ = 0;
223
224 // Accessed only on the test case thread:
225 TimeDelta tasks_posted_duration_;
226 TimeDelta tasks_run_duration_;
227 size_t num_tasks_run_ = 0;
228
229 DISALLOW_COPY_AND_ASSIGN(MessageLoopPerfTest);
230 };
231
232 } // namespace
233
TEST_P(MessageLoopPerfTest,PostTaskRate)234 TEST_P(MessageLoopPerfTest, PostTaskRate) {
235 // Measures the average rate of posting tasks from different threads and the
236 // average rate that the message loop is running those tasks.
237 RunTest<ContinuouslyPostTasks>(GetParam(), TimeDelta::FromSeconds(3));
238 perf_test::PrintResult("task_posting", "",
239 PostingThreadCountToString(GetParam()),
240 tasks_posted_duration().InMicroseconds() /
241 static_cast<double>(num_tasks_posted()),
242 "us/task", true);
243 perf_test::PrintResult("task_running", "",
244 PostingThreadCountToString(GetParam()),
245 tasks_run_duration().InMicroseconds() /
246 static_cast<double>(num_tasks_run()),
247 "us/task", true);
248 }
249
250 INSTANTIATE_TEST_CASE_P(,
251 MessageLoopPerfTest,
252 ::testing::Values(1, 5, 10),
253 MessageLoopPerfTest::ParamInfoToString);
254 } // namespace base
255