1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
9
10 #include "mojo/system/message_pipe_dispatcher.h"
11
12 #include <string.h>
13
14 #include <limits>
15
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27
28 namespace mojo {
29 namespace system {
30 namespace {
31
// Single-threaded walk through adding waiters, writing, reading and timing out
// on a connected pair of message pipe dispatchers.
TEST(MessagePipeDispatcherTest, Basic) {
  test::Stopwatch timer;
  int32_t payload[1];
  const uint32_t kPayloadBytes = static_cast<uint32_t>(sizeof(payload));

  // Pass 0 puts |d0| on port 0 and |d1| on port 1; pass 1 swaps the ports.
  for (unsigned port = 0; port < 2; ++port) {
    scoped_refptr<MessagePipeDispatcher> d0(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
    scoped_refptr<MessagePipeDispatcher> d1(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    {
      scoped_refptr<MessagePipe> pipe(new MessagePipe());
      d0->Init(pipe, port);      // 0, 1.
      d1->Init(pipe, 1 - port);  // 1, 0.
    }
    Waiter waiter;
    uint32_t context = 0;

    // |d0| is already writable, so a writable waiter is refused. Since it was
    // never added, there is nothing to remove afterwards.
    waiter.Init();
    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
              d0->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0));

    // Register a readable waiter on |d0|, satisfy it by writing through |d1|,
    // and only then wait on it.
    waiter.Init();
    EXPECT_EQ(MOJO_RESULT_OK,
              d0->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1));
    payload[0] = 123456789;
    EXPECT_EQ(MOJO_RESULT_OK,
              d1->WriteMessage(payload, kPayloadBytes, NULL,
                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    timer.Start();
    EXPECT_EQ(MOJO_RESULT_OK,
              waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    EXPECT_EQ(1u, context);
    EXPECT_LT(timer.Elapsed(), test::EpsilonTimeout());
    d0->RemoveWaiter(&waiter);

    // The message is still queued, so |d0| is still readable and another
    // readable waiter is refused (again, nothing to remove).
    waiter.Init();
    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
              d0->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2));

    // Drain the message so that |d0| stops being readable.
    payload[0] = 0;
    uint32_t bytes_read = kPayloadBytes;
    EXPECT_EQ(MOJO_RESULT_OK,
              d0->ReadMessage(payload, &bytes_read, 0, NULL,
                              MOJO_READ_MESSAGE_FLAG_NONE));
    EXPECT_EQ(kPayloadBytes, bytes_read);
    EXPECT_EQ(123456789, payload[0]);

    // A zero-deadline wait for readability on |d0| must time out.
    waiter.Init();
    EXPECT_EQ(MOJO_RESULT_OK,
              d0->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 3));
    timer.Start();
    EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, NULL));
    EXPECT_LT(timer.Elapsed(), test::EpsilonTimeout());
    d0->RemoveWaiter(&waiter);

    // A finite, non-zero deadline must time out as well, after roughly the
    // requested amount of time.
    waiter.Init();
    EXPECT_EQ(MOJO_RESULT_OK,
              d0->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 3));
    timer.Start();
    EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
              waiter.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL));
    const base::TimeDelta waited = timer.Elapsed();
    EXPECT_GT(waited, (2 - 1) * test::EpsilonTimeout());
    EXPECT_LT(waited, (2 + 1) * test::EpsilonTimeout());
    d0->RemoveWaiter(&waiter);

    EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
  }
}
116
// Checks that |WriteMessage()| and |ReadMessage()| reject bad arguments.
TEST(MessagePipeDispatcherTest, InvalidParams) {
  char scratch[1];
  const uint32_t kHugeSize = std::numeric_limits<uint32_t>::max();

  scoped_refptr<MessagePipeDispatcher> d0(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipeDispatcher> d1(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  {
    scoped_refptr<MessagePipe> pipe(new MessagePipe());
    d0->Init(pipe, 0);
    d1->Init(pipe, 1);
  }

  // |WriteMessage()|, case 1: a null buffer combined with a nonzero size.
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            d0->WriteMessage(NULL, 1, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE));
  // |WriteMessage()|, case 2: a size far too large to be accepted.
  EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
            d0->WriteMessage(scratch, kHugeSize, NULL,
                             MOJO_WRITE_MESSAGE_FLAG_NONE));

  // |ReadMessage()|: a null buffer combined with a nonzero size.
  uint32_t read_size = 1;
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            d0->ReadMessage(NULL, &read_size, 0, NULL,
                            MOJO_READ_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
  EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
}
153
154 // Test what happens when one end is closed (single-threaded test).
TEST(MessagePipeDispatcherTest,BasicClosed)155 TEST(MessagePipeDispatcherTest, BasicClosed) {
156 int32_t buffer[1];
157 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
158 uint32_t buffer_size;
159
160 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
161 for (unsigned i = 0; i < 2; i++) {
162 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
163 MessagePipeDispatcher::kDefaultCreateOptions));
164 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
165 MessagePipeDispatcher::kDefaultCreateOptions));
166 {
167 scoped_refptr<MessagePipe> mp(new MessagePipe());
168 d0->Init(mp, i); // 0, 1.
169 d1->Init(mp, i ^ 1); // 1, 0.
170 }
171 Waiter w;
172
173 // Write (twice) to |d1|.
174 buffer[0] = 123456789;
175 EXPECT_EQ(MOJO_RESULT_OK,
176 d1->WriteMessage(buffer, kBufferSize,
177 NULL,
178 MOJO_WRITE_MESSAGE_FLAG_NONE));
179 buffer[0] = 234567890;
180 EXPECT_EQ(MOJO_RESULT_OK,
181 d1->WriteMessage(buffer, kBufferSize,
182 NULL,
183 MOJO_WRITE_MESSAGE_FLAG_NONE));
184
185 // Try waiting for readable on |d0|; should fail (already satisfied).
186 w.Init();
187 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
188 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0));
189
190 // Try reading from |d1|; should fail (nothing to read).
191 buffer[0] = 0;
192 buffer_size = kBufferSize;
193 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
194 d1->ReadMessage(buffer, &buffer_size,
195 0, NULL,
196 MOJO_READ_MESSAGE_FLAG_NONE));
197
198 // Close |d1|.
199 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
200
201 // Try waiting for readable on |d0|; should fail (already satisfied).
202 w.Init();
203 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
204 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1));
205
206 // Read from |d0|.
207 buffer[0] = 0;
208 buffer_size = kBufferSize;
209 EXPECT_EQ(MOJO_RESULT_OK,
210 d0->ReadMessage(buffer, &buffer_size,
211 0, NULL,
212 MOJO_READ_MESSAGE_FLAG_NONE));
213 EXPECT_EQ(kBufferSize, buffer_size);
214 EXPECT_EQ(123456789, buffer[0]);
215
216 // Try waiting for readable on |d0|; should fail (already satisfied).
217 w.Init();
218 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
219 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2));
220
221 // Read again from |d0|.
222 buffer[0] = 0;
223 buffer_size = kBufferSize;
224 EXPECT_EQ(MOJO_RESULT_OK,
225 d0->ReadMessage(buffer, &buffer_size,
226 0, NULL,
227 MOJO_READ_MESSAGE_FLAG_NONE));
228 EXPECT_EQ(kBufferSize, buffer_size);
229 EXPECT_EQ(234567890, buffer[0]);
230
231 // Try waiting for readable on |d0|; should fail (unsatisfiable).
232 w.Init();
233 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
234 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
235
236 // Try waiting for writable on |d0|; should fail (unsatisfiable).
237 w.Init();
238 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
239 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4));
240
241 // Try reading from |d0|; should fail (nothing to read and other end
242 // closed).
243 buffer[0] = 0;
244 buffer_size = kBufferSize;
245 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
246 d0->ReadMessage(buffer, &buffer_size,
247 0, NULL,
248 MOJO_READ_MESSAGE_FLAG_NONE));
249
250 // Try writing to |d0|; should fail (other end closed).
251 buffer[0] = 345678901;
252 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
253 d0->WriteMessage(buffer, kBufferSize,
254 NULL,
255 MOJO_WRITE_MESSAGE_FLAG_NONE));
256
257 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
258 }
259 }
260
// Waits on a dispatcher from a helper thread while the main thread writes to
// or closes one of the two ends, and checks both the wait results and the
// timing.
TEST(MessagePipeDispatcherTest, BasicThreaded) {
  test::Stopwatch timer;
  int32_t payload[1];
  const uint32_t kPayloadBytes = static_cast<uint32_t>(sizeof(payload));
  base::TimeDelta waited;
  // Outputs handed to each |test::WaiterThread| below.
  bool did_wait;
  MojoResult wait_result;
  uint32_t wait_context;

  // Pass 0 puts |d0| on port 0 and |d1| on port 1; pass 1 swaps the ports.
  for (unsigned port = 0; port < 2; ++port) {
    scoped_refptr<MessagePipeDispatcher> d0(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    scoped_refptr<MessagePipeDispatcher> d1(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    {
      scoped_refptr<MessagePipe> pipe(new MessagePipe());
      d0->Init(pipe, port);      // 0, 1.
      d1->Init(pipe, 1 - port);  // 1, 0.
    }

    // Wait for |d1| to become readable; it does so once we write to |d0|.
    {
      test::WaiterThread waiter_thread(d1,
                                       MOJO_HANDLE_SIGNAL_READABLE,
                                       MOJO_DEADLINE_INDEFINITE,
                                       1,
                                       &did_wait,
                                       &wait_result,
                                       &wait_context);
      timer.Start();
      waiter_thread.Start();
      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
      // This write is what wakes the waiting thread up.
      payload[0] = 123456789;
      EXPECT_EQ(MOJO_RESULT_OK,
                d0->WriteMessage(payload, kPayloadBytes, NULL,
                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    }  // The thread is joined here.
    waited = timer.Elapsed();
    EXPECT_GT(waited, (2 - 1) * test::EpsilonTimeout());
    EXPECT_LT(waited, (2 + 1) * test::EpsilonTimeout());
    EXPECT_TRUE(did_wait);
    EXPECT_EQ(MOJO_RESULT_OK, wait_result);
    EXPECT_EQ(1u, wait_context);

    // |d1| is readable now, so a second attempt must not actually wait.
    {
      test::WaiterThread waiter_thread(d1,
                                       MOJO_HANDLE_SIGNAL_READABLE,
                                       MOJO_DEADLINE_INDEFINITE,
                                       2,
                                       &did_wait,
                                       &wait_result,
                                       &wait_context);
      timer.Start();
      waiter_thread.Start();
    }  // The thread is joined here.
    EXPECT_LT(timer.Elapsed(), test::EpsilonTimeout());
    EXPECT_FALSE(did_wait);
    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, wait_result);

    // Read back the message that was written to |d0|.
    payload[0] = 0;
    uint32_t bytes_read = kPayloadBytes;
    EXPECT_EQ(MOJO_RESULT_OK,
              d1->ReadMessage(payload, &bytes_read, 0, NULL,
                              MOJO_READ_MESSAGE_FLAG_NONE));
    EXPECT_EQ(kPayloadBytes, bytes_read);
    EXPECT_EQ(123456789, payload[0]);

    // Wait for readability on |d1| again, then close |d0| after a delay;
    // closing the peer should end that wait.
    {
      test::WaiterThread waiter_thread(d1,
                                       MOJO_HANDLE_SIGNAL_READABLE,
                                       MOJO_DEADLINE_INDEFINITE,
                                       3,
                                       &did_wait,
                                       &wait_result,
                                       &wait_context);
      timer.Start();
      waiter_thread.Start();
      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
      EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    }  // The thread is joined here.
    waited = timer.Elapsed();
    EXPECT_GT(waited, (2 - 1) * test::EpsilonTimeout());
    EXPECT_LT(waited, (2 + 1) * test::EpsilonTimeout());
    EXPECT_TRUE(did_wait);
    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, wait_result);
    EXPECT_EQ(3u, wait_context);

    EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
  }

  // Same port permutations, but this time the waited-on end itself is closed.
  for (unsigned port = 0; port < 2; ++port) {
    scoped_refptr<MessagePipeDispatcher> d0(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    scoped_refptr<MessagePipeDispatcher> d1(
        new MessagePipeDispatcher(
            MessagePipeDispatcher::kDefaultCreateOptions));
    {
      scoped_refptr<MessagePipe> pipe(new MessagePipe());
      d0->Init(pipe, port);      // 0, 1.
      d1->Init(pipe, 1 - port);  // 1, 0.
    }

    // Wait for readability on |d1|, then close |d1| after a delay; that
    // should cancel the wait.
    {
      test::WaiterThread waiter_thread(d1,
                                       MOJO_HANDLE_SIGNAL_READABLE,
                                       MOJO_DEADLINE_INDEFINITE,
                                       4,
                                       &did_wait,
                                       &wait_result,
                                       &wait_context);
      timer.Start();
      waiter_thread.Start();
      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
      EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    }  // The thread is joined here.
    waited = timer.Elapsed();
    EXPECT_GT(waited, (2 - 1) * test::EpsilonTimeout());
    EXPECT_LT(waited, (2 + 1) * test::EpsilonTimeout());
    EXPECT_TRUE(did_wait);
    EXPECT_EQ(MOJO_RESULT_CANCELLED, wait_result);
    EXPECT_EQ(4u, wait_context);

    EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
  }
}
388
389 // Stress test -----------------------------------------------------------------
390
// Upper bound, in bytes, on the size of a single stress-test message; also
// the size of the writer's and reader's scratch buffers.
const size_t kMaxMessageSize = 2000u;
392
393 class WriterThread : public base::SimpleThread {
394 public:
395 // |*messages_written| and |*bytes_written| belong to the thread while it's
396 // alive.
WriterThread(scoped_refptr<Dispatcher> write_dispatcher,size_t * messages_written,size_t * bytes_written)397 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
398 size_t* messages_written, size_t* bytes_written)
399 : base::SimpleThread("writer_thread"),
400 write_dispatcher_(write_dispatcher),
401 messages_written_(messages_written),
402 bytes_written_(bytes_written) {
403 *messages_written_ = 0;
404 *bytes_written_ = 0;
405 }
406
~WriterThread()407 virtual ~WriterThread() {
408 Join();
409 }
410
411 private:
Run()412 virtual void Run() OVERRIDE {
413 // Make some data to write.
414 unsigned char buffer[kMaxMessageSize];
415 for (size_t i = 0; i < kMaxMessageSize; i++)
416 buffer[i] = static_cast<unsigned char>(i);
417
418 // Number of messages to write.
419 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
420
421 // Write messages.
422 for (size_t i = 0; i < *messages_written_; i++) {
423 uint32_t bytes_to_write = static_cast<uint32_t>(
424 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
425 EXPECT_EQ(MOJO_RESULT_OK,
426 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
427 NULL,
428 MOJO_WRITE_MESSAGE_FLAG_NONE));
429 *bytes_written_ += bytes_to_write;
430 }
431
432 // Write one last "quit" message.
433 EXPECT_EQ(MOJO_RESULT_OK,
434 write_dispatcher_->WriteMessage("quit", 4,
435 NULL,
436 MOJO_WRITE_MESSAGE_FLAG_NONE));
437 }
438
439 const scoped_refptr<Dispatcher> write_dispatcher_;
440 size_t* const messages_written_;
441 size_t* const bytes_written_;
442
443 DISALLOW_COPY_AND_ASSIGN(WriterThread);
444 };
445
446 class ReaderThread : public base::SimpleThread {
447 public:
448 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,size_t * messages_read,size_t * bytes_read)449 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
450 size_t* messages_read, size_t* bytes_read)
451 : base::SimpleThread("reader_thread"),
452 read_dispatcher_(read_dispatcher),
453 messages_read_(messages_read),
454 bytes_read_(bytes_read) {
455 *messages_read_ = 0;
456 *bytes_read_ = 0;
457 }
458
~ReaderThread()459 virtual ~ReaderThread() {
460 Join();
461 }
462
463 private:
Run()464 virtual void Run() OVERRIDE {
465 unsigned char buffer[kMaxMessageSize];
466 MojoResult result;
467 Waiter w;
468
469 // Read messages.
470 for (;;) {
471 // Wait for it to be readable.
472 w.Init();
473 result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0);
474 EXPECT_TRUE(result == MOJO_RESULT_OK ||
475 result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
476 if (result == MOJO_RESULT_OK) {
477 // Actually need to wait.
478 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL));
479 read_dispatcher_->RemoveWaiter(&w);
480 }
481
482 // Now, try to do the read.
483 // Clear the buffer so that we can check the result.
484 memset(buffer, 0, sizeof(buffer));
485 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
486 result = read_dispatcher_->ReadMessage(buffer, &buffer_size,
487 0, NULL,
488 MOJO_READ_MESSAGE_FLAG_NONE);
489 EXPECT_TRUE(result == MOJO_RESULT_OK ||
490 result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result;
491 // We're racing with others to read, so maybe we failed.
492 if (result == MOJO_RESULT_SHOULD_WAIT)
493 continue; // In which case, try again.
494 // Check for quit.
495 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
496 return;
497 EXPECT_GE(buffer_size, 1u);
498 EXPECT_LE(buffer_size, kMaxMessageSize);
499 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
500
501 (*messages_read_)++;
502 *bytes_read_ += buffer_size;
503 }
504 }
505
IsValidMessage(const unsigned char * buffer,uint32_t message_size)506 static bool IsValidMessage(const unsigned char* buffer,
507 uint32_t message_size) {
508 size_t i;
509 for (i = 0; i < message_size; i++) {
510 if (buffer[i] != static_cast<unsigned char>(i))
511 return false;
512 }
513 // Check that the remaining bytes weren't stomped on.
514 for (; i < kMaxMessageSize; i++) {
515 if (buffer[i] != 0)
516 return false;
517 }
518 return true;
519 }
520
521 const scoped_refptr<Dispatcher> read_dispatcher_;
522 size_t* const messages_read_;
523 size_t* const bytes_read_;
524
525 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
526 };
527
// Runs many writer and reader threads against a single message pipe and
// checks that everything written was also read.
TEST(MessagePipeDispatcherTest, Stress) {
  static const size_t kNumWriters = 30;
  // Every writer ends with one "quit" message and every reader stops on one,
  // so the two counts are kept equal.
  static const size_t kNumReaders = kNumWriters;

  scoped_refptr<MessagePipeDispatcher> d_write(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipeDispatcher> d_read(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  {
    scoped_refptr<MessagePipe> pipe(new MessagePipe());
    d_write->Init(pipe, 0);
    d_read->Init(pipe, 1);
  }

  // Per-thread counters; each slot is handed to exactly one thread.
  size_t messages_written[kNumWriters];
  size_t bytes_written[kNumWriters];
  size_t messages_read[kNumReaders];
  size_t bytes_read[kNumReaders];
  {
    // Create every writer first...
    ScopedVector<WriterThread> writers;
    for (size_t w = 0; w < kNumWriters; ++w) {
      writers.push_back(
          new WriterThread(d_write, &messages_written[w], &bytes_written[w]));
    }

    // ...then every reader.
    ScopedVector<ReaderThread> readers;
    for (size_t r = 0; r < kNumReaders; ++r) {
      readers.push_back(
          new ReaderThread(d_read, &messages_read[r], &bytes_read[r]));
    }

    // Kick off the writers.
    for (size_t w = 0; w < kNumWriters; ++w)
      writers[w]->Start();

    // Kick off the readers.
    for (size_t r = 0; r < kNumReaders; ++r)
      readers[r]->Start();

    // TODO(vtl): Maybe I should have an event that triggers all the threads to
    // start doing stuff for real (so that the first ones created/started aren't
    // advantaged).
  }  // All threads are joined when the vectors go out of scope.

  // Add up what the writers report.
  size_t written_message_total = 0;
  size_t written_byte_total = 0;
  for (size_t w = 0; w < kNumWriters; ++w) {
    written_message_total += messages_written[w];
    written_byte_total += bytes_written[w];
  }

  // Add up what the readers report, sanity-checking each one.
  size_t read_message_total = 0;
  size_t read_byte_total = 0;
  for (size_t r = 0; r < kNumReaders; ++r) {
    read_message_total += messages_read[r];
    read_byte_total += bytes_read[r];
    // We'd have to be really unlucky to have read no messages on a thread.
    EXPECT_GT(messages_read[r], 0u) << "reader: " << r;
    EXPECT_GE(bytes_read[r], messages_read[r]) << "reader: " << r;
  }
  EXPECT_EQ(written_message_total, read_message_total);
  EXPECT_EQ(written_byte_total, read_byte_total);

  EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
  EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
}
595
596 } // namespace
597 } // namespace system
598 } // namespace mojo
599