• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8 
9 #include <string>
10 #include <vector>
11 
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
20 #include "build/build_config.h"  // TODO(vtl): Remove this.
21 #include "mojo/common/test/multiprocess_test_helper.h"
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/dispatcher.h"
26 #include "mojo/system/local_message_pipe_endpoint.h"
27 #include "mojo/system/message_pipe.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/raw_shared_buffer.h"
32 #include "mojo/system/shared_buffer_dispatcher.h"
33 #include "mojo/system/test_utils.h"
34 #include "mojo/system/waiter.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36 
37 namespace mojo {
38 namespace system {
39 namespace {
40 
41 class ChannelThread {
42  public:
ChannelThread()43   ChannelThread() : test_io_thread_(test::TestIOThread::kManualStart) {}
~ChannelThread()44   ~ChannelThread() {
45     Stop();
46   }
47 
Start(embedder::ScopedPlatformHandle platform_handle,scoped_refptr<MessagePipe> message_pipe)48   void Start(embedder::ScopedPlatformHandle platform_handle,
49              scoped_refptr<MessagePipe> message_pipe) {
50     test_io_thread_.Start();
51     test_io_thread_.PostTaskAndWait(
52         FROM_HERE,
53         base::Bind(&ChannelThread::InitChannelOnIOThread,
54                    base::Unretained(this), base::Passed(&platform_handle),
55                    message_pipe));
56   }
57 
Stop()58   void Stop() {
59     if (channel_) {
60       // Hack to flush write buffers before quitting.
61       // TODO(vtl): Remove this once |Channel| has a
62       // |FlushWriteBufferAndShutdown()| (or whatever).
63       while (!channel_->IsWriteBufferEmpty())
64         base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
65 
66       test_io_thread_.PostTaskAndWait(
67           FROM_HERE,
68           base::Bind(&ChannelThread::ShutdownChannelOnIOThread,
69                      base::Unretained(this)));
70     }
71     test_io_thread_.Stop();
72   }
73 
74  private:
InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,scoped_refptr<MessagePipe> message_pipe)75   void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,
76                              scoped_refptr<MessagePipe> message_pipe) {
77     CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop());
78     CHECK(platform_handle.is_valid());
79 
80     // Create and initialize |Channel|.
81     channel_ = new Channel();
82     CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass())));
83 
84     // Attach the message pipe endpoint.
85     // Note: On the "server" (parent process) side, we need not attach the
86     // message pipe endpoint immediately. However, on the "client" (child
87     // process) side, this *must* be done here -- otherwise, the |Channel| may
88     // receive/process messages (which it can do as soon as it's hooked up to
89     // the IO thread message loop, and that message loop runs) before the
90     // message pipe endpoint is attached.
91     CHECK_EQ(channel_->AttachMessagePipeEndpoint(message_pipe, 1),
92              Channel::kBootstrapEndpointId);
93     CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
94                                            Channel::kBootstrapEndpointId));
95   }
96 
ShutdownChannelOnIOThread()97   void ShutdownChannelOnIOThread() {
98     CHECK(channel_);
99     channel_->Shutdown();
100     channel_ = NULL;
101   }
102 
103   test::TestIOThread test_io_thread_;
104   scoped_refptr<Channel> channel_;
105 
106   DISALLOW_COPY_AND_ASSIGN(ChannelThread);
107 };
108 
109 class MultiprocessMessagePipeTest : public testing::Test {
110  public:
MultiprocessMessagePipeTest()111   MultiprocessMessagePipeTest() {}
~MultiprocessMessagePipeTest()112   virtual ~MultiprocessMessagePipeTest() {}
113 
114  protected:
Init(scoped_refptr<MessagePipe> mp)115   void Init(scoped_refptr<MessagePipe> mp) {
116     channel_thread_.Start(helper_.server_platform_handle.Pass(), mp);
117   }
118 
helper()119   mojo::test::MultiprocessTestHelper* helper() { return &helper_; }
120 
121  private:
122   ChannelThread channel_thread_;
123   mojo::test::MultiprocessTestHelper helper_;
124 
125   DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
126 };
127 
WaitIfNecessary(scoped_refptr<MessagePipe> mp,MojoHandleSignals signals)128 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp,
129                            MojoHandleSignals signals) {
130   Waiter waiter;
131   waiter.Init();
132 
133   MojoResult add_result = mp->AddWaiter(0, &waiter, signals, 0);
134   if (add_result != MOJO_RESULT_OK) {
135     return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
136                                                         add_result;
137   }
138 
139   MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE, NULL);
140   mp->RemoveWaiter(0, &waiter);
141   return wait_result;
142 }
143 
144 // For each message received, sends a reply message with the same contents
145 // repeated twice, until the other end is closed or it receives "quitquitquit"
146 // (which it doesn't reply to). It'll return the number of messages received,
147 // not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho)148 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
149   ChannelThread channel_thread;
150   embedder::ScopedPlatformHandle client_platform_handle =
151       mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
152   CHECK(client_platform_handle.is_valid());
153   scoped_refptr<MessagePipe> mp(new MessagePipe(
154       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
155       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
156   channel_thread.Start(client_platform_handle.Pass(), mp);
157 
158   const std::string quitquitquit("quitquitquit");
159   int rv = 0;
160   for (;; rv = (rv + 1) % 100) {
161     // Wait for our end of the message pipe to be readable.
162     MojoResult result = WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE);
163     if (result != MOJO_RESULT_OK) {
164       // It was closed, probably.
165       CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
166       break;
167     }
168 
169     std::string read_buffer(1000, '\0');
170     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
171     CHECK_EQ(mp->ReadMessage(0,
172                              &read_buffer[0], &read_buffer_size,
173                              NULL, NULL,
174                              MOJO_READ_MESSAGE_FLAG_NONE),
175              MOJO_RESULT_OK);
176     read_buffer.resize(read_buffer_size);
177     VLOG(2) << "Child got: " << read_buffer;
178 
179     if (read_buffer == quitquitquit) {
180       VLOG(2) << "Child quitting.";
181       break;
182     }
183 
184     std::string write_buffer = read_buffer + read_buffer;
185     CHECK_EQ(mp->WriteMessage(0,
186                               write_buffer.data(),
187                               static_cast<uint32_t>(write_buffer.size()),
188                               NULL,
189                               MOJO_WRITE_MESSAGE_FLAG_NONE),
190              MOJO_RESULT_OK);
191   }
192 
193   mp->Close(0);
194   return rv;
195 }
196 
197 // Sends "hello" to child, and expects "hellohello" back.
TEST_F(MultiprocessMessagePipeTest,Basic)198 TEST_F(MultiprocessMessagePipeTest, Basic) {
199   helper()->StartChild("EchoEcho");
200 
201   scoped_refptr<MessagePipe> mp(new MessagePipe(
202       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
203       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
204   Init(mp);
205 
206   std::string hello("hello");
207   EXPECT_EQ(MOJO_RESULT_OK,
208             mp->WriteMessage(0,
209                              hello.data(), static_cast<uint32_t>(hello.size()),
210                              NULL,
211                              MOJO_WRITE_MESSAGE_FLAG_NONE));
212 
213   EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
214 
215   std::string read_buffer(1000, '\0');
216   uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
217   CHECK_EQ(mp->ReadMessage(0,
218                            &read_buffer[0], &read_buffer_size,
219                            NULL, NULL,
220                            MOJO_READ_MESSAGE_FLAG_NONE),
221            MOJO_RESULT_OK);
222   read_buffer.resize(read_buffer_size);
223   VLOG(2) << "Parent got: " << read_buffer;
224   EXPECT_EQ(hello + hello, read_buffer);
225 
226   mp->Close(0);
227 
228   // We sent one message.
229   EXPECT_EQ(1 % 100, helper()->WaitForChildShutdown());
230 }
231 
232 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits
233 // for the child to close its end before quitting.
TEST_F(MultiprocessMessagePipeTest,QueueMessages)234 TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
235   helper()->StartChild("EchoEcho");
236 
237   scoped_refptr<MessagePipe> mp(new MessagePipe(
238       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
239       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
240   Init(mp);
241 
242   static const size_t kNumMessages = 1001;
243   for (size_t i = 0; i < kNumMessages; i++) {
244     std::string write_buffer(i, 'A' + (i % 26));
245     EXPECT_EQ(MOJO_RESULT_OK,
246               mp->WriteMessage(0,
247                                write_buffer.data(),
248                                static_cast<uint32_t>(write_buffer.size()),
249                                NULL,
250                                MOJO_WRITE_MESSAGE_FLAG_NONE));
251   }
252 
253   const std::string quitquitquit("quitquitquit");
254   EXPECT_EQ(MOJO_RESULT_OK,
255             mp->WriteMessage(0,
256                              quitquitquit.data(),
257                              static_cast<uint32_t>(quitquitquit.size()),
258                              NULL,
259                              MOJO_WRITE_MESSAGE_FLAG_NONE));
260 
261   for (size_t i = 0; i < kNumMessages; i++) {
262     EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
263 
264     std::string read_buffer(kNumMessages * 2, '\0');
265     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
266     CHECK_EQ(mp->ReadMessage(0,
267                              &read_buffer[0], &read_buffer_size,
268                              NULL, NULL,
269                              MOJO_READ_MESSAGE_FLAG_NONE),
270              MOJO_RESULT_OK);
271     read_buffer.resize(read_buffer_size);
272 
273     EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
274   }
275 
276   // Wait for it to become readable, which should fail (since we sent
277   // "quitquitquit").
278   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
279             WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
280 
281   mp->Close(0);
282 
283   EXPECT_EQ(static_cast<int>(kNumMessages % 100),
284             helper()->WaitForChildShutdown());
285 }
286 
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer)287 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
288   ChannelThread channel_thread;
289   embedder::ScopedPlatformHandle client_platform_handle =
290       mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
291   CHECK(client_platform_handle.is_valid());
292   scoped_refptr<MessagePipe> mp(new MessagePipe(
293       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
294       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
295   channel_thread.Start(client_platform_handle.Pass(), mp);
296 
297   // Wait for the first message from our parent.
298   CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);
299 
300   // It should have a shared buffer.
301   std::string read_buffer(100, '\0');
302   uint32_t num_bytes = static_cast<uint32_t>(read_buffer.size());
303   DispatcherVector dispatchers;
304   uint32_t num_dispatchers = 10;  // Maximum number to receive.
305   CHECK_EQ(mp->ReadMessage(0,
306                            &read_buffer[0], &num_bytes,
307                            &dispatchers, &num_dispatchers,
308                            MOJO_READ_MESSAGE_FLAG_NONE),
309            MOJO_RESULT_OK);
310   read_buffer.resize(num_bytes);
311   CHECK_EQ(read_buffer, std::string("go 1"));
312   CHECK_EQ(num_dispatchers, 1u);
313 
314   CHECK_EQ(dispatchers[0]->GetType(), Dispatcher::kTypeSharedBuffer);
315 
316   scoped_refptr<SharedBufferDispatcher> dispatcher(
317       static_cast<SharedBufferDispatcher*>(dispatchers[0].get()));
318 
319   // Make a mapping.
320   scoped_ptr<RawSharedBufferMapping> mapping;
321   CHECK_EQ(dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping),
322            MOJO_RESULT_OK);
323   CHECK(mapping);
324   CHECK(mapping->base());
325   CHECK_EQ(mapping->length(), 100u);
326 
327   // Write some stuff to the shared buffer.
328   static const char kHello[] = "hello";
329   memcpy(mapping->base(), kHello, sizeof(kHello));
330 
331   // We should be able to close the dispatcher now.
332   dispatcher->Close();
333 
334   // And send a message to signal that we've written stuff.
335   const std::string go2("go 2");
336   CHECK_EQ(mp->WriteMessage(0,
337                             &go2[0],
338                             static_cast<uint32_t>(go2.size()),
339                             NULL,
340                             MOJO_WRITE_MESSAGE_FLAG_NONE),
341            MOJO_RESULT_OK);
342 
343   // Now wait for our parent to send us a message.
344   CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);
345 
346   read_buffer = std::string(100, '\0');
347   num_bytes = static_cast<uint32_t>(read_buffer.size());
348   CHECK_EQ(mp->ReadMessage(0,
349                            &read_buffer[0], &num_bytes,
350                            NULL, NULL,
351                            MOJO_READ_MESSAGE_FLAG_NONE),
352            MOJO_RESULT_OK);
353   read_buffer.resize(num_bytes);
354   CHECK_EQ(read_buffer, std::string("go 3"));
355 
356   // It should have written something to the shared buffer.
357   static const char kWorld[] = "world!!!";
358   CHECK_EQ(memcmp(mapping->base(), kWorld, sizeof(kWorld)), 0);
359 
360   // And we're done.
361   mp->Close(0);
362 
363   return 0;
364 }
365 
366 #if defined(OS_POSIX)
367 #define MAYBE_SharedBufferPassing SharedBufferPassing
368 #else
369 // Not yet implemented (on Windows).
370 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
371 #endif
TEST_F(MultiprocessMessagePipeTest,MAYBE_SharedBufferPassing)372 TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) {
373   helper()->StartChild("CheckSharedBuffer");
374 
375   scoped_refptr<MessagePipe> mp(new MessagePipe(
376       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
377       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
378   Init(mp);
379 
380   // Make a shared buffer.
381   scoped_refptr<SharedBufferDispatcher> dispatcher;
382   EXPECT_EQ(MOJO_RESULT_OK,
383             SharedBufferDispatcher::Create(
384                 SharedBufferDispatcher::kDefaultCreateOptions, 100,
385                 &dispatcher));
386   ASSERT_TRUE(dispatcher);
387 
388   // Make a mapping.
389   scoped_ptr<RawSharedBufferMapping> mapping;
390   EXPECT_EQ(MOJO_RESULT_OK,
391             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping));
392   ASSERT_TRUE(mapping);
393   ASSERT_TRUE(mapping->base());
394   ASSERT_EQ(100u, mapping->length());
395 
396   // Send the shared buffer.
397   const std::string go1("go 1");
398   DispatcherTransport transport(
399       test::DispatcherTryStartTransport(dispatcher.get()));
400   ASSERT_TRUE(transport.is_valid());
401 
402   std::vector<DispatcherTransport> transports;
403   transports.push_back(transport);
404   EXPECT_EQ(MOJO_RESULT_OK,
405             mp->WriteMessage(0,
406                              &go1[0],
407                              static_cast<uint32_t>(go1.size()),
408                              &transports,
409                              MOJO_WRITE_MESSAGE_FLAG_NONE));
410   transport.End();
411 
412   EXPECT_TRUE(dispatcher->HasOneRef());
413   dispatcher = NULL;
414 
415   // Wait for a message from the child.
416   EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
417 
418   std::string read_buffer(100, '\0');
419   uint32_t num_bytes = static_cast<uint32_t>(read_buffer.size());
420   EXPECT_EQ(MOJO_RESULT_OK,
421             mp->ReadMessage(0,
422                             &read_buffer[0], &num_bytes,
423                             NULL, NULL,
424                             MOJO_READ_MESSAGE_FLAG_NONE));
425   read_buffer.resize(num_bytes);
426   EXPECT_EQ(std::string("go 2"), read_buffer);
427 
428   // After we get it, the child should have written something to the shared
429   // buffer.
430   static const char kHello[] = "hello";
431   EXPECT_EQ(0, memcmp(mapping->base(), kHello, sizeof(kHello)));
432 
433   // Now we'll write some stuff to the shared buffer.
434   static const char kWorld[] = "world!!!";
435   memcpy(mapping->base(), kWorld, sizeof(kWorld));
436 
437   // And send a message to signal that we've written stuff.
438   const std::string go3("go 3");
439   EXPECT_EQ(MOJO_RESULT_OK,
440             mp->WriteMessage(0,
441                              &go3[0],
442                              static_cast<uint32_t>(go3.size()),
443                              NULL,
444                              MOJO_WRITE_MESSAGE_FLAG_NONE));
445 
446   // Wait for |mp| to become readable, which should fail.
447   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
448             WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
449 
450   mp->Close(0);
451 
452   EXPECT_EQ(0, helper()->WaitForChildShutdown());
453 }
454 
// Child process body for the platform handle test. Expects a single "hello"
// message carrying one platform handle dispatcher, turns that handle into a
// |FILE*| opened for reading, and verifies that the file contains "world".
// Any deviation trips a CHECK; on success the exit code is 0.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) {
  ChannelThread channel_thread;
  embedder::ScopedPlatformHandle client_platform_handle =
      mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
  CHECK(client_platform_handle.is_valid());
  scoped_refptr<MessagePipe> mp(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  channel_thread.Start(client_platform_handle.Pass(), mp);

  CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);

  std::string message(100, '\0');
  uint32_t message_size = static_cast<uint32_t>(message.size());
  DispatcherVector received_dispatchers;
  uint32_t received_count = 10;  // Maximum number to receive.
  CHECK_EQ(mp->ReadMessage(0,
                           &message[0], &message_size,
                           &received_dispatchers, &received_count,
                           MOJO_READ_MESSAGE_FLAG_NONE),
           MOJO_RESULT_OK);
  // Only this one message is read, so our end of the pipe is closed right
  // away.
  mp->Close(0);

  message.resize(message_size);
  CHECK_EQ(message, std::string("hello"));
  CHECK_EQ(received_count, 1u);

  // The type check is what makes the downcast below legitimate.
  CHECK_EQ(received_dispatchers[0]->GetType(), Dispatcher::kTypePlatformHandle);

  scoped_refptr<PlatformHandleDispatcher> handle_dispatcher(
      static_cast<PlatformHandleDispatcher*>(received_dispatchers[0].get()));
  embedder::ScopedPlatformHandle h =
      handle_dispatcher->PassPlatformHandle().Pass();
  CHECK(h.is_valid());
  handle_dispatcher->Close();

  // Read (up to 100 bytes of) the file behind the handle.
  base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h.Pass(), "r"));
  CHECK(fp);
  std::string file_contents(100, '\0');
  const size_t bytes_read =
      fread(&file_contents[0], 1, file_contents.size(), fp.get());
  file_contents.resize(bytes_read);
  CHECK_EQ(file_contents, "world");

  return 0;
}
499 
500 #if defined(OS_POSIX)
501 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
502 #else
503 // Not yet implemented (on Windows).
504 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
505 #endif
TEST_F(MultiprocessMessagePipeTest,MAYBE_PlatformHandlePassing)506 TEST_F(MultiprocessMessagePipeTest, MAYBE_PlatformHandlePassing) {
507   helper()->StartChild("CheckPlatformHandleFile");
508 
509   scoped_refptr<MessagePipe> mp(new MessagePipe(
510       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
511       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
512   Init(mp);
513 
514   base::FilePath unused;
515   base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
516   const std::string world("world");
517   ASSERT_EQ(fwrite(&world[0], 1, world.size(), fp.get()), world.size());
518   fflush(fp.get());
519   rewind(fp.get());
520 
521   embedder::ScopedPlatformHandle h(
522       mojo::test::PlatformHandleFromFILE(fp.Pass()));
523   scoped_refptr<PlatformHandleDispatcher> dispatcher(
524       new PlatformHandleDispatcher(h.Pass()));
525 
526   const std::string hello("hello");
527   DispatcherTransport transport(
528       test::DispatcherTryStartTransport(dispatcher.get()));
529   ASSERT_TRUE(transport.is_valid());
530 
531   std::vector<DispatcherTransport> transports;
532   transports.push_back(transport);
533   EXPECT_EQ(MOJO_RESULT_OK,
534             mp->WriteMessage(0,
535                              &hello[0],
536                              static_cast<uint32_t>(hello.size()),
537                              &transports,
538                              MOJO_WRITE_MESSAGE_FLAG_NONE));
539   transport.End();
540 
541   EXPECT_TRUE(dispatcher->HasOneRef());
542   dispatcher = NULL;
543 
544   // Wait for it to become readable, which should fail.
545   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
546             WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
547 
548   mp->Close(0);
549 
550   EXPECT_EQ(0, helper()->WaitForChildShutdown());
551 }
552 
553 }  // namespace
554 }  // namespace system
555 }  // namespace mojo
556