• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
9 
10 #include "mojo/system/message_pipe_dispatcher.h"
11 
12 #include <string.h>
13 
14 #include <limits>
15 
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27 
28 namespace mojo {
29 namespace system {
30 namespace {
31 
TEST(MessagePipeDispatcherTest,Basic)32 TEST(MessagePipeDispatcherTest, Basic) {
33   test::Stopwatch stopwatch;
34   int32_t buffer[1];
35   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36   uint32_t buffer_size;
37 
38   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39   for (unsigned i = 0; i < 2; i++) {
40     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
41         MessagePipeDispatcher::kDefaultCreateOptions));
42     EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
43     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
44         MessagePipeDispatcher::kDefaultCreateOptions));
45     {
46       scoped_refptr<MessagePipe> mp(new MessagePipe());
47       d0->Init(mp, i);  // 0, 1.
48       d1->Init(mp, i ^ 1);  // 1, 0.
49     }
50     Waiter w;
51     uint32_t context = 0;
52 
53     // Try adding a writable waiter when already writable.
54     w.Init();
55     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
56               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0));
57     // Shouldn't need to remove the waiter (it was not added).
58 
59     // Add a readable waiter to |d0|, then make it readable (by writing to
60     // |d1|), then wait.
61     w.Init();
62     EXPECT_EQ(MOJO_RESULT_OK,
63               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1));
64     buffer[0] = 123456789;
65     EXPECT_EQ(MOJO_RESULT_OK,
66               d1->WriteMessage(buffer, kBufferSize,
67                                NULL,
68                                MOJO_WRITE_MESSAGE_FLAG_NONE));
69     stopwatch.Start();
70     EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
71     EXPECT_EQ(1u, context);
72     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
73     d0->RemoveWaiter(&w);
74 
75     // Try adding a readable waiter when already readable (from above).
76     w.Init();
77     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
78               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2));
79     // Shouldn't need to remove the waiter (it was not added).
80 
81     // Make |d0| no longer readable (by reading from it).
82     buffer[0] = 0;
83     buffer_size = kBufferSize;
84     EXPECT_EQ(MOJO_RESULT_OK,
85               d0->ReadMessage(buffer, &buffer_size,
86                               0, NULL,
87                               MOJO_READ_MESSAGE_FLAG_NONE));
88     EXPECT_EQ(kBufferSize, buffer_size);
89     EXPECT_EQ(123456789, buffer[0]);
90 
91     // Wait for zero time for readability on |d0| (will time out).
92     w.Init();
93     EXPECT_EQ(MOJO_RESULT_OK,
94               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
95     stopwatch.Start();
96     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL));
97     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
98     d0->RemoveWaiter(&w);
99 
100     // Wait for non-zero, finite time for readability on |d0| (will time out).
101     w.Init();
102     EXPECT_EQ(MOJO_RESULT_OK,
103               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
104     stopwatch.Start();
105     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
106               w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL));
107     base::TimeDelta elapsed = stopwatch.Elapsed();
108     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
109     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
110     d0->RemoveWaiter(&w);
111 
112     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
113     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
114   }
115 }
116 
TEST(MessagePipeDispatcherTest,InvalidParams)117 TEST(MessagePipeDispatcherTest, InvalidParams) {
118   char buffer[1];
119 
120   scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
121         MessagePipeDispatcher::kDefaultCreateOptions));
122   scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
123         MessagePipeDispatcher::kDefaultCreateOptions));
124   {
125     scoped_refptr<MessagePipe> mp(new MessagePipe());
126     d0->Init(mp, 0);
127     d1->Init(mp, 1);
128   }
129 
130   // |WriteMessage|:
131   // Null buffer with nonzero buffer size.
132   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
133             d0->WriteMessage(NULL, 1,
134                              NULL,
135                              MOJO_WRITE_MESSAGE_FLAG_NONE));
136   // Huge buffer size.
137   EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
138             d0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(),
139                              NULL,
140                              MOJO_WRITE_MESSAGE_FLAG_NONE));
141 
142   // |ReadMessage|:
143   // Null buffer with nonzero buffer size.
144   uint32_t buffer_size = 1;
145   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
146             d0->ReadMessage(NULL, &buffer_size,
147                             0, NULL,
148                             MOJO_READ_MESSAGE_FLAG_NONE));
149 
150   EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
151   EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
152 }
153 
154 // Test what happens when one end is closed (single-threaded test).
TEST(MessagePipeDispatcherTest,BasicClosed)155 TEST(MessagePipeDispatcherTest, BasicClosed) {
156   int32_t buffer[1];
157   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
158   uint32_t buffer_size;
159 
160   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
161   for (unsigned i = 0; i < 2; i++) {
162     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
163         MessagePipeDispatcher::kDefaultCreateOptions));
164     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
165         MessagePipeDispatcher::kDefaultCreateOptions));
166     {
167       scoped_refptr<MessagePipe> mp(new MessagePipe());
168       d0->Init(mp, i);  // 0, 1.
169       d1->Init(mp, i ^ 1);  // 1, 0.
170     }
171     Waiter w;
172 
173     // Write (twice) to |d1|.
174     buffer[0] = 123456789;
175     EXPECT_EQ(MOJO_RESULT_OK,
176               d1->WriteMessage(buffer, kBufferSize,
177                                NULL,
178                                MOJO_WRITE_MESSAGE_FLAG_NONE));
179     buffer[0] = 234567890;
180     EXPECT_EQ(MOJO_RESULT_OK,
181               d1->WriteMessage(buffer, kBufferSize,
182                                NULL,
183                                MOJO_WRITE_MESSAGE_FLAG_NONE));
184 
185     // Try waiting for readable on |d0|; should fail (already satisfied).
186     w.Init();
187     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
188               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0));
189 
190     // Try reading from |d1|; should fail (nothing to read).
191     buffer[0] = 0;
192     buffer_size = kBufferSize;
193     EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
194               d1->ReadMessage(buffer, &buffer_size,
195                               0, NULL,
196                               MOJO_READ_MESSAGE_FLAG_NONE));
197 
198     // Close |d1|.
199     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
200 
201     // Try waiting for readable on |d0|; should fail (already satisfied).
202     w.Init();
203     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
204               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1));
205 
206     // Read from |d0|.
207     buffer[0] = 0;
208     buffer_size = kBufferSize;
209     EXPECT_EQ(MOJO_RESULT_OK,
210               d0->ReadMessage(buffer, &buffer_size,
211                               0, NULL,
212                               MOJO_READ_MESSAGE_FLAG_NONE));
213     EXPECT_EQ(kBufferSize, buffer_size);
214     EXPECT_EQ(123456789, buffer[0]);
215 
216     // Try waiting for readable on |d0|; should fail (already satisfied).
217     w.Init();
218     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
219               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2));
220 
221     // Read again from |d0|.
222     buffer[0] = 0;
223     buffer_size = kBufferSize;
224     EXPECT_EQ(MOJO_RESULT_OK,
225               d0->ReadMessage(buffer, &buffer_size,
226                               0, NULL,
227                               MOJO_READ_MESSAGE_FLAG_NONE));
228     EXPECT_EQ(kBufferSize, buffer_size);
229     EXPECT_EQ(234567890, buffer[0]);
230 
231     // Try waiting for readable on |d0|; should fail (unsatisfiable).
232     w.Init();
233     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
234               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
235 
236     // Try waiting for writable on |d0|; should fail (unsatisfiable).
237     w.Init();
238     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
239               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4));
240 
241     // Try reading from |d0|; should fail (nothing to read and other end
242     // closed).
243     buffer[0] = 0;
244     buffer_size = kBufferSize;
245     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
246               d0->ReadMessage(buffer, &buffer_size,
247                               0, NULL,
248                               MOJO_READ_MESSAGE_FLAG_NONE));
249 
250     // Try writing to |d0|; should fail (other end closed).
251     buffer[0] = 345678901;
252     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
253               d0->WriteMessage(buffer, kBufferSize,
254                                NULL,
255                                MOJO_WRITE_MESSAGE_FLAG_NONE));
256 
257     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
258   }
259 }
260 
TEST(MessagePipeDispatcherTest,BasicThreaded)261 TEST(MessagePipeDispatcherTest, BasicThreaded) {
262   test::Stopwatch stopwatch;
263   int32_t buffer[1];
264   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
265   uint32_t buffer_size;
266   base::TimeDelta elapsed;
267   bool did_wait;
268   MojoResult result;
269   uint32_t context;
270 
271   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
272   for (unsigned i = 0; i < 2; i++) {
273     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
274         MessagePipeDispatcher::kDefaultCreateOptions));
275     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
276         MessagePipeDispatcher::kDefaultCreateOptions));
277     {
278       scoped_refptr<MessagePipe> mp(new MessagePipe());
279       d0->Init(mp, i);  // 0, 1.
280       d1->Init(mp, i ^ 1);  // 1, 0.
281     }
282 
283     // Wait for readable on |d1|, which will become readable after some time.
284     {
285       test::WaiterThread thread(d1,
286                                MOJO_HANDLE_SIGNAL_READABLE,
287                                MOJO_DEADLINE_INDEFINITE,
288                                1,
289                                &did_wait, &result, &context);
290       stopwatch.Start();
291       thread.Start();
292       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
293       // Wake it up by writing to |d0|.
294       buffer[0] = 123456789;
295       EXPECT_EQ(MOJO_RESULT_OK,
296                 d0->WriteMessage(buffer, kBufferSize,
297                                  NULL,
298                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
299     }  // Joins the thread.
300     elapsed = stopwatch.Elapsed();
301     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
302     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
303     EXPECT_TRUE(did_wait);
304     EXPECT_EQ(MOJO_RESULT_OK, result);
305     EXPECT_EQ(1u, context);
306 
307     // Now |d1| is already readable. Try waiting for it again.
308     {
309       test::WaiterThread thread(d1,
310                                 MOJO_HANDLE_SIGNAL_READABLE,
311                                 MOJO_DEADLINE_INDEFINITE,
312                                 2,
313                                 &did_wait, &result, &context);
314       stopwatch.Start();
315       thread.Start();
316     }  // Joins the thread.
317     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
318     EXPECT_FALSE(did_wait);
319     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
320 
321     // Consume what we wrote to |d0|.
322     buffer[0] = 0;
323     buffer_size = kBufferSize;
324     EXPECT_EQ(MOJO_RESULT_OK,
325               d1->ReadMessage(buffer, &buffer_size,
326                               0, NULL,
327                               MOJO_READ_MESSAGE_FLAG_NONE));
328     EXPECT_EQ(kBufferSize, buffer_size);
329     EXPECT_EQ(123456789, buffer[0]);
330 
331     // Wait for readable on |d1| and close |d0| after some time, which should
332     // cancel that wait.
333     {
334       test::WaiterThread thread(d1,
335                                 MOJO_HANDLE_SIGNAL_READABLE,
336                                 MOJO_DEADLINE_INDEFINITE,
337                                 3,
338                                 &did_wait, &result, &context);
339       stopwatch.Start();
340       thread.Start();
341       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
342       EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
343     }  // Joins the thread.
344     elapsed = stopwatch.Elapsed();
345     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
346     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
347     EXPECT_TRUE(did_wait);
348     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
349     EXPECT_EQ(3u, context);
350 
351     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
352   }
353 
354   for (unsigned i = 0; i < 2; i++) {
355     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
356         MessagePipeDispatcher::kDefaultCreateOptions));
357     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
358         MessagePipeDispatcher::kDefaultCreateOptions));
359     {
360       scoped_refptr<MessagePipe> mp(new MessagePipe());
361       d0->Init(mp, i);  // 0, 1.
362       d1->Init(mp, i ^ 1);  // 1, 0.
363     }
364 
365     // Wait for readable on |d1| and close |d1| after some time, which should
366     // cancel that wait.
367     {
368       test::WaiterThread thread(d1,
369                                 MOJO_HANDLE_SIGNAL_READABLE,
370                                 MOJO_DEADLINE_INDEFINITE,
371                                 4,
372                                 &did_wait, &result, &context);
373       stopwatch.Start();
374       thread.Start();
375       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
376       EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
377     }  // Joins the thread.
378     elapsed = stopwatch.Elapsed();
379     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
380     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
381     EXPECT_TRUE(did_wait);
382     EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
383     EXPECT_EQ(4u, context);
384 
385     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
386   }
387 }
388 
389 // Stress test -----------------------------------------------------------------
390 
391 const size_t kMaxMessageSize = 2000;
392 
393 class WriterThread : public base::SimpleThread {
394  public:
395   // |*messages_written| and |*bytes_written| belong to the thread while it's
396   // alive.
WriterThread(scoped_refptr<Dispatcher> write_dispatcher,size_t * messages_written,size_t * bytes_written)397   WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
398                size_t* messages_written, size_t* bytes_written)
399       : base::SimpleThread("writer_thread"),
400         write_dispatcher_(write_dispatcher),
401         messages_written_(messages_written),
402         bytes_written_(bytes_written) {
403     *messages_written_ = 0;
404     *bytes_written_ = 0;
405   }
406 
~WriterThread()407   virtual ~WriterThread() {
408     Join();
409   }
410 
411  private:
Run()412   virtual void Run() OVERRIDE {
413     // Make some data to write.
414     unsigned char buffer[kMaxMessageSize];
415     for (size_t i = 0; i < kMaxMessageSize; i++)
416       buffer[i] = static_cast<unsigned char>(i);
417 
418     // Number of messages to write.
419     *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
420 
421     // Write messages.
422     for (size_t i = 0; i < *messages_written_; i++) {
423       uint32_t bytes_to_write = static_cast<uint32_t>(
424           base::RandInt(1, static_cast<int>(kMaxMessageSize)));
425       EXPECT_EQ(MOJO_RESULT_OK,
426                 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
427                                                 NULL,
428                                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
429       *bytes_written_ += bytes_to_write;
430     }
431 
432     // Write one last "quit" message.
433     EXPECT_EQ(MOJO_RESULT_OK,
434               write_dispatcher_->WriteMessage("quit", 4,
435                                               NULL,
436                                               MOJO_WRITE_MESSAGE_FLAG_NONE));
437   }
438 
439   const scoped_refptr<Dispatcher> write_dispatcher_;
440   size_t* const messages_written_;
441   size_t* const bytes_written_;
442 
443   DISALLOW_COPY_AND_ASSIGN(WriterThread);
444 };
445 
446 class ReaderThread : public base::SimpleThread {
447  public:
448   // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,size_t * messages_read,size_t * bytes_read)449   ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
450                size_t* messages_read, size_t* bytes_read)
451       : base::SimpleThread("reader_thread"),
452         read_dispatcher_(read_dispatcher),
453         messages_read_(messages_read),
454         bytes_read_(bytes_read) {
455     *messages_read_ = 0;
456     *bytes_read_ = 0;
457   }
458 
~ReaderThread()459   virtual ~ReaderThread() {
460     Join();
461   }
462 
463  private:
Run()464   virtual void Run() OVERRIDE {
465     unsigned char buffer[kMaxMessageSize];
466     MojoResult result;
467     Waiter w;
468 
469     // Read messages.
470     for (;;) {
471       // Wait for it to be readable.
472       w.Init();
473       result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0);
474       EXPECT_TRUE(result == MOJO_RESULT_OK ||
475                   result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
476       if (result == MOJO_RESULT_OK) {
477         // Actually need to wait.
478         EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL));
479         read_dispatcher_->RemoveWaiter(&w);
480       }
481 
482       // Now, try to do the read.
483       // Clear the buffer so that we can check the result.
484       memset(buffer, 0, sizeof(buffer));
485       uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
486       result = read_dispatcher_->ReadMessage(buffer, &buffer_size,
487                                              0, NULL,
488                                              MOJO_READ_MESSAGE_FLAG_NONE);
489       EXPECT_TRUE(result == MOJO_RESULT_OK ||
490                   result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result;
491       // We're racing with others to read, so maybe we failed.
492       if (result == MOJO_RESULT_SHOULD_WAIT)
493         continue;  // In which case, try again.
494       // Check for quit.
495       if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
496         return;
497       EXPECT_GE(buffer_size, 1u);
498       EXPECT_LE(buffer_size, kMaxMessageSize);
499       EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
500 
501       (*messages_read_)++;
502       *bytes_read_ += buffer_size;
503     }
504   }
505 
IsValidMessage(const unsigned char * buffer,uint32_t message_size)506   static bool IsValidMessage(const unsigned char* buffer,
507                              uint32_t message_size) {
508     size_t i;
509     for (i = 0; i < message_size; i++) {
510       if (buffer[i] != static_cast<unsigned char>(i))
511         return false;
512     }
513     // Check that the remaining bytes weren't stomped on.
514     for (; i < kMaxMessageSize; i++) {
515       if (buffer[i] != 0)
516         return false;
517     }
518     return true;
519   }
520 
521   const scoped_refptr<Dispatcher> read_dispatcher_;
522   size_t* const messages_read_;
523   size_t* const bytes_read_;
524 
525   DISALLOW_COPY_AND_ASSIGN(ReaderThread);
526 };
527 
TEST(MessagePipeDispatcherTest,Stress)528 TEST(MessagePipeDispatcherTest, Stress) {
529   static const size_t kNumWriters = 30;
530   static const size_t kNumReaders = kNumWriters;
531 
532   scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher(
533         MessagePipeDispatcher::kDefaultCreateOptions));
534   scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher(
535         MessagePipeDispatcher::kDefaultCreateOptions));
536   {
537     scoped_refptr<MessagePipe> mp(new MessagePipe());
538     d_write->Init(mp, 0);
539     d_read->Init(mp, 1);
540   }
541 
542   size_t messages_written[kNumWriters];
543   size_t bytes_written[kNumWriters];
544   size_t messages_read[kNumReaders];
545   size_t bytes_read[kNumReaders];
546   {
547     // Make writers.
548     ScopedVector<WriterThread> writers;
549     for (size_t i = 0; i < kNumWriters; i++) {
550       writers.push_back(
551           new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
552     }
553 
554     // Make readers.
555     ScopedVector<ReaderThread> readers;
556     for (size_t i = 0; i < kNumReaders; i++) {
557       readers.push_back(
558           new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
559     }
560 
561     // Start writers.
562     for (size_t i = 0; i < kNumWriters; i++)
563       writers[i]->Start();
564 
565     // Start readers.
566     for (size_t i = 0; i < kNumReaders; i++)
567       readers[i]->Start();
568 
569     // TODO(vtl): Maybe I should have an event that triggers all the threads to
570     // start doing stuff for real (so that the first ones created/started aren't
571     // advantaged).
572   }  // Joins all the threads.
573 
574   size_t total_messages_written = 0;
575   size_t total_bytes_written = 0;
576   for (size_t i = 0; i < kNumWriters; i++) {
577     total_messages_written += messages_written[i];
578     total_bytes_written += bytes_written[i];
579   }
580   size_t total_messages_read = 0;
581   size_t total_bytes_read = 0;
582   for (size_t i = 0; i < kNumReaders; i++) {
583     total_messages_read += messages_read[i];
584     total_bytes_read += bytes_read[i];
585     // We'd have to be really unlucky to have read no messages on a thread.
586     EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
587     EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
588   }
589   EXPECT_EQ(total_messages_written, total_messages_read);
590   EXPECT_EQ(total_bytes_written, total_bytes_read);
591 
592   EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
593   EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
594 }
595 
596 }  // namespace
597 }  // namespace system
598 }  // namespace mojo
599