1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <vector>
10
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "build/build_config.h" // TODO(vtl): Remove this.
21 #include "mojo/common/test/test_utils.h"
22 #include "mojo/embedder/platform_channel_pair.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/local_message_pipe_endpoint.h"
26 #include "mojo/system/message_pipe.h"
27 #include "mojo/system/message_pipe_dispatcher.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/shared_buffer_dispatcher.h"
32 #include "mojo/system/test_utils.h"
33 #include "mojo/system/waiter.h"
34 #include "testing/gtest/include/gtest/gtest.h"
35
36 namespace mojo {
37 namespace system {
38 namespace {
39
40 class RemoteMessagePipeTest : public testing::Test {
41 public:
RemoteMessagePipeTest()42 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
~RemoteMessagePipeTest()43 virtual ~RemoteMessagePipeTest() {}
44
SetUp()45 virtual void SetUp() OVERRIDE {
46 io_thread_.PostTaskAndWait(
47 FROM_HERE,
48 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
49 base::Unretained(this)));
50 }
51
TearDown()52 virtual void TearDown() OVERRIDE {
53 io_thread_.PostTaskAndWait(
54 FROM_HERE,
55 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
56 base::Unretained(this)));
57 }
58
59 protected:
60 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
61 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
62 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,scoped_refptr<MessagePipe> mp1)63 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
64 scoped_refptr<MessagePipe> mp1) {
65 io_thread_.PostTaskAndWait(
66 FROM_HERE,
67 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
68 base::Unretained(this), mp0, mp1));
69 }
70
71 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
72 // It assumes/requires that this is the bootstrap case, i.e., that the
73 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
74 // returns *without* waiting for it to finish connecting.
BootstrapMessagePipeNoWait(unsigned channel_index,scoped_refptr<MessagePipe> mp)75 void BootstrapMessagePipeNoWait(unsigned channel_index,
76 scoped_refptr<MessagePipe> mp) {
77 io_thread_.PostTask(
78 FROM_HERE,
79 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
80 base::Unretained(this), channel_index, mp));
81 }
82
RestoreInitialState()83 void RestoreInitialState() {
84 io_thread_.PostTaskAndWait(
85 FROM_HERE,
86 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
87 base::Unretained(this)));
88 }
89
io_thread()90 test::TestIOThread* io_thread() { return &io_thread_; }
91
92 private:
SetUpOnIOThread()93 void SetUpOnIOThread() {
94 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
95
96 embedder::PlatformChannelPair channel_pair;
97 platform_handles_[0] = channel_pair.PassServerHandle();
98 platform_handles_[1] = channel_pair.PassClientHandle();
99 }
100
TearDownOnIOThread()101 void TearDownOnIOThread() {
102 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
103
104 if (channels_[0]) {
105 channels_[0]->Shutdown();
106 channels_[0] = NULL;
107 }
108 if (channels_[1]) {
109 channels_[1]->Shutdown();
110 channels_[1] = NULL;
111 }
112 }
113
CreateAndInitChannel(unsigned channel_index)114 void CreateAndInitChannel(unsigned channel_index) {
115 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
116 CHECK(channel_index == 0 || channel_index == 1);
117 CHECK(!channels_[channel_index]);
118
119 channels_[channel_index] = new Channel();
120 CHECK(channels_[channel_index]->Init(
121 RawChannel::Create(platform_handles_[channel_index].Pass())));
122 }
123
ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,scoped_refptr<MessagePipe> mp1)124 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
125 scoped_refptr<MessagePipe> mp1) {
126 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
127
128 if (!channels_[0])
129 CreateAndInitChannel(0);
130 if (!channels_[1])
131 CreateAndInitChannel(1);
132
133 MessageInTransit::EndpointId local_id0 =
134 channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
135 MessageInTransit::EndpointId local_id1 =
136 channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
137
138 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
139 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
140 }
141
BootstrapMessagePipeOnIOThread(unsigned channel_index,scoped_refptr<MessagePipe> mp)142 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
143 scoped_refptr<MessagePipe> mp) {
144 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
145 CHECK(channel_index == 0 || channel_index == 1);
146
147 unsigned port = channel_index ^ 1u;
148
149 CreateAndInitChannel(channel_index);
150 MessageInTransit::EndpointId endpoint_id =
151 channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
152 if (endpoint_id == MessageInTransit::kInvalidEndpointId)
153 return;
154
155 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
156 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
157 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
158 }
159
RestoreInitialStateOnIOThread()160 void RestoreInitialStateOnIOThread() {
161 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
162
163 TearDownOnIOThread();
164 SetUpOnIOThread();
165 }
166
167 test::TestIOThread io_thread_;
168 embedder::ScopedPlatformHandle platform_handles_[2];
169 scoped_refptr<Channel> channels_[2];
170
171 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
172 };
173
// Sends one message in each direction across a connected pair of message
// pipes, then checks that closing one user-facing endpoint is observed by a
// waiter on the other.
TEST_F(RemoteMessagePipeTest, Basic) {
  static const char kHello[] = "hello";
  static const char kWorld[] = "world!!!1!!!1!";
  char read_buffer[100] = { 0 };
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  Waiter waiter;
  uint32_t wait_context = 0;

  // Set up the pipes: MP 0, port 1 goes on channel 0 and is connected to MP 1,
  // port 0 on channel 1. The "user-facing" endpoints are therefore MP 0,
  // port 0 and MP 1, port 1.
  scoped_refptr<MessagePipe> mp0(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  scoped_refptr<MessagePipe> mp1(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
  ConnectMessagePipes(mp0, mp1);

  // First direction: MP 0, port 0 -> ... -> MP 1, port 1.

  // The waiter is registered on MP 1, port 1 *before* writing; if it were
  // added afterwards, the endpoint might already be readable.
  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));

  // Send on MP 0, port 0.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0, kHello, sizeof(kHello), NULL,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Block until the message shows up.
  EXPECT_EQ(MOJO_RESULT_OK,
            waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
  EXPECT_EQ(123u, wait_context);
  mp1->RemoveWaiter(1, &waiter);

  // Receive on MP 1, port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Second direction: MP 1, port 1 -> ... -> MP 0, port 0.

  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->WriteMessage(1, kWorld, sizeof(kWorld), NULL,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK,
            waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
  EXPECT_EQ(456u, wait_context);
  mp0->RemoveWaiter(0, &waiter);

  // The size is an in/out parameter, so reset it to the full capacity.
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->ReadMessage(0, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kWorld, read_buffer);

  // Shut MP 0, port 0.
  mp0->Close(0);

  // Waiting for MP 1, port 1 to become readable must now fail: either adding
  // the waiter is rejected right away, or the wait itself fails once the close
  // of MP 0, port 0 has been noticed.
  waiter.Init();
  MojoResult add_result =
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789);
  if (add_result != MOJO_RESULT_OK) {
    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, add_result);
  } else {
    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
              waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
    EXPECT_EQ(789u, wait_context);
    mp1->RemoveWaiter(1, &waiter);
  }

  // Finally shut MP 1, port 1 as well.
  mp1->Close(1);
}
269
// Puts two independent message pipes on the same pair of channels and checks
// that a message written on one pipe is delivered only to that pipe's peer.
TEST_F(RemoteMessagePipeTest, Multiplex) {
  static const char kHello[] = "hello";
  static const char kWorld[] = "world!!!1!!!1!";
  char read_buffer[100] = { 0 };
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  Waiter waiter;
  uint32_t wait_context = 0;

  // First pipe pair, set up exactly as in the |Basic| test.
  scoped_refptr<MessagePipe> mp0(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  scoped_refptr<MessagePipe> mp1(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
  ConnectMessagePipes(mp0, mp1);

  // Second pipe pair, sharing the same channels.
  scoped_refptr<MessagePipe> mp2(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  scoped_refptr<MessagePipe> mp3(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
  ConnectMessagePipes(mp2, mp3);

  // Send on the second pair: MP 2, port 0 -> MP 3, port 1.

  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp2->WriteMessage(0, kHello, sizeof(kHello), NULL,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK,
            waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
  EXPECT_EQ(789u, wait_context);
  mp3->RemoveWaiter(1, &waiter);

  // None of MP 0, port 0; MP 1, port 1; or MP 2, port 0 should have anything
  // to read. (The size is reset before every read since it is in/out.)
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp0->ReadMessage(0, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp1->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp2->ReadMessage(0, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));

  // The message is on MP 3, port 1.
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp3->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Now send on the first pair: MP 0, port 0 -> MP 1, port 1.

  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0, kWorld, sizeof(kWorld), NULL,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK,
            waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
  EXPECT_EQ(123u, wait_context);
  mp1->RemoveWaiter(1, &waiter);

  // Again, the other three user-facing ports must have nothing to read.
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp0->ReadMessage(0, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp2->ReadMessage(0, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp3->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));

  // The message is on MP 1, port 1.
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kWorld, read_buffer);

  // Close all four user-facing endpoints.
  mp0->Close(0);
  mp1->Close(1);
  mp2->Close(0);
  mp3->Close(1);
}
394
// Writes a message to one side and closes that side's user-facing endpoint
// before the other side has even been bootstrapped, then checks that the
// message still arrives on the other side.
TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
  static const char kHello[] = "hello";
  char read_buffer[100] = { 0 };
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  Waiter waiter;
  uint32_t wait_context = 0;

  // Only the first pipe exists at this point. Its port 1 will be bootstrapped
  // onto channel 0 below; port 0 is its "user-facing" endpoint.
  scoped_refptr<MessagePipe> mp0(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));

  // Send on MP 0, port 0 before anything has been attached to a channel.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0, kHello, sizeof(kHello), NULL,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  BootstrapMessagePipeNoWait(0, mp0);

  // Shut MP 0, port 0 while channel 1 doesn't exist yet.
  mp0->Close(0);

  // Now create the second pipe; its port 1 is the "user-facing" endpoint.
  scoped_refptr<MessagePipe> mp1(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));

  // Register the waiter on MP 1, port 1 before bootstrapping; if it were added
  // afterwards, the endpoint might already be readable.
  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));

  BootstrapMessagePipeNoWait(1, mp1);

  // Block until the message shows up.
  EXPECT_EQ(MOJO_RESULT_OK,
            waiter.Wait(MOJO_DEADLINE_INDEFINITE, &wait_context));
  EXPECT_EQ(123u, wait_context);
  mp1->RemoveWaiter(1, &waiter);

  // Receive on MP 1, port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Finally shut MP 1, port 1 as well.
  mp1->Close(1);
}
452
// Sends a |MessagePipeDispatcher| (wrapping port 0 of a local message pipe)
// across a remote message pipe, then checks that messages flow in both
// directions between the received dispatcher and port 1 of the local message
// pipe.
TEST_F(RemoteMessagePipeTest, HandlePassing) {
  static const char kHello[] = "hello";
  Waiter waiter;
  uint32_t context = 0;

  scoped_refptr<MessagePipe> mp0(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
  scoped_refptr<MessagePipe> mp1(new MessagePipe(
      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
  ConnectMessagePipes(mp0, mp1);

  // We'll try to pass this dispatcher.
  scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
      MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipe> local_mp(new MessagePipe());
  dispatcher->Init(local_mp, 0);

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));

  // Write to MP 0, port 0.
  {
    DispatcherTransport
        transport(test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = NULL;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  mp1->RemoveWaiter(1, &waiter);

  // Read from MP 1, port 1.
  char read_buffer[100] = { 0 };
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  DispatcherVector read_dispatchers;
  uint32_t read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1, read_buffer, &read_buffer_size,
                             &read_dispatchers, &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);
  // This must be fatal: |read_dispatchers[0]| is accessed below, which would
  // be out of bounds if no dispatcher was received.
  ASSERT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0]);
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  // This must be fatal too: the |static_cast| below is only valid if the
  // received dispatcher really is a message pipe dispatcher.
  ASSERT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());

  // Write to "local_mp", port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL,
                                   MOJO_WRITE_MESSAGE_FLAG_NONE));

  // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
  // here. (We don't crash if I sleep and then close.)

  // Wait for the dispatcher to become readable.
  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(456u, context);
  dispatcher->RemoveWaiter(&waiter);

  // Read from the dispatcher. (Clear the buffer first, since it still holds
  // the same string from the previous read.)
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
                                    MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Prepare to wait on "local_mp", port 1.
  waiter.Init();
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));

  // Write to the dispatcher.
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->WriteMessage(kHello, sizeof(kHello), NULL,
                                     MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(789u, context);
  local_mp->RemoveWaiter(1, &waiter);

  // Read from "local_mp", port 1.
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
                                  MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // TODO(vtl): Also test that messages queued up before the handle was sent are
  // delivered properly.

  // Close everything that belongs to us.
  mp0->Close(0);
  mp1->Close(1);
  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
  // Note that |local_mp|'s port 0 belongs to |dispatcher|, which was closed.
  local_mp->Close(1);
}
580
581 #if defined(OS_POSIX)
582 #define MAYBE_SharedBufferPassing SharedBufferPassing
583 #else
584 // Not yet implemented (on Windows).
585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
586 #endif
TEST_F(RemoteMessagePipeTest,MAYBE_SharedBufferPassing)587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
588 static const char kHello[] = "hello";
589 Waiter waiter;
590 uint32_t context = 0;
591
592 scoped_refptr<MessagePipe> mp0(new MessagePipe(
593 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
594 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
595 scoped_refptr<MessagePipe> mp1(new MessagePipe(
596 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
597 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
598 ConnectMessagePipes(mp0, mp1);
599
600 // We'll try to pass this dispatcher.
601 scoped_refptr<SharedBufferDispatcher> dispatcher;
602 EXPECT_EQ(MOJO_RESULT_OK,
603 SharedBufferDispatcher::Create(
604 SharedBufferDispatcher::kDefaultCreateOptions, 100,
605 &dispatcher));
606 ASSERT_TRUE(dispatcher);
607
608 // Make a mapping.
609 scoped_ptr<RawSharedBufferMapping> mapping0;
610 EXPECT_EQ(MOJO_RESULT_OK,
611 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
612 &mapping0));
613 ASSERT_TRUE(mapping0);
614 ASSERT_TRUE(mapping0->base());
615 ASSERT_EQ(100u, mapping0->length());
616 static_cast<char*>(mapping0->base())[0] = 'A';
617 static_cast<char*>(mapping0->base())[50] = 'B';
618 static_cast<char*>(mapping0->base())[99] = 'C';
619
620 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
621 // it later, it might already be readable.)
622 waiter.Init();
623 EXPECT_EQ(MOJO_RESULT_OK,
624 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
625
626 // Write to MP 0, port 0.
627 {
628 DispatcherTransport
629 transport(test::DispatcherTryStartTransport(dispatcher.get()));
630 EXPECT_TRUE(transport.is_valid());
631
632 std::vector<DispatcherTransport> transports;
633 transports.push_back(transport);
634 EXPECT_EQ(MOJO_RESULT_OK,
635 mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
636 MOJO_WRITE_MESSAGE_FLAG_NONE));
637 transport.End();
638
639 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
640 // |dispatcher| is destroyed.
641 EXPECT_TRUE(dispatcher->HasOneRef());
642 dispatcher = NULL;
643 }
644
645 // Wait.
646 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
647 EXPECT_EQ(123u, context);
648 mp1->RemoveWaiter(1, &waiter);
649
650 // Read from MP 1, port 1.
651 char read_buffer[100] = { 0 };
652 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
653 DispatcherVector read_dispatchers;
654 uint32_t read_num_dispatchers = 10; // Maximum to get.
655 EXPECT_EQ(MOJO_RESULT_OK,
656 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
657 &read_dispatchers, &read_num_dispatchers,
658 MOJO_READ_MESSAGE_FLAG_NONE));
659 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
660 EXPECT_STREQ(kHello, read_buffer);
661 EXPECT_EQ(1u, read_dispatchers.size());
662 EXPECT_EQ(1u, read_num_dispatchers);
663 ASSERT_TRUE(read_dispatchers[0]);
664 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
665
666 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
667 dispatcher =
668 static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
669
670 // Make another mapping.
671 scoped_ptr<RawSharedBufferMapping> mapping1;
672 EXPECT_EQ(MOJO_RESULT_OK,
673 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
674 &mapping1));
675 ASSERT_TRUE(mapping1);
676 ASSERT_TRUE(mapping1->base());
677 ASSERT_EQ(100u, mapping1->length());
678 EXPECT_NE(mapping1->base(), mapping0->base());
679 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
680 EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]);
681 EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]);
682
683 // Write stuff either way.
684 static_cast<char*>(mapping1->base())[1] = 'x';
685 EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]);
686 static_cast<char*>(mapping0->base())[2] = 'y';
687 EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]);
688
689 // Kill the first mapping; the second should still be valid.
690 mapping0.reset();
691 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
692
693 // Close everything that belongs to us.
694 mp0->Close(0);
695 mp1->Close(1);
696 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
697
698 // The second mapping should still be good.
699 EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]);
700 }
701
702 #if defined(OS_POSIX)
703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
704 #else
705 // Not yet implemented (on Windows).
706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
707 #endif
TEST_F(RemoteMessagePipeTest,MAYBE_PlatformHandlePassing)708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
709 static const char kHello[] = "hello";
710 static const char kWorld[] = "world";
711 Waiter waiter;
712 uint32_t context = 0;
713
714 scoped_refptr<MessagePipe> mp0(new MessagePipe(
715 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
716 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
717 scoped_refptr<MessagePipe> mp1(new MessagePipe(
718 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
719 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
720 ConnectMessagePipes(mp0, mp1);
721
722 base::FilePath unused;
723 base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
724 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
725 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
726 // be passed.
727 scoped_refptr<PlatformHandleDispatcher> dispatcher(
728 new PlatformHandleDispatcher(
729 mojo::test::PlatformHandleFromFILE(fp.Pass())));
730
731 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
732 // it later, it might already be readable.)
733 waiter.Init();
734 EXPECT_EQ(MOJO_RESULT_OK,
735 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
736
737 // Write to MP 0, port 0.
738 {
739 DispatcherTransport
740 transport(test::DispatcherTryStartTransport(dispatcher.get()));
741 EXPECT_TRUE(transport.is_valid());
742
743 std::vector<DispatcherTransport> transports;
744 transports.push_back(transport);
745 EXPECT_EQ(MOJO_RESULT_OK,
746 mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports,
747 MOJO_WRITE_MESSAGE_FLAG_NONE));
748 transport.End();
749
750 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
751 // |dispatcher| is destroyed.
752 EXPECT_TRUE(dispatcher->HasOneRef());
753 dispatcher = NULL;
754 }
755
756 // Wait.
757 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
758 EXPECT_EQ(123u, context);
759 mp1->RemoveWaiter(1, &waiter);
760
761 // Read from MP 1, port 1.
762 char read_buffer[100] = { 0 };
763 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
764 DispatcherVector read_dispatchers;
765 uint32_t read_num_dispatchers = 10; // Maximum to get.
766 EXPECT_EQ(MOJO_RESULT_OK,
767 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
768 &read_dispatchers, &read_num_dispatchers,
769 MOJO_READ_MESSAGE_FLAG_NONE));
770 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
771 EXPECT_STREQ(kWorld, read_buffer);
772 EXPECT_EQ(1u, read_dispatchers.size());
773 EXPECT_EQ(1u, read_num_dispatchers);
774 ASSERT_TRUE(read_dispatchers[0]);
775 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
776
777 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
778 dispatcher =
779 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
780
781 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
782 EXPECT_TRUE(h.is_valid());
783
784 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
785 EXPECT_FALSE(h.is_valid());
786 EXPECT_TRUE(fp);
787
788 rewind(fp.get());
789 memset(read_buffer, 0, sizeof(read_buffer));
790 EXPECT_EQ(sizeof(kHello),
791 fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
792 EXPECT_STREQ(kHello, read_buffer);
793
794 // Close everything that belongs to us.
795 mp0->Close(0);
796 mp1->Close(1);
797 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
798 }
799
800 // Test racing closes (on each end).
801 // Note: A flaky failure would almost certainly indicate a problem in the code
802 // itself (not in the test). Also, any logged warnings/errors would also
803 // probably be indicative of bugs.
TEST_F(RemoteMessagePipeTest,RacingClosesStress)804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
805 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
806
807 for (unsigned i = 0; i < 256; i++) {
808 DVLOG(2) << "---------------------------------------- " << i;
809 scoped_refptr<MessagePipe> mp0(new MessagePipe(
810 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
811 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
812 BootstrapMessagePipeNoWait(0, mp0);
813
814 scoped_refptr<MessagePipe> mp1(new MessagePipe(
815 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
816 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
817 BootstrapMessagePipeNoWait(1, mp1);
818
819 if (i & 1u) {
820 io_thread()->task_runner()->PostTask(
821 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
822 }
823 if (i & 2u)
824 base::PlatformThread::Sleep(delay);
825
826 mp0->Close(0);
827
828 if (i & 4u) {
829 io_thread()->task_runner()->PostTask(
830 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
831 }
832 if (i & 8u)
833 base::PlatformThread::Sleep(delay);
834
835 mp1->Close(1);
836
837 RestoreInitialState();
838 }
839 }
840
841 } // namespace
842 } // namespace system
843 } // namespace mojo
844