1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <string>
10 #include <vector>
11
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "build/build_config.h" // TODO(vtl): Remove this.
21 #include "mojo/common/test/multiprocess_test_helper.h"
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/dispatcher.h"
26 #include "mojo/system/local_message_pipe_endpoint.h"
27 #include "mojo/system/message_pipe.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/raw_shared_buffer.h"
32 #include "mojo/system/shared_buffer_dispatcher.h"
33 #include "mojo/system/test_utils.h"
34 #include "mojo/system/waiter.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36
37 namespace mojo {
38 namespace system {
39 namespace {
40
41 class ChannelThread {
42 public:
ChannelThread()43 ChannelThread() : test_io_thread_(test::TestIOThread::kManualStart) {}
~ChannelThread()44 ~ChannelThread() {
45 Stop();
46 }
47
Start(embedder::ScopedPlatformHandle platform_handle,scoped_refptr<MessagePipe> message_pipe)48 void Start(embedder::ScopedPlatformHandle platform_handle,
49 scoped_refptr<MessagePipe> message_pipe) {
50 test_io_thread_.Start();
51 test_io_thread_.PostTaskAndWait(
52 FROM_HERE,
53 base::Bind(&ChannelThread::InitChannelOnIOThread,
54 base::Unretained(this), base::Passed(&platform_handle),
55 message_pipe));
56 }
57
Stop()58 void Stop() {
59 if (channel_) {
60 // Hack to flush write buffers before quitting.
61 // TODO(vtl): Remove this once |Channel| has a
62 // |FlushWriteBufferAndShutdown()| (or whatever).
63 while (!channel_->IsWriteBufferEmpty())
64 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
65
66 test_io_thread_.PostTaskAndWait(
67 FROM_HERE,
68 base::Bind(&ChannelThread::ShutdownChannelOnIOThread,
69 base::Unretained(this)));
70 }
71 test_io_thread_.Stop();
72 }
73
74 private:
InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,scoped_refptr<MessagePipe> message_pipe)75 void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,
76 scoped_refptr<MessagePipe> message_pipe) {
77 CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop());
78 CHECK(platform_handle.is_valid());
79
80 // Create and initialize |Channel|.
81 channel_ = new Channel();
82 CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass())));
83
84 // Attach the message pipe endpoint.
85 // Note: On the "server" (parent process) side, we need not attach the
86 // message pipe endpoint immediately. However, on the "client" (child
87 // process) side, this *must* be done here -- otherwise, the |Channel| may
88 // receive/process messages (which it can do as soon as it's hooked up to
89 // the IO thread message loop, and that message loop runs) before the
90 // message pipe endpoint is attached.
91 CHECK_EQ(channel_->AttachMessagePipeEndpoint(message_pipe, 1),
92 Channel::kBootstrapEndpointId);
93 CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
94 Channel::kBootstrapEndpointId));
95 }
96
ShutdownChannelOnIOThread()97 void ShutdownChannelOnIOThread() {
98 CHECK(channel_);
99 channel_->Shutdown();
100 channel_ = NULL;
101 }
102
103 test::TestIOThread test_io_thread_;
104 scoped_refptr<Channel> channel_;
105
106 DISALLOW_COPY_AND_ASSIGN(ChannelThread);
107 };
108
109 class MultiprocessMessagePipeTest : public testing::Test {
110 public:
MultiprocessMessagePipeTest()111 MultiprocessMessagePipeTest() {}
~MultiprocessMessagePipeTest()112 virtual ~MultiprocessMessagePipeTest() {}
113
114 protected:
Init(scoped_refptr<MessagePipe> mp)115 void Init(scoped_refptr<MessagePipe> mp) {
116 channel_thread_.Start(helper_.server_platform_handle.Pass(), mp);
117 }
118
helper()119 mojo::test::MultiprocessTestHelper* helper() { return &helper_; }
120
121 private:
122 ChannelThread channel_thread_;
123 mojo::test::MultiprocessTestHelper helper_;
124
125 DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
126 };
127
WaitIfNecessary(scoped_refptr<MessagePipe> mp,MojoHandleSignals signals)128 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp,
129 MojoHandleSignals signals) {
130 Waiter waiter;
131 waiter.Init();
132
133 MojoResult add_result = mp->AddWaiter(0, &waiter, signals, 0);
134 if (add_result != MOJO_RESULT_OK) {
135 return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
136 add_result;
137 }
138
139 MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE, NULL);
140 mp->RemoveWaiter(0, &waiter);
141 return wait_result;
142 }
143
144 // For each message received, sends a reply message with the same contents
145 // repeated twice, until the other end is closed or it receives "quitquitquit"
146 // (which it doesn't reply to). It'll return the number of messages received,
147 // not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho)148 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
149 ChannelThread channel_thread;
150 embedder::ScopedPlatformHandle client_platform_handle =
151 mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
152 CHECK(client_platform_handle.is_valid());
153 scoped_refptr<MessagePipe> mp(new MessagePipe(
154 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
155 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
156 channel_thread.Start(client_platform_handle.Pass(), mp);
157
158 const std::string quitquitquit("quitquitquit");
159 int rv = 0;
160 for (;; rv = (rv + 1) % 100) {
161 // Wait for our end of the message pipe to be readable.
162 MojoResult result = WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE);
163 if (result != MOJO_RESULT_OK) {
164 // It was closed, probably.
165 CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
166 break;
167 }
168
169 std::string read_buffer(1000, '\0');
170 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
171 CHECK_EQ(mp->ReadMessage(0,
172 &read_buffer[0], &read_buffer_size,
173 NULL, NULL,
174 MOJO_READ_MESSAGE_FLAG_NONE),
175 MOJO_RESULT_OK);
176 read_buffer.resize(read_buffer_size);
177 VLOG(2) << "Child got: " << read_buffer;
178
179 if (read_buffer == quitquitquit) {
180 VLOG(2) << "Child quitting.";
181 break;
182 }
183
184 std::string write_buffer = read_buffer + read_buffer;
185 CHECK_EQ(mp->WriteMessage(0,
186 write_buffer.data(),
187 static_cast<uint32_t>(write_buffer.size()),
188 NULL,
189 MOJO_WRITE_MESSAGE_FLAG_NONE),
190 MOJO_RESULT_OK);
191 }
192
193 mp->Close(0);
194 return rv;
195 }
196
197 // Sends "hello" to child, and expects "hellohello" back.
TEST_F(MultiprocessMessagePipeTest,Basic)198 TEST_F(MultiprocessMessagePipeTest, Basic) {
199 helper()->StartChild("EchoEcho");
200
201 scoped_refptr<MessagePipe> mp(new MessagePipe(
202 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
203 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
204 Init(mp);
205
206 std::string hello("hello");
207 EXPECT_EQ(MOJO_RESULT_OK,
208 mp->WriteMessage(0,
209 hello.data(), static_cast<uint32_t>(hello.size()),
210 NULL,
211 MOJO_WRITE_MESSAGE_FLAG_NONE));
212
213 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
214
215 std::string read_buffer(1000, '\0');
216 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
217 CHECK_EQ(mp->ReadMessage(0,
218 &read_buffer[0], &read_buffer_size,
219 NULL, NULL,
220 MOJO_READ_MESSAGE_FLAG_NONE),
221 MOJO_RESULT_OK);
222 read_buffer.resize(read_buffer_size);
223 VLOG(2) << "Parent got: " << read_buffer;
224 EXPECT_EQ(hello + hello, read_buffer);
225
226 mp->Close(0);
227
228 // We sent one message.
229 EXPECT_EQ(1 % 100, helper()->WaitForChildShutdown());
230 }
231
232 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits
233 // for the child to close its end before quitting.
TEST_F(MultiprocessMessagePipeTest,QueueMessages)234 TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
235 helper()->StartChild("EchoEcho");
236
237 scoped_refptr<MessagePipe> mp(new MessagePipe(
238 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
239 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
240 Init(mp);
241
242 static const size_t kNumMessages = 1001;
243 for (size_t i = 0; i < kNumMessages; i++) {
244 std::string write_buffer(i, 'A' + (i % 26));
245 EXPECT_EQ(MOJO_RESULT_OK,
246 mp->WriteMessage(0,
247 write_buffer.data(),
248 static_cast<uint32_t>(write_buffer.size()),
249 NULL,
250 MOJO_WRITE_MESSAGE_FLAG_NONE));
251 }
252
253 const std::string quitquitquit("quitquitquit");
254 EXPECT_EQ(MOJO_RESULT_OK,
255 mp->WriteMessage(0,
256 quitquitquit.data(),
257 static_cast<uint32_t>(quitquitquit.size()),
258 NULL,
259 MOJO_WRITE_MESSAGE_FLAG_NONE));
260
261 for (size_t i = 0; i < kNumMessages; i++) {
262 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
263
264 std::string read_buffer(kNumMessages * 2, '\0');
265 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
266 CHECK_EQ(mp->ReadMessage(0,
267 &read_buffer[0], &read_buffer_size,
268 NULL, NULL,
269 MOJO_READ_MESSAGE_FLAG_NONE),
270 MOJO_RESULT_OK);
271 read_buffer.resize(read_buffer_size);
272
273 EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
274 }
275
276 // Wait for it to become readable, which should fail (since we sent
277 // "quitquitquit").
278 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
279 WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
280
281 mp->Close(0);
282
283 EXPECT_EQ(static_cast<int>(kNumMessages % 100),
284 helper()->WaitForChildShutdown());
285 }
286
// Child entry point "CheckSharedBuffer". Protocol, as seen from this side:
//   1. Read "go 1", which must carry exactly one shared buffer dispatcher.
//   2. Map the buffer, write "hello" into it, and send "go 2".
//   3. Read "go 3" and verify that the mapping now holds "world!!!".
// Returns 0 if every step succeeds (each step is enforced with CHECK).
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
  ChannelThread channel_thread;
  embedder::ScopedPlatformHandle client_platform_handle =
      mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
  CHECK(client_platform_handle.is_valid());
  scoped_refptr<MessagePipe> mp(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  channel_thread.Start(client_platform_handle.Pass(), mp);

  // Capacity of the buffer used for reading message payloads.
  static const size_t kReadCapacity = 100;

  // Step 1: the parent's first message.
  CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);

  // It is expected to bring a shared buffer along with it.
  std::string read_buffer(kReadCapacity, '\0');
  uint32_t num_bytes = static_cast<uint32_t>(read_buffer.size());
  DispatcherVector dispatchers;
  uint32_t num_dispatchers = 10;  // Upper bound on dispatchers to receive.
  CHECK_EQ(mp->ReadMessage(0,
                           &read_buffer[0], &num_bytes,
                           &dispatchers, &num_dispatchers,
                           MOJO_READ_MESSAGE_FLAG_NONE),
           MOJO_RESULT_OK);
  read_buffer.resize(num_bytes);
  CHECK_EQ(read_buffer, std::string("go 1"));
  CHECK_EQ(num_dispatchers, 1u);

  // The type is checked first, which is what justifies the downcast below.
  CHECK_EQ(dispatchers[0]->GetType(), Dispatcher::kTypeSharedBuffer);

  scoped_refptr<SharedBufferDispatcher> dispatcher(
      static_cast<SharedBufferDispatcher*>(dispatchers[0].get()));

  // Step 2: map the buffer.
  scoped_ptr<RawSharedBufferMapping> mapping;
  CHECK_EQ(dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping),
           MOJO_RESULT_OK);
  CHECK(mapping);
  CHECK(mapping->base());
  CHECK_EQ(mapping->length(), 100u);

  // Put our greeting (terminating NUL included) at the start of the mapping.
  static const char kHello[] = "hello";
  memcpy(mapping->base(), kHello, sizeof(kHello));

  // The dispatcher can be closed at this point; |mapping| is still used after
  // this.
  dispatcher->Close();

  // Tell the parent that the buffer has been written.
  const std::string go2("go 2");
  CHECK_EQ(mp->WriteMessage(0,
                            &go2[0],
                            static_cast<uint32_t>(go2.size()),
                            NULL,
                            MOJO_WRITE_MESSAGE_FLAG_NONE),
           MOJO_RESULT_OK);

  // Step 3: wait for the parent's answer.
  CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);

  read_buffer.assign(kReadCapacity, '\0');
  num_bytes = static_cast<uint32_t>(read_buffer.size());
  CHECK_EQ(mp->ReadMessage(0,
                           &read_buffer[0], &num_bytes,
                           NULL, NULL,
                           MOJO_READ_MESSAGE_FLAG_NONE),
           MOJO_RESULT_OK);
  read_buffer.resize(num_bytes);
  CHECK_EQ(read_buffer, std::string("go 3"));

  // By now the parent should have overwritten the shared buffer.
  static const char kWorld[] = "world!!!";
  CHECK_EQ(memcmp(mapping->base(), kWorld, sizeof(kWorld)), 0);

  // All done.
  mp->Close(0);

  return 0;
}
365
366 #if defined(OS_POSIX)
367 #define MAYBE_SharedBufferPassing SharedBufferPassing
368 #else
369 // Not yet implemented (on Windows).
370 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
371 #endif
TEST_F(MultiprocessMessagePipeTest,MAYBE_SharedBufferPassing)372 TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) {
373 helper()->StartChild("CheckSharedBuffer");
374
375 scoped_refptr<MessagePipe> mp(new MessagePipe(
376 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
377 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
378 Init(mp);
379
380 // Make a shared buffer.
381 scoped_refptr<SharedBufferDispatcher> dispatcher;
382 EXPECT_EQ(MOJO_RESULT_OK,
383 SharedBufferDispatcher::Create(
384 SharedBufferDispatcher::kDefaultCreateOptions, 100,
385 &dispatcher));
386 ASSERT_TRUE(dispatcher);
387
388 // Make a mapping.
389 scoped_ptr<RawSharedBufferMapping> mapping;
390 EXPECT_EQ(MOJO_RESULT_OK,
391 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping));
392 ASSERT_TRUE(mapping);
393 ASSERT_TRUE(mapping->base());
394 ASSERT_EQ(100u, mapping->length());
395
396 // Send the shared buffer.
397 const std::string go1("go 1");
398 DispatcherTransport transport(
399 test::DispatcherTryStartTransport(dispatcher.get()));
400 ASSERT_TRUE(transport.is_valid());
401
402 std::vector<DispatcherTransport> transports;
403 transports.push_back(transport);
404 EXPECT_EQ(MOJO_RESULT_OK,
405 mp->WriteMessage(0,
406 &go1[0],
407 static_cast<uint32_t>(go1.size()),
408 &transports,
409 MOJO_WRITE_MESSAGE_FLAG_NONE));
410 transport.End();
411
412 EXPECT_TRUE(dispatcher->HasOneRef());
413 dispatcher = NULL;
414
415 // Wait for a message from the child.
416 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
417
418 std::string read_buffer(100, '\0');
419 uint32_t num_bytes = static_cast<uint32_t>(read_buffer.size());
420 EXPECT_EQ(MOJO_RESULT_OK,
421 mp->ReadMessage(0,
422 &read_buffer[0], &num_bytes,
423 NULL, NULL,
424 MOJO_READ_MESSAGE_FLAG_NONE));
425 read_buffer.resize(num_bytes);
426 EXPECT_EQ(std::string("go 2"), read_buffer);
427
428 // After we get it, the child should have written something to the shared
429 // buffer.
430 static const char kHello[] = "hello";
431 EXPECT_EQ(0, memcmp(mapping->base(), kHello, sizeof(kHello)));
432
433 // Now we'll write some stuff to the shared buffer.
434 static const char kWorld[] = "world!!!";
435 memcpy(mapping->base(), kWorld, sizeof(kWorld));
436
437 // And send a message to signal that we've written stuff.
438 const std::string go3("go 3");
439 EXPECT_EQ(MOJO_RESULT_OK,
440 mp->WriteMessage(0,
441 &go3[0],
442 static_cast<uint32_t>(go3.size()),
443 NULL,
444 MOJO_WRITE_MESSAGE_FLAG_NONE));
445
446 // Wait for |mp| to become readable, which should fail.
447 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
448 WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
449
450 mp->Close(0);
451
452 EXPECT_EQ(0, helper()->WaitForChildShutdown());
453 }
454
// Child entry point "CheckPlatformHandleFile". Reads a single message, which
// must say "hello" and carry exactly one platform handle dispatcher, turns
// that handle into a FILE opened for reading, and checks that the file
// contains "world". Returns 0 if everything checks out (enforced with CHECK).
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) {
  ChannelThread channel_thread;
  embedder::ScopedPlatformHandle client_platform_handle =
      mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
  CHECK(client_platform_handle.is_valid());
  scoped_refptr<MessagePipe> mp(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  channel_thread.Start(client_platform_handle.Pass(), mp);

  // Capacity used both for the message payload and for the file contents.
  static const size_t kBufferCapacity = 100;

  CHECK_EQ(WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE), MOJO_RESULT_OK);

  std::string read_buffer(kBufferCapacity, '\0');
  uint32_t num_bytes = static_cast<uint32_t>(read_buffer.size());
  DispatcherVector dispatchers;
  uint32_t num_dispatchers = 10;  // Upper bound on dispatchers to receive.
  CHECK_EQ(mp->ReadMessage(0,
                           &read_buffer[0], &num_bytes,
                           &dispatchers, &num_dispatchers,
                           MOJO_READ_MESSAGE_FLAG_NONE),
           MOJO_RESULT_OK);
  // Only one message is exchanged, so our end is closed right after the read
  // and before the received data is inspected.
  mp->Close(0);

  read_buffer.resize(num_bytes);
  CHECK_EQ(read_buffer, std::string("hello"));
  CHECK_EQ(num_dispatchers, 1u);

  // The type is checked first, which is what justifies the downcast below.
  CHECK_EQ(dispatchers[0]->GetType(), Dispatcher::kTypePlatformHandle);

  scoped_refptr<PlatformHandleDispatcher> dispatcher(
      static_cast<PlatformHandleDispatcher*>(dispatchers[0].get()));
  // Take the platform handle out of the dispatcher, then close the
  // dispatcher.
  embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
  CHECK(h.is_valid());
  dispatcher->Close();

  // Wrap the handle in a FILE and compare whatever can be read from it.
  base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h.Pass(), "r"));
  CHECK(fp);
  std::string fread_buffer(kBufferCapacity, '\0');
  const size_t bytes_read =
      fread(&fread_buffer[0], 1, fread_buffer.size(), fp.get());
  fread_buffer.resize(bytes_read);
  CHECK_EQ(fread_buffer, "world");

  return 0;
}
499
500 #if defined(OS_POSIX)
501 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
502 #else
503 // Not yet implemented (on Windows).
504 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
505 #endif
TEST_F(MultiprocessMessagePipeTest,MAYBE_PlatformHandlePassing)506 TEST_F(MultiprocessMessagePipeTest, MAYBE_PlatformHandlePassing) {
507 helper()->StartChild("CheckPlatformHandleFile");
508
509 scoped_refptr<MessagePipe> mp(new MessagePipe(
510 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
511 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
512 Init(mp);
513
514 base::FilePath unused;
515 base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
516 const std::string world("world");
517 ASSERT_EQ(fwrite(&world[0], 1, world.size(), fp.get()), world.size());
518 fflush(fp.get());
519 rewind(fp.get());
520
521 embedder::ScopedPlatformHandle h(
522 mojo::test::PlatformHandleFromFILE(fp.Pass()));
523 scoped_refptr<PlatformHandleDispatcher> dispatcher(
524 new PlatformHandleDispatcher(h.Pass()));
525
526 const std::string hello("hello");
527 DispatcherTransport transport(
528 test::DispatcherTryStartTransport(dispatcher.get()));
529 ASSERT_TRUE(transport.is_valid());
530
531 std::vector<DispatcherTransport> transports;
532 transports.push_back(transport);
533 EXPECT_EQ(MOJO_RESULT_OK,
534 mp->WriteMessage(0,
535 &hello[0],
536 static_cast<uint32_t>(hello.size()),
537 &transports,
538 MOJO_WRITE_MESSAGE_FLAG_NONE));
539 transport.End();
540
541 EXPECT_TRUE(dispatcher->HasOneRef());
542 dispatcher = NULL;
543
544 // Wait for it to become readable, which should fail.
545 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
546 WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE));
547
548 mp->Close(0);
549
550 EXPECT_EQ(0, helper()->WaitForChildShutdown());
551 }
552
553 } // namespace
554 } // namespace system
555 } // namespace mojo
556