1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <string>
8
9 #include "base/at_exit.h"
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/memory/scoped_vector.h"
13 #include "base/run_loop.h"
14 #include "base/test/simple_test_tick_clock.h"
15 #include "base/threading/thread.h"
16 #include "mojo/common/message_pump_mojo.h"
17 #include "mojo/common/time_helper.h"
18 #include "mojo/public/cpp/system/core.h"
19 #include "mojo/public/cpp/test_support/test_utils.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21
22 namespace mojo {
23 namespace common {
24 namespace test {
25
// Selects how CreateMessageLoop() builds the MessageLoop used by a test.
enum MessageLoopConfig {
  // MessageLoop created with its default constructor.
  MESSAGE_LOOP_CONFIG_DEFAULT,
  // MessageLoop created around the pump returned by MessagePumpMojo::Create().
  MESSAGE_LOOP_CONFIG_MOJO
};
30
// Watch callback that records its own invocation: copies |result| into
// |*result_observed| and sets |*was_signaled| to true.
void ObserveCallback(bool* was_signaled,
                     MojoResult* result_observed,
                     MojoResult result) {
  *result_observed = result;
  *was_signaled = true;
}
37
RunUntilIdle()38 void RunUntilIdle() {
39 base::RunLoop run_loop;
40 run_loop.RunUntilIdle();
41 }
42
DeleteWatcherAndForwardResult(HandleWatcher * watcher,base::Callback<void (MojoResult)> next_callback,MojoResult result)43 void DeleteWatcherAndForwardResult(
44 HandleWatcher* watcher,
45 base::Callback<void(MojoResult)> next_callback,
46 MojoResult result) {
47 delete watcher;
48 next_callback.Run(result);
49 }
50
CreateMessageLoop(MessageLoopConfig config)51 scoped_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) {
52 scoped_ptr<base::MessageLoop> loop;
53 if (config == MESSAGE_LOOP_CONFIG_DEFAULT)
54 loop.reset(new base::MessageLoop());
55 else
56 loop.reset(new base::MessageLoop(MessagePumpMojo::Create()));
57 return loop.Pass();
58 }
59
60 // Helper class to manage the callback and running the message loop waiting for
61 // message to be received. Typical usage is something like:
62 // Schedule callback returned from GetCallback().
63 // RunUntilGotCallback();
64 // EXPECT_TRUE(got_callback());
65 // clear_callback();
66 class CallbackHelper {
67 public:
CallbackHelper()68 CallbackHelper()
69 : got_callback_(false),
70 run_loop_(NULL),
71 weak_factory_(this) {}
~CallbackHelper()72 ~CallbackHelper() {}
73
74 // See description above |got_callback_|.
got_callback() const75 bool got_callback() const { return got_callback_; }
clear_callback()76 void clear_callback() { got_callback_ = false; }
77
78 // Runs the current MessageLoop until the callback returned from GetCallback()
79 // is notified.
RunUntilGotCallback()80 void RunUntilGotCallback() {
81 ASSERT_TRUE(run_loop_ == NULL);
82 base::RunLoop run_loop;
83 base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
84 run_loop.Run();
85 }
86
GetCallback()87 base::Callback<void(MojoResult)> GetCallback() {
88 return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
89 }
90
Start(HandleWatcher * watcher,const MessagePipeHandle & handle)91 void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) {
92 StartWithCallback(watcher, handle, GetCallback());
93 }
94
StartWithCallback(HandleWatcher * watcher,const MessagePipeHandle & handle,const base::Callback<void (MojoResult)> & callback)95 void StartWithCallback(HandleWatcher* watcher,
96 const MessagePipeHandle& handle,
97 const base::Callback<void(MojoResult)>& callback) {
98 watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE,
99 MOJO_DEADLINE_INDEFINITE, callback);
100 }
101
102 private:
OnCallback(MojoResult result)103 void OnCallback(MojoResult result) {
104 got_callback_ = true;
105 if (run_loop_)
106 run_loop_->Quit();
107 }
108
109 // Set to true when the callback is called.
110 bool got_callback_;
111
112 // If non-NULL we're in RunUntilGotCallback().
113 base::RunLoop* run_loop_;
114
115 base::WeakPtrFactory<CallbackHelper> weak_factory_;
116
117 private:
118 DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
119 };
120
121 class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> {
122 public:
HandleWatcherTest()123 HandleWatcherTest() : message_loop_(CreateMessageLoop(GetParam())) {}
~HandleWatcherTest()124 virtual ~HandleWatcherTest() {
125 test::SetTickClockForTest(NULL);
126 }
127
128 protected:
TearDownMessageLoop()129 void TearDownMessageLoop() {
130 message_loop_.reset();
131 }
132
InstallTickClock()133 void InstallTickClock() {
134 test::SetTickClockForTest(&tick_clock_);
135 }
136
137 base::SimpleTestTickClock tick_clock_;
138
139 private:
140 base::ShadowingAtExitManager at_exit_;
141 scoped_ptr<base::MessageLoop> message_loop_;
142
143 DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
144 };
145
146 INSTANTIATE_TEST_CASE_P(
147 MultipleMessageLoopConfigs, HandleWatcherTest,
148 testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO));
149
150 // Trivial test case with a single handle to watch.
TEST_P(HandleWatcherTest,SingleHandler)151 TEST_P(HandleWatcherTest, SingleHandler) {
152 MessagePipe test_pipe;
153 ASSERT_TRUE(test_pipe.handle0.is_valid());
154 CallbackHelper callback_helper;
155 HandleWatcher watcher;
156 callback_helper.Start(&watcher, test_pipe.handle0.get());
157 RunUntilIdle();
158 EXPECT_FALSE(callback_helper.got_callback());
159 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
160 std::string()));
161 callback_helper.RunUntilGotCallback();
162 EXPECT_TRUE(callback_helper.got_callback());
163 }
164
165 // Creates three handles and notfies them in reverse order ensuring each one is
166 // notified appropriately.
TEST_P(HandleWatcherTest,ThreeHandles)167 TEST_P(HandleWatcherTest, ThreeHandles) {
168 MessagePipe test_pipe1;
169 MessagePipe test_pipe2;
170 MessagePipe test_pipe3;
171 CallbackHelper callback_helper1;
172 CallbackHelper callback_helper2;
173 CallbackHelper callback_helper3;
174 ASSERT_TRUE(test_pipe1.handle0.is_valid());
175 ASSERT_TRUE(test_pipe2.handle0.is_valid());
176 ASSERT_TRUE(test_pipe3.handle0.is_valid());
177
178 HandleWatcher watcher1;
179 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
180 RunUntilIdle();
181 EXPECT_FALSE(callback_helper1.got_callback());
182 EXPECT_FALSE(callback_helper2.got_callback());
183 EXPECT_FALSE(callback_helper3.got_callback());
184
185 HandleWatcher watcher2;
186 callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
187 RunUntilIdle();
188 EXPECT_FALSE(callback_helper1.got_callback());
189 EXPECT_FALSE(callback_helper2.got_callback());
190 EXPECT_FALSE(callback_helper3.got_callback());
191
192 HandleWatcher watcher3;
193 callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
194 RunUntilIdle();
195 EXPECT_FALSE(callback_helper1.got_callback());
196 EXPECT_FALSE(callback_helper2.got_callback());
197 EXPECT_FALSE(callback_helper3.got_callback());
198
199 // Write to 3 and make sure it's notified.
200 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
201 std::string()));
202 callback_helper3.RunUntilGotCallback();
203 EXPECT_FALSE(callback_helper1.got_callback());
204 EXPECT_FALSE(callback_helper2.got_callback());
205 EXPECT_TRUE(callback_helper3.got_callback());
206 callback_helper3.clear_callback();
207
208 // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
209 // running.
210 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
211 std::string()));
212 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
213 std::string()));
214 callback_helper1.RunUntilGotCallback();
215 EXPECT_TRUE(callback_helper1.got_callback());
216 EXPECT_FALSE(callback_helper2.got_callback());
217 EXPECT_FALSE(callback_helper3.got_callback());
218 callback_helper1.clear_callback();
219
220 // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
221 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
222 std::string()));
223 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
224 std::string()));
225 callback_helper2.RunUntilGotCallback();
226 EXPECT_FALSE(callback_helper1.got_callback());
227 EXPECT_TRUE(callback_helper2.got_callback());
228 EXPECT_FALSE(callback_helper3.got_callback());
229 }
230
231 // Verifies Start() invoked a second time works.
TEST_P(HandleWatcherTest,Restart)232 TEST_P(HandleWatcherTest, Restart) {
233 MessagePipe test_pipe1;
234 MessagePipe test_pipe2;
235 CallbackHelper callback_helper1;
236 CallbackHelper callback_helper2;
237 ASSERT_TRUE(test_pipe1.handle0.is_valid());
238 ASSERT_TRUE(test_pipe2.handle0.is_valid());
239
240 HandleWatcher watcher1;
241 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
242 RunUntilIdle();
243 EXPECT_FALSE(callback_helper1.got_callback());
244 EXPECT_FALSE(callback_helper2.got_callback());
245
246 HandleWatcher watcher2;
247 callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
248 RunUntilIdle();
249 EXPECT_FALSE(callback_helper1.got_callback());
250 EXPECT_FALSE(callback_helper2.got_callback());
251
252 // Write to 1 and make sure it's notified.
253 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
254 std::string()));
255 callback_helper1.RunUntilGotCallback();
256 EXPECT_TRUE(callback_helper1.got_callback());
257 EXPECT_FALSE(callback_helper2.got_callback());
258 callback_helper1.clear_callback();
259 EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get()));
260
261 // Write to 2 and make sure it's notified.
262 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
263 std::string()));
264 callback_helper2.RunUntilGotCallback();
265 EXPECT_FALSE(callback_helper1.got_callback());
266 EXPECT_TRUE(callback_helper2.got_callback());
267 callback_helper2.clear_callback();
268
269 // Listen on 1 again.
270 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
271 RunUntilIdle();
272 EXPECT_FALSE(callback_helper1.got_callback());
273 EXPECT_FALSE(callback_helper2.got_callback());
274
275 // Write to 1 and make sure it's notified.
276 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
277 std::string()));
278 callback_helper1.RunUntilGotCallback();
279 EXPECT_TRUE(callback_helper1.got_callback());
280 EXPECT_FALSE(callback_helper2.got_callback());
281 }
282
283 // Verifies deadline is honored.
TEST_P(HandleWatcherTest,Deadline)284 TEST_P(HandleWatcherTest, Deadline) {
285 InstallTickClock();
286
287 MessagePipe test_pipe1;
288 MessagePipe test_pipe2;
289 MessagePipe test_pipe3;
290 CallbackHelper callback_helper1;
291 CallbackHelper callback_helper2;
292 CallbackHelper callback_helper3;
293 ASSERT_TRUE(test_pipe1.handle0.is_valid());
294 ASSERT_TRUE(test_pipe2.handle0.is_valid());
295 ASSERT_TRUE(test_pipe3.handle0.is_valid());
296
297 // Add a watcher with an infinite timeout.
298 HandleWatcher watcher1;
299 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
300 RunUntilIdle();
301 EXPECT_FALSE(callback_helper1.got_callback());
302 EXPECT_FALSE(callback_helper2.got_callback());
303 EXPECT_FALSE(callback_helper3.got_callback());
304
305 // Add another watcher wth a timeout of 500 microseconds.
306 HandleWatcher watcher2;
307 watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500,
308 callback_helper2.GetCallback());
309 RunUntilIdle();
310 EXPECT_FALSE(callback_helper1.got_callback());
311 EXPECT_FALSE(callback_helper2.got_callback());
312 EXPECT_FALSE(callback_helper3.got_callback());
313
314 // Advance the clock passed the deadline. We also have to start another
315 // watcher to wake up the background thread.
316 tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
317
318 HandleWatcher watcher3;
319 callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
320
321 callback_helper2.RunUntilGotCallback();
322 EXPECT_FALSE(callback_helper1.got_callback());
323 EXPECT_TRUE(callback_helper2.got_callback());
324 EXPECT_FALSE(callback_helper3.got_callback());
325 }
326
// Verifies that a HandleWatcher may be deleted from inside its own callback:
// the bound callback deletes the watcher and then forwards the result to the
// helper.
TEST_P(HandleWatcherTest, DeleteInCallback) {
  MessagePipe pipe;
  CallbackHelper helper;

  // Heap-allocated on purpose; DeleteWatcherAndForwardResult() deletes it.
  HandleWatcher* self_deleting_watcher = new HandleWatcher();
  helper.StartWithCallback(
      self_deleting_watcher,
      pipe.handle1.get(),
      base::Bind(&DeleteWatcherAndForwardResult,
                 self_deleting_watcher,
                 helper.GetCallback()));

  EXPECT_TRUE(
      mojo::test::WriteTextMessage(pipe.handle0.get(), std::string()));
  helper.RunUntilGotCallback();
  EXPECT_TRUE(helper.got_callback());
}
341
// Verifies that destroying the MessageLoop while a watch is pending runs the
// watch callback with MOJO_RESULT_ABORTED.
TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) {
  bool callback_ran = false;
  MojoResult observed_result = MOJO_RESULT_OK;

  MessagePipe pipe;
  HandleWatcher watcher;
  watcher.Start(pipe.handle0.get(),
                MOJO_HANDLE_SIGNAL_READABLE,
                MOJO_DEADLINE_INDEFINITE,
                base::Bind(&ObserveCallback, &callback_ran, &observed_result));

  // Tear the MessageLoop down; the callback is expected to run as a result.
  TearDownMessageLoop();

  EXPECT_TRUE(callback_ran);
  EXPECT_EQ(MOJO_RESULT_ABORTED, observed_result);
}
359
NeverReached(MojoResult result)360 void NeverReached(MojoResult result) {
361 FAIL() << "Callback should never be invoked " << result;
362 }
363
364 // Called on the main thread when a thread is done. Decrements |active_count|
365 // and if |active_count| is zero quits |run_loop|.
StressThreadDone(base::RunLoop * run_loop,int * active_count)366 void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
367 (*active_count)--;
368 EXPECT_GE(*active_count, 0);
369 if (*active_count == 0)
370 run_loop->Quit();
371 }
372
373 // See description of StressTest. This is called on the background thread.
374 // |count| is the number of HandleWatchers to create. |active_count| is the
375 // number of outstanding threads, |task_runner| the task runner for the main
376 // thread and |run_loop| the run loop that should be quit when there are no more
377 // threads running. When done StressThreadDone() is invoked on the main thread.
378 // |active_count| and |run_loop| should only be used on the main thread.
RunStressTest(int count,scoped_refptr<base::TaskRunner> task_runner,base::RunLoop * run_loop,int * active_count)379 void RunStressTest(int count,
380 scoped_refptr<base::TaskRunner> task_runner,
381 base::RunLoop* run_loop,
382 int* active_count) {
383 struct TestData {
384 MessagePipe pipe;
385 HandleWatcher watcher;
386 };
387 ScopedVector<TestData> data_vector;
388 for (int i = 0; i < count; ++i) {
389 if (i % 20 == 0) {
390 // Every so often we wait. This results in some level of thread balancing
391 // as well as making sure HandleWatcher has time to actually start some
392 // watches.
393 MessagePipe test_pipe;
394 ASSERT_TRUE(test_pipe.handle0.is_valid());
395 CallbackHelper callback_helper;
396 HandleWatcher watcher;
397 callback_helper.Start(&watcher, test_pipe.handle0.get());
398 RunUntilIdle();
399 EXPECT_FALSE(callback_helper.got_callback());
400 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
401 std::string()));
402 base::MessageLoop::ScopedNestableTaskAllower scoper(
403 base::MessageLoop::current());
404 callback_helper.RunUntilGotCallback();
405 EXPECT_TRUE(callback_helper.got_callback());
406 } else {
407 scoped_ptr<TestData> test_data(new TestData);
408 ASSERT_TRUE(test_data->pipe.handle0.is_valid());
409 test_data->watcher.Start(test_data->pipe.handle0.get(),
410 MOJO_HANDLE_SIGNAL_READABLE,
411 MOJO_DEADLINE_INDEFINITE,
412 base::Bind(&NeverReached));
413 data_vector.push_back(test_data.release());
414 }
415 if (i % 15 == 0)
416 data_vector.clear();
417 }
418 task_runner->PostTask(FROM_HERE,
419 base::Bind(&StressThreadDone, run_loop,
420 active_count));
421 }
422
423 // This test is meant to stress HandleWatcher. It uses from various threads
424 // repeatedly starting and stopping watches. It spins up kThreadCount
425 // threads. Each thread creates kWatchCount watches. Every so often each thread
426 // writes to a pipe and waits for the response.
TEST(HandleWatcherCleanEnvironmentTest,StressTest)427 TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
428 #if defined(NDEBUG)
429 const int kThreadCount = 15;
430 const int kWatchCount = 400;
431 #else
432 const int kThreadCount = 10;
433 const int kWatchCount = 250;
434 #endif
435
436 base::ShadowingAtExitManager at_exit;
437 base::MessageLoop message_loop;
438 base::RunLoop run_loop;
439 ScopedVector<base::Thread> threads;
440 int threads_active_counter = kThreadCount;
441 // Starts the threads first and then post the task in hopes of having more
442 // threads running at once.
443 for (int i = 0; i < kThreadCount; ++i) {
444 scoped_ptr<base::Thread> thread(new base::Thread("test thread"));
445 if (i % 2) {
446 base::Thread::Options thread_options;
447 thread_options.message_pump_factory =
448 base::Bind(&MessagePumpMojo::Create);
449 thread->StartWithOptions(thread_options);
450 } else {
451 thread->Start();
452 }
453 threads.push_back(thread.release());
454 }
455 for (int i = 0; i < kThreadCount; ++i) {
456 threads[i]->task_runner()->PostTask(
457 FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
458 message_loop.task_runner(),
459 &run_loop, &threads_active_counter));
460 }
461 run_loop.Run();
462 ASSERT_EQ(0, threads_active_counter);
463 }
464
465 } // namespace test
466 } // namespace common
467 } // namespace mojo
468