• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
// heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
// increase tolerance and reduce observed flakiness (though doing so reduces the
// meaningfulness of the test).
9 
10 #include "mojo/system/message_pipe_dispatcher.h"
11 
12 #include <string.h>
13 
14 #include <limits>
15 
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27 
28 namespace mojo {
29 namespace system {
30 namespace {
31 
TEST(MessagePipeDispatcherTest,Basic)32 TEST(MessagePipeDispatcherTest, Basic) {
33   test::Stopwatch stopwatch;
34   int32_t buffer[1];
35   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36   uint32_t buffer_size;
37 
38   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39   for (unsigned i = 0; i < 2; i++) {
40     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
41         MessagePipeDispatcher::kDefaultCreateOptions));
42     EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
43     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
44         MessagePipeDispatcher::kDefaultCreateOptions));
45     {
46       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
47       d0->Init(mp, i);      // 0, 1.
48       d1->Init(mp, i ^ 1);  // 1, 0.
49     }
50     Waiter w;
51     uint32_t context = 0;
52     HandleSignalsState hss;
53 
54     // Try adding a writable waiter when already writable.
55     w.Init();
56     hss = HandleSignalsState();
57     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
58               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
59     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
60     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
61               hss.satisfiable_signals);
62     // Shouldn't need to remove the waiter (it was not added).
63 
64     // Add a readable waiter to |d0|, then make it readable (by writing to
65     // |d1|), then wait.
66     w.Init();
67     ASSERT_EQ(MOJO_RESULT_OK,
68               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
69     buffer[0] = 123456789;
70     EXPECT_EQ(MOJO_RESULT_OK,
71               d1->WriteMessage(UserPointer<const void>(buffer),
72                                kBufferSize,
73                                nullptr,
74                                MOJO_WRITE_MESSAGE_FLAG_NONE));
75     stopwatch.Start();
76     EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
77     EXPECT_EQ(1u, context);
78     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
79     hss = HandleSignalsState();
80     d0->RemoveWaiter(&w, &hss);
81     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
82               hss.satisfied_signals);
83     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
84               hss.satisfiable_signals);
85 
86     // Try adding a readable waiter when already readable (from above).
87     w.Init();
88     hss = HandleSignalsState();
89     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
90               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
91     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
92               hss.satisfied_signals);
93     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
94               hss.satisfiable_signals);
95     // Shouldn't need to remove the waiter (it was not added).
96 
97     // Make |d0| no longer readable (by reading from it).
98     buffer[0] = 0;
99     buffer_size = kBufferSize;
100     EXPECT_EQ(MOJO_RESULT_OK,
101               d0->ReadMessage(UserPointer<void>(buffer),
102                               MakeUserPointer(&buffer_size),
103                               0,
104                               nullptr,
105                               MOJO_READ_MESSAGE_FLAG_NONE));
106     EXPECT_EQ(kBufferSize, buffer_size);
107     EXPECT_EQ(123456789, buffer[0]);
108 
109     // Wait for zero time for readability on |d0| (will time out).
110     w.Init();
111     ASSERT_EQ(MOJO_RESULT_OK,
112               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
113     stopwatch.Start();
114     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
115     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
116     hss = HandleSignalsState();
117     d0->RemoveWaiter(&w, &hss);
118     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
119     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
120               hss.satisfiable_signals);
121 
122     // Wait for non-zero, finite time for readability on |d0| (will time out).
123     w.Init();
124     ASSERT_EQ(MOJO_RESULT_OK,
125               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
126     stopwatch.Start();
127     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
128               w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr));
129     base::TimeDelta elapsed = stopwatch.Elapsed();
130     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
131     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
132     hss = HandleSignalsState();
133     d0->RemoveWaiter(&w, &hss);
134     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
135     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
136               hss.satisfiable_signals);
137 
138     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
139     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
140   }
141 }
142 
// Checks that invalid (but non-fatal) arguments are rejected with an error
// code rather than being acted upon.
TEST(MessagePipeDispatcherTest, InvalidParams) {
  char scratch[1];

  scoped_refptr<MessagePipeDispatcher> d0(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipeDispatcher> d1(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  {
    scoped_refptr<MessagePipe> pipe(MessagePipe::CreateLocalLocal());
    d0->Init(pipe, 0);
    d1->Init(pipe, 1);
  }

  // |WriteMessage|: a byte count of UINT32_MAX is expected to be refused as
  // exceeding resource limits.
  EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
            d0->WriteMessage(UserPointer<const void>(scratch),
                             std::numeric_limits<uint32_t>::max(),
                             nullptr,
                             MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
  EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
}
167 
168 // These test invalid arguments that should cause death if we're being paranoid
169 // about checking arguments (which we would want to do if, e.g., we were in a
170 // true "kernel" situation, but we might not want to do otherwise for
171 // performance reasons). Probably blatant errors like passing in null pointers
172 // (for required pointer arguments) will still cause death, but perhaps not
173 // predictably.
TEST(MessagePipeDispatcherTest,InvalidParamsDeath)174 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
175   const char kMemoryCheckFailedRegex[] = "Check failed";
176 
177   scoped_refptr<MessagePipeDispatcher> d0(
178       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
179   scoped_refptr<MessagePipeDispatcher> d1(
180       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
181   {
182     scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
183     d0->Init(mp, 0);
184     d1->Init(mp, 1);
185   }
186 
187   // |WriteMessage|:
188   // Null buffer with nonzero buffer size.
189   EXPECT_DEATH_IF_SUPPORTED(
190       d0->WriteMessage(
191           NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE),
192       kMemoryCheckFailedRegex);
193 
194   // |ReadMessage|:
195   // Null buffer with nonzero buffer size.
196   // First write something so that we actually have something to read.
197   EXPECT_EQ(MOJO_RESULT_OK,
198             d1->WriteMessage(UserPointer<const void>("x"),
199                              1,
200                              nullptr,
201                              MOJO_WRITE_MESSAGE_FLAG_NONE));
202   uint32_t buffer_size = 1;
203   EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(),
204                                             MakeUserPointer(&buffer_size),
205                                             0,
206                                             nullptr,
207                                             MOJO_READ_MESSAGE_FLAG_NONE),
208                             kMemoryCheckFailedRegex);
209 
210   EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
211   EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
212 }
213 
214 // Test what happens when one end is closed (single-threaded test).
TEST(MessagePipeDispatcherTest,BasicClosed)215 TEST(MessagePipeDispatcherTest, BasicClosed) {
216   int32_t buffer[1];
217   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
218   uint32_t buffer_size;
219 
220   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
221   for (unsigned i = 0; i < 2; i++) {
222     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
223         MessagePipeDispatcher::kDefaultCreateOptions));
224     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
225         MessagePipeDispatcher::kDefaultCreateOptions));
226     {
227       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
228       d0->Init(mp, i);      // 0, 1.
229       d1->Init(mp, i ^ 1);  // 1, 0.
230     }
231     Waiter w;
232     HandleSignalsState hss;
233 
234     // Write (twice) to |d1|.
235     buffer[0] = 123456789;
236     EXPECT_EQ(MOJO_RESULT_OK,
237               d1->WriteMessage(UserPointer<const void>(buffer),
238                                kBufferSize,
239                                nullptr,
240                                MOJO_WRITE_MESSAGE_FLAG_NONE));
241     buffer[0] = 234567890;
242     EXPECT_EQ(MOJO_RESULT_OK,
243               d1->WriteMessage(UserPointer<const void>(buffer),
244                                kBufferSize,
245                                nullptr,
246                                MOJO_WRITE_MESSAGE_FLAG_NONE));
247 
248     // Try waiting for readable on |d0|; should fail (already satisfied).
249     w.Init();
250     hss = HandleSignalsState();
251     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
252               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
253     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
254               hss.satisfied_signals);
255     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
256               hss.satisfiable_signals);
257 
258     // Try reading from |d1|; should fail (nothing to read).
259     buffer[0] = 0;
260     buffer_size = kBufferSize;
261     EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
262               d1->ReadMessage(UserPointer<void>(buffer),
263                               MakeUserPointer(&buffer_size),
264                               0,
265                               nullptr,
266                               MOJO_READ_MESSAGE_FLAG_NONE));
267 
268     // Close |d1|.
269     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
270 
271     // Try waiting for readable on |d0|; should fail (already satisfied).
272     w.Init();
273     hss = HandleSignalsState();
274     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
275               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
276     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
277     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
278 
279     // Read from |d0|.
280     buffer[0] = 0;
281     buffer_size = kBufferSize;
282     EXPECT_EQ(MOJO_RESULT_OK,
283               d0->ReadMessage(UserPointer<void>(buffer),
284                               MakeUserPointer(&buffer_size),
285                               0,
286                               nullptr,
287                               MOJO_READ_MESSAGE_FLAG_NONE));
288     EXPECT_EQ(kBufferSize, buffer_size);
289     EXPECT_EQ(123456789, buffer[0]);
290 
291     // Try waiting for readable on |d0|; should fail (already satisfied).
292     w.Init();
293     hss = HandleSignalsState();
294     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
295               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
296     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
297     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
298 
299     // Read again from |d0|.
300     buffer[0] = 0;
301     buffer_size = kBufferSize;
302     EXPECT_EQ(MOJO_RESULT_OK,
303               d0->ReadMessage(UserPointer<void>(buffer),
304                               MakeUserPointer(&buffer_size),
305                               0,
306                               nullptr,
307                               MOJO_READ_MESSAGE_FLAG_NONE));
308     EXPECT_EQ(kBufferSize, buffer_size);
309     EXPECT_EQ(234567890, buffer[0]);
310 
311     // Try waiting for readable on |d0|; should fail (unsatisfiable).
312     w.Init();
313     hss = HandleSignalsState();
314     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
315               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
316     EXPECT_EQ(0u, hss.satisfied_signals);
317     EXPECT_EQ(0u, hss.satisfiable_signals);
318 
319     // Try waiting for writable on |d0|; should fail (unsatisfiable).
320     w.Init();
321     hss = HandleSignalsState();
322     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
323               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
324     EXPECT_EQ(0u, hss.satisfied_signals);
325     EXPECT_EQ(0u, hss.satisfiable_signals);
326 
327     // Try reading from |d0|; should fail (nothing to read and other end
328     // closed).
329     buffer[0] = 0;
330     buffer_size = kBufferSize;
331     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
332               d0->ReadMessage(UserPointer<void>(buffer),
333                               MakeUserPointer(&buffer_size),
334                               0,
335                               nullptr,
336                               MOJO_READ_MESSAGE_FLAG_NONE));
337 
338     // Try writing to |d0|; should fail (other end closed).
339     buffer[0] = 345678901;
340     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
341               d0->WriteMessage(UserPointer<const void>(buffer),
342                                kBufferSize,
343                                nullptr,
344                                MOJO_WRITE_MESSAGE_FLAG_NONE));
345 
346     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
347   }
348 }
349 
350 #if defined(OS_WIN)
351 // http://crbug.com/396386
352 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
353 #else
354 #define MAYBE_BasicThreaded BasicThreaded
355 #endif
TEST(MessagePipeDispatcherTest,MAYBE_BasicThreaded)356 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
357   test::Stopwatch stopwatch;
358   int32_t buffer[1];
359   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
360   uint32_t buffer_size;
361   base::TimeDelta elapsed;
362   bool did_wait;
363   MojoResult result;
364   uint32_t context;
365   HandleSignalsState hss;
366 
367   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
368   for (unsigned i = 0; i < 2; i++) {
369     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
370         MessagePipeDispatcher::kDefaultCreateOptions));
371     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
372         MessagePipeDispatcher::kDefaultCreateOptions));
373     {
374       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
375       d0->Init(mp, i);      // 0, 1.
376       d1->Init(mp, i ^ 1);  // 1, 0.
377     }
378 
379     // Wait for readable on |d1|, which will become readable after some time.
380     {
381       test::WaiterThread thread(d1,
382                                 MOJO_HANDLE_SIGNAL_READABLE,
383                                 MOJO_DEADLINE_INDEFINITE,
384                                 1,
385                                 &did_wait,
386                                 &result,
387                                 &context,
388                                 &hss);
389       stopwatch.Start();
390       thread.Start();
391       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
392       // Wake it up by writing to |d0|.
393       buffer[0] = 123456789;
394       EXPECT_EQ(MOJO_RESULT_OK,
395                 d0->WriteMessage(UserPointer<const void>(buffer),
396                                  kBufferSize,
397                                  nullptr,
398                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
399     }  // Joins the thread.
400     elapsed = stopwatch.Elapsed();
401     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
402     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
403     EXPECT_TRUE(did_wait);
404     EXPECT_EQ(MOJO_RESULT_OK, result);
405     EXPECT_EQ(1u, context);
406     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
407               hss.satisfied_signals);
408     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
409               hss.satisfiable_signals);
410 
411     // Now |d1| is already readable. Try waiting for it again.
412     {
413       test::WaiterThread thread(d1,
414                                 MOJO_HANDLE_SIGNAL_READABLE,
415                                 MOJO_DEADLINE_INDEFINITE,
416                                 2,
417                                 &did_wait,
418                                 &result,
419                                 &context,
420                                 &hss);
421       stopwatch.Start();
422       thread.Start();
423     }  // Joins the thread.
424     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
425     EXPECT_FALSE(did_wait);
426     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
427     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
428               hss.satisfied_signals);
429     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
430               hss.satisfiable_signals);
431 
432     // Consume what we wrote to |d0|.
433     buffer[0] = 0;
434     buffer_size = kBufferSize;
435     EXPECT_EQ(MOJO_RESULT_OK,
436               d1->ReadMessage(UserPointer<void>(buffer),
437                               MakeUserPointer(&buffer_size),
438                               0,
439                               nullptr,
440                               MOJO_READ_MESSAGE_FLAG_NONE));
441     EXPECT_EQ(kBufferSize, buffer_size);
442     EXPECT_EQ(123456789, buffer[0]);
443 
444     // Wait for readable on |d1| and close |d0| after some time, which should
445     // cancel that wait.
446     {
447       test::WaiterThread thread(d1,
448                                 MOJO_HANDLE_SIGNAL_READABLE,
449                                 MOJO_DEADLINE_INDEFINITE,
450                                 3,
451                                 &did_wait,
452                                 &result,
453                                 &context,
454                                 &hss);
455       stopwatch.Start();
456       thread.Start();
457       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
458       EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
459     }  // Joins the thread.
460     elapsed = stopwatch.Elapsed();
461     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
462     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
463     EXPECT_TRUE(did_wait);
464     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
465     EXPECT_EQ(3u, context);
466     EXPECT_EQ(0u, hss.satisfied_signals);
467     EXPECT_EQ(0u, hss.satisfiable_signals);
468 
469     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
470   }
471 
472   for (unsigned i = 0; i < 2; i++) {
473     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
474         MessagePipeDispatcher::kDefaultCreateOptions));
475     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
476         MessagePipeDispatcher::kDefaultCreateOptions));
477     {
478       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
479       d0->Init(mp, i);      // 0, 1.
480       d1->Init(mp, i ^ 1);  // 1, 0.
481     }
482 
483     // Wait for readable on |d1| and close |d1| after some time, which should
484     // cancel that wait.
485     {
486       test::WaiterThread thread(d1,
487                                 MOJO_HANDLE_SIGNAL_READABLE,
488                                 MOJO_DEADLINE_INDEFINITE,
489                                 4,
490                                 &did_wait,
491                                 &result,
492                                 &context,
493                                 &hss);
494       stopwatch.Start();
495       thread.Start();
496       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
497       EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
498     }  // Joins the thread.
499     elapsed = stopwatch.Elapsed();
500     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
501     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
502     EXPECT_TRUE(did_wait);
503     EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
504     EXPECT_EQ(4u, context);
505     EXPECT_EQ(0u, hss.satisfied_signals);
506     EXPECT_EQ(0u, hss.satisfiable_signals);
507 
508     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
509   }
510 }
511 
// Stress test -----------------------------------------------------------------

// Largest payload, in bytes, that a stress-test writer thread sends in one
// message; the reader threads size their receive buffers with it as well.
const size_t kMaxMessageSize = 2000;
515 
516 class WriterThread : public base::SimpleThread {
517  public:
518   // |*messages_written| and |*bytes_written| belong to the thread while it's
519   // alive.
WriterThread(scoped_refptr<Dispatcher> write_dispatcher,size_t * messages_written,size_t * bytes_written)520   WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
521                size_t* messages_written,
522                size_t* bytes_written)
523       : base::SimpleThread("writer_thread"),
524         write_dispatcher_(write_dispatcher),
525         messages_written_(messages_written),
526         bytes_written_(bytes_written) {
527     *messages_written_ = 0;
528     *bytes_written_ = 0;
529   }
530 
~WriterThread()531   virtual ~WriterThread() { Join(); }
532 
533  private:
Run()534   virtual void Run() OVERRIDE {
535     // Make some data to write.
536     unsigned char buffer[kMaxMessageSize];
537     for (size_t i = 0; i < kMaxMessageSize; i++)
538       buffer[i] = static_cast<unsigned char>(i);
539 
540     // Number of messages to write.
541     *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
542 
543     // Write messages.
544     for (size_t i = 0; i < *messages_written_; i++) {
545       uint32_t bytes_to_write = static_cast<uint32_t>(
546           base::RandInt(1, static_cast<int>(kMaxMessageSize)));
547       EXPECT_EQ(MOJO_RESULT_OK,
548                 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
549                                                 bytes_to_write,
550                                                 nullptr,
551                                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
552       *bytes_written_ += bytes_to_write;
553     }
554 
555     // Write one last "quit" message.
556     EXPECT_EQ(MOJO_RESULT_OK,
557               write_dispatcher_->WriteMessage(UserPointer<const void>("quit"),
558                                               4,
559                                               nullptr,
560                                               MOJO_WRITE_MESSAGE_FLAG_NONE));
561   }
562 
563   const scoped_refptr<Dispatcher> write_dispatcher_;
564   size_t* const messages_written_;
565   size_t* const bytes_written_;
566 
567   DISALLOW_COPY_AND_ASSIGN(WriterThread);
568 };
569 
570 class ReaderThread : public base::SimpleThread {
571  public:
572   // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,size_t * messages_read,size_t * bytes_read)573   ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
574                size_t* messages_read,
575                size_t* bytes_read)
576       : base::SimpleThread("reader_thread"),
577         read_dispatcher_(read_dispatcher),
578         messages_read_(messages_read),
579         bytes_read_(bytes_read) {
580     *messages_read_ = 0;
581     *bytes_read_ = 0;
582   }
583 
~ReaderThread()584   virtual ~ReaderThread() { Join(); }
585 
586  private:
Run()587   virtual void Run() OVERRIDE {
588     unsigned char buffer[kMaxMessageSize];
589     Waiter w;
590     HandleSignalsState hss;
591     MojoResult result;
592 
593     // Read messages.
594     for (;;) {
595       // Wait for it to be readable.
596       w.Init();
597       hss = HandleSignalsState();
598       result =
599           read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss);
600       EXPECT_TRUE(result == MOJO_RESULT_OK ||
601                   result == MOJO_RESULT_ALREADY_EXISTS)
602           << "result: " << result;
603       if (result == MOJO_RESULT_OK) {
604         // Actually need to wait.
605         EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr));
606         read_dispatcher_->RemoveWaiter(&w, &hss);
607       }
608       // We may not actually be readable, since we're racing with other threads.
609       EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
610 
611       // Now, try to do the read.
612       // Clear the buffer so that we can check the result.
613       memset(buffer, 0, sizeof(buffer));
614       uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
615       result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer),
616                                              MakeUserPointer(&buffer_size),
617                                              0,
618                                              nullptr,
619                                              MOJO_READ_MESSAGE_FLAG_NONE);
620       EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
621           << "result: " << result;
622       // We're racing with others to read, so maybe we failed.
623       if (result == MOJO_RESULT_SHOULD_WAIT)
624         continue;  // In which case, try again.
625       // Check for quit.
626       if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
627         return;
628       EXPECT_GE(buffer_size, 1u);
629       EXPECT_LE(buffer_size, kMaxMessageSize);
630       EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
631 
632       (*messages_read_)++;
633       *bytes_read_ += buffer_size;
634     }
635   }
636 
IsValidMessage(const unsigned char * buffer,uint32_t message_size)637   static bool IsValidMessage(const unsigned char* buffer,
638                              uint32_t message_size) {
639     size_t i;
640     for (i = 0; i < message_size; i++) {
641       if (buffer[i] != static_cast<unsigned char>(i))
642         return false;
643     }
644     // Check that the remaining bytes weren't stomped on.
645     for (; i < kMaxMessageSize; i++) {
646       if (buffer[i] != 0)
647         return false;
648     }
649     return true;
650   }
651 
652   const scoped_refptr<Dispatcher> read_dispatcher_;
653   size_t* const messages_read_;
654   size_t* const bytes_read_;
655 
656   DISALLOW_COPY_AND_ASSIGN(ReaderThread);
657 };
658 
TEST(MessagePipeDispatcherTest,Stress)659 TEST(MessagePipeDispatcherTest, Stress) {
660   static const size_t kNumWriters = 30;
661   static const size_t kNumReaders = kNumWriters;
662 
663   scoped_refptr<MessagePipeDispatcher> d_write(
664       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
665   scoped_refptr<MessagePipeDispatcher> d_read(
666       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
667   {
668     scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
669     d_write->Init(mp, 0);
670     d_read->Init(mp, 1);
671   }
672 
673   size_t messages_written[kNumWriters];
674   size_t bytes_written[kNumWriters];
675   size_t messages_read[kNumReaders];
676   size_t bytes_read[kNumReaders];
677   {
678     // Make writers.
679     ScopedVector<WriterThread> writers;
680     for (size_t i = 0; i < kNumWriters; i++) {
681       writers.push_back(
682           new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
683     }
684 
685     // Make readers.
686     ScopedVector<ReaderThread> readers;
687     for (size_t i = 0; i < kNumReaders; i++) {
688       readers.push_back(
689           new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
690     }
691 
692     // Start writers.
693     for (size_t i = 0; i < kNumWriters; i++)
694       writers[i]->Start();
695 
696     // Start readers.
697     for (size_t i = 0; i < kNumReaders; i++)
698       readers[i]->Start();
699 
700     // TODO(vtl): Maybe I should have an event that triggers all the threads to
701     // start doing stuff for real (so that the first ones created/started aren't
702     // advantaged).
703   }  // Joins all the threads.
704 
705   size_t total_messages_written = 0;
706   size_t total_bytes_written = 0;
707   for (size_t i = 0; i < kNumWriters; i++) {
708     total_messages_written += messages_written[i];
709     total_bytes_written += bytes_written[i];
710   }
711   size_t total_messages_read = 0;
712   size_t total_bytes_read = 0;
713   for (size_t i = 0; i < kNumReaders; i++) {
714     total_messages_read += messages_read[i];
715     total_bytes_read += bytes_read[i];
716     // We'd have to be really unlucky to have read no messages on a thread.
717     EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
718     EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
719   }
720   EXPECT_EQ(total_messages_written, total_messages_read);
721   EXPECT_EQ(total_bytes_written, total_bytes_read);
722 
723   EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
724   EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
725 }
726 
727 }  // namespace
728 }  // namespace system
729 }  // namespace mojo
730