• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8 
9 #include <vector>
10 
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
20 #include "build/build_config.h"  // TODO(vtl): Remove this.
21 #include "mojo/common/test/test_utils.h"
22 #include "mojo/embedder/platform_channel_pair.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/local_message_pipe_endpoint.h"
26 #include "mojo/system/message_pipe.h"
27 #include "mojo/system/message_pipe_dispatcher.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/shared_buffer_dispatcher.h"
32 #include "mojo/system/test_utils.h"
33 #include "mojo/system/waiter.h"
34 #include "testing/gtest/include/gtest/gtest.h"
35 
36 namespace mojo {
37 namespace system {
38 namespace {
39 
40 class RemoteMessagePipeTest : public testing::Test {
41  public:
RemoteMessagePipeTest()42   RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
~RemoteMessagePipeTest()43   virtual ~RemoteMessagePipeTest() {}
44 
SetUp()45   virtual void SetUp() OVERRIDE {
46     io_thread_.PostTaskAndWait(
47         FROM_HERE,
48         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
49                    base::Unretained(this)));
50   }
51 
TearDown()52   virtual void TearDown() OVERRIDE {
53     io_thread_.PostTaskAndWait(
54         FROM_HERE,
55         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
56                    base::Unretained(this)));
57   }
58 
59  protected:
60   // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
61   // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
62   // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,scoped_refptr<MessagePipe> mp1)63   void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
64                            scoped_refptr<MessagePipe> mp1) {
65     io_thread_.PostTaskAndWait(
66         FROM_HERE,
67         base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
68                    base::Unretained(this), mp0, mp1));
69   }
70 
71   // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
72   // It assumes/requires that this is the bootstrap case, i.e., that the
73   // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
74   // returns *without* waiting for it to finish connecting.
BootstrapMessagePipeNoWait(unsigned channel_index,scoped_refptr<MessagePipe> mp)75   void BootstrapMessagePipeNoWait(unsigned channel_index,
76                                   scoped_refptr<MessagePipe> mp) {
77     io_thread_.PostTask(
78         FROM_HERE,
79         base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
80                    base::Unretained(this), channel_index, mp));
81   }
82 
RestoreInitialState()83   void RestoreInitialState() {
84     io_thread_.PostTaskAndWait(
85         FROM_HERE,
86         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
87                    base::Unretained(this)));
88   }
89 
io_thread()90   test::TestIOThread* io_thread() { return &io_thread_; }
91 
92  private:
SetUpOnIOThread()93   void SetUpOnIOThread() {
94     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
95 
96     embedder::PlatformChannelPair channel_pair;
97     platform_handles_[0] = channel_pair.PassServerHandle();
98     platform_handles_[1] = channel_pair.PassClientHandle();
99   }
100 
TearDownOnIOThread()101   void TearDownOnIOThread() {
102     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
103 
104     if (channels_[0]) {
105       channels_[0]->Shutdown();
106       channels_[0] = NULL;
107     }
108     if (channels_[1]) {
109       channels_[1]->Shutdown();
110       channels_[1] = NULL;
111     }
112   }
113 
CreateAndInitChannel(unsigned channel_index)114   void CreateAndInitChannel(unsigned channel_index) {
115     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
116     CHECK(channel_index == 0 || channel_index == 1);
117     CHECK(!channels_[channel_index]);
118 
119     channels_[channel_index] = new Channel();
120     CHECK(channels_[channel_index]->Init(
121         RawChannel::Create(platform_handles_[channel_index].Pass())));
122   }
123 
ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,scoped_refptr<MessagePipe> mp1)124   void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
125                                      scoped_refptr<MessagePipe> mp1) {
126     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
127 
128     if (!channels_[0])
129       CreateAndInitChannel(0);
130     if (!channels_[1])
131       CreateAndInitChannel(1);
132 
133     MessageInTransit::EndpointId local_id0 =
134         channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
135     MessageInTransit::EndpointId local_id1 =
136         channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
137 
138     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
139     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
140   }
141 
BootstrapMessagePipeOnIOThread(unsigned channel_index,scoped_refptr<MessagePipe> mp)142   void BootstrapMessagePipeOnIOThread(unsigned channel_index,
143                                       scoped_refptr<MessagePipe> mp) {
144     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
145     CHECK(channel_index == 0 || channel_index == 1);
146 
147     unsigned port = channel_index ^ 1u;
148 
149     CreateAndInitChannel(channel_index);
150     MessageInTransit::EndpointId endpoint_id =
151         channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
152     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
153       return;
154 
155     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
156     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
157         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
158   }
159 
RestoreInitialStateOnIOThread()160   void RestoreInitialStateOnIOThread() {
161     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
162 
163     TearDownOnIOThread();
164     SetUpOnIOThread();
165   }
166 
167   test::TestIOThread io_thread_;
168   embedder::ScopedPlatformHandle platform_handles_[2];
169   scoped_refptr<Channel> channels_[2];
170 
171   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
172 };
173 
TEST_F(RemoteMessagePipeTest,Basic)174 TEST_F(RemoteMessagePipeTest, Basic) {
175   static const char kHello[] = "hello";
176   static const char kWorld[] = "world!!!1!!!1!";
177   char buffer[100] = { 0 };
178   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
179   Waiter waiter;
180   uint32_t context = 0;
181 
182   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
183   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
184   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
185 
186   scoped_refptr<MessagePipe> mp0(new MessagePipe(
187       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
188       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
189   scoped_refptr<MessagePipe> mp1(new MessagePipe(
190       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
191       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
192   ConnectMessagePipes(mp0, mp1);
193 
194   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
195 
196   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
197   // it later, it might already be readable.)
198   waiter.Init();
199   EXPECT_EQ(MOJO_RESULT_OK,
200             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
201 
202   // Write to MP 0, port 0.
203   EXPECT_EQ(MOJO_RESULT_OK,
204             mp0->WriteMessage(0,
205                               kHello, sizeof(kHello),
206                               NULL,
207                               MOJO_WRITE_MESSAGE_FLAG_NONE));
208 
209   // Wait.
210   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
211   EXPECT_EQ(123u, context);
212   mp1->RemoveWaiter(1, &waiter);
213 
214   // Read from MP 1, port 1.
215   EXPECT_EQ(MOJO_RESULT_OK,
216             mp1->ReadMessage(1,
217                              buffer, &buffer_size,
218                              NULL, NULL,
219                              MOJO_READ_MESSAGE_FLAG_NONE));
220   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
221   EXPECT_STREQ(kHello, buffer);
222 
223   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
224 
225   waiter.Init();
226   EXPECT_EQ(MOJO_RESULT_OK,
227             mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
228 
229   EXPECT_EQ(MOJO_RESULT_OK,
230             mp1->WriteMessage(1,
231                               kWorld, sizeof(kWorld),
232                               NULL,
233                               MOJO_WRITE_MESSAGE_FLAG_NONE));
234 
235   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
236   EXPECT_EQ(456u, context);
237   mp0->RemoveWaiter(0, &waiter);
238 
239   buffer_size = static_cast<uint32_t>(sizeof(buffer));
240   EXPECT_EQ(MOJO_RESULT_OK,
241             mp0->ReadMessage(0,
242                              buffer, &buffer_size,
243                              NULL, NULL,
244                              MOJO_READ_MESSAGE_FLAG_NONE));
245   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
246   EXPECT_STREQ(kWorld, buffer);
247 
248   // Close MP 0, port 0.
249   mp0->Close(0);
250 
251   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
252   // when it realizes that MP 0, port 0 has been closed. (It may also fail
253   // immediately.)
254   waiter.Init();
255   MojoResult result =
256       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789);
257   if (result == MOJO_RESULT_OK) {
258     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
259               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
260     EXPECT_EQ(789u, context);
261     mp1->RemoveWaiter(1, &waiter);
262   } else {
263     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
264   }
265 
266   // And MP 1, port 1.
267   mp1->Close(1);
268 }
269 
TEST_F(RemoteMessagePipeTest,Multiplex)270 TEST_F(RemoteMessagePipeTest, Multiplex) {
271   static const char kHello[] = "hello";
272   static const char kWorld[] = "world!!!1!!!1!";
273   char buffer[100] = { 0 };
274   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
275   Waiter waiter;
276   uint32_t context = 0;
277 
278   // Connect message pipes as in the |Basic| test.
279 
280   scoped_refptr<MessagePipe> mp0(new MessagePipe(
281       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
282       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
283   scoped_refptr<MessagePipe> mp1(new MessagePipe(
284       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
285       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
286   ConnectMessagePipes(mp0, mp1);
287 
288   // Now put another message pipe on the channel.
289 
290   scoped_refptr<MessagePipe> mp2(new MessagePipe(
291       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
292       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
293   scoped_refptr<MessagePipe> mp3(new MessagePipe(
294       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
295       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
296   ConnectMessagePipes(mp2, mp3);
297 
298   // Write: MP 2, port 0 -> MP 3, port 1.
299 
300   waiter.Init();
301   EXPECT_EQ(MOJO_RESULT_OK,
302             mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
303 
304   EXPECT_EQ(MOJO_RESULT_OK,
305             mp2->WriteMessage(0,
306                               kHello, sizeof(kHello),
307                               NULL,
308                               MOJO_WRITE_MESSAGE_FLAG_NONE));
309 
310   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
311   EXPECT_EQ(789u, context);
312   mp3->RemoveWaiter(1, &waiter);
313 
314   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
315   buffer_size = static_cast<uint32_t>(sizeof(buffer));
316   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
317             mp0->ReadMessage(0,
318                              buffer, &buffer_size,
319                              NULL, NULL,
320                              MOJO_READ_MESSAGE_FLAG_NONE));
321   buffer_size = static_cast<uint32_t>(sizeof(buffer));
322   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
323             mp1->ReadMessage(1,
324                              buffer, &buffer_size,
325                              NULL, NULL,
326                              MOJO_READ_MESSAGE_FLAG_NONE));
327   buffer_size = static_cast<uint32_t>(sizeof(buffer));
328   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
329             mp2->ReadMessage(0,
330                              buffer, &buffer_size,
331                              NULL, NULL,
332                              MOJO_READ_MESSAGE_FLAG_NONE));
333 
334   // Read from MP 3, port 1.
335   buffer_size = static_cast<uint32_t>(sizeof(buffer));
336   EXPECT_EQ(MOJO_RESULT_OK,
337             mp3->ReadMessage(1,
338                              buffer, &buffer_size,
339                              NULL, NULL,
340                              MOJO_READ_MESSAGE_FLAG_NONE));
341   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
342   EXPECT_STREQ(kHello, buffer);
343 
344   // Write: MP 0, port 0 -> MP 1, port 1 again.
345 
346   waiter.Init();
347   EXPECT_EQ(MOJO_RESULT_OK,
348             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
349 
350   EXPECT_EQ(MOJO_RESULT_OK,
351             mp0->WriteMessage(0,
352                               kWorld, sizeof(kWorld),
353                               NULL,
354                               MOJO_WRITE_MESSAGE_FLAG_NONE));
355 
356   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
357   EXPECT_EQ(123u, context);
358   mp1->RemoveWaiter(1, &waiter);
359 
360   // Make sure there's nothing on the other ports.
361   buffer_size = static_cast<uint32_t>(sizeof(buffer));
362   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
363             mp0->ReadMessage(0,
364                              buffer, &buffer_size,
365                              NULL, NULL,
366                              MOJO_READ_MESSAGE_FLAG_NONE));
367   buffer_size = static_cast<uint32_t>(sizeof(buffer));
368   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
369             mp2->ReadMessage(0,
370                              buffer, &buffer_size,
371                              NULL, NULL,
372                              MOJO_READ_MESSAGE_FLAG_NONE));
373   buffer_size = static_cast<uint32_t>(sizeof(buffer));
374   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
375             mp3->ReadMessage(1,
376                              buffer, &buffer_size,
377                              NULL, NULL,
378                              MOJO_READ_MESSAGE_FLAG_NONE));
379 
380   buffer_size = static_cast<uint32_t>(sizeof(buffer));
381   EXPECT_EQ(MOJO_RESULT_OK,
382             mp1->ReadMessage(1,
383                              buffer, &buffer_size,
384                              NULL, NULL,
385                              MOJO_READ_MESSAGE_FLAG_NONE));
386   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
387   EXPECT_STREQ(kWorld, buffer);
388 
389   mp0->Close(0);
390   mp1->Close(1);
391   mp2->Close(0);
392   mp3->Close(1);
393 }
394 
TEST_F(RemoteMessagePipeTest,CloseBeforeConnect)395 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
396   static const char kHello[] = "hello";
397   char buffer[100] = { 0 };
398   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
399   Waiter waiter;
400   uint32_t context = 0;
401 
402   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
403   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
404   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
405 
406   scoped_refptr<MessagePipe> mp0(new MessagePipe(
407       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
408       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
409 
410   // Write to MP 0, port 0.
411   EXPECT_EQ(MOJO_RESULT_OK,
412             mp0->WriteMessage(0,
413                               kHello, sizeof(kHello),
414                               NULL,
415                               MOJO_WRITE_MESSAGE_FLAG_NONE));
416 
417   BootstrapMessagePipeNoWait(0, mp0);
418 
419 
420   // Close MP 0, port 0 before channel 1 is even connected.
421   mp0->Close(0);
422 
423   scoped_refptr<MessagePipe> mp1(new MessagePipe(
424       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
425       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
426 
427   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
428   // it later, it might already be readable.)
429   waiter.Init();
430   EXPECT_EQ(MOJO_RESULT_OK,
431             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
432 
433   BootstrapMessagePipeNoWait(1, mp1);
434 
435   // Wait.
436   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
437   EXPECT_EQ(123u, context);
438   mp1->RemoveWaiter(1, &waiter);
439 
440   // Read from MP 1, port 1.
441   EXPECT_EQ(MOJO_RESULT_OK,
442             mp1->ReadMessage(1,
443                              buffer, &buffer_size,
444                              NULL, NULL,
445                              MOJO_READ_MESSAGE_FLAG_NONE));
446   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
447   EXPECT_STREQ(kHello, buffer);
448 
449   // And MP 1, port 1.
450   mp1->Close(1);
451 }
452 
TEST_F(RemoteMessagePipeTest,HandlePassing)453 TEST_F(RemoteMessagePipeTest, HandlePassing) {
454   static const char kHello[] = "hello";
455   Waiter waiter;
456   uint32_t context = 0;
457 
458   scoped_refptr<MessagePipe> mp0(new MessagePipe(
459       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
460       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
461   scoped_refptr<MessagePipe> mp1(new MessagePipe(
462       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
463       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
464   ConnectMessagePipes(mp0, mp1);
465 
466   // We'll try to pass this dispatcher.
467   scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
468       MessagePipeDispatcher::kDefaultCreateOptions));
469   scoped_refptr<MessagePipe> local_mp(new MessagePipe());
470   dispatcher->Init(local_mp, 0);
471 
472   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
473   // it later, it might already be readable.)
474   waiter.Init();
475   EXPECT_EQ(MOJO_RESULT_OK,
476             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
477 
478   // Write to MP 0, port 0.
479   {
480     DispatcherTransport
481         transport(test::DispatcherTryStartTransport(dispatcher.get()));
482     EXPECT_TRUE(transport.is_valid());
483 
484     std::vector<DispatcherTransport> transports;
485     transports.push_back(transport);
486     EXPECT_EQ(MOJO_RESULT_OK,
487               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
488                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
489     transport.End();
490 
491     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
492     // |dispatcher| is destroyed.
493     EXPECT_TRUE(dispatcher->HasOneRef());
494     dispatcher = NULL;
495   }
496 
497   // Wait.
498   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
499   EXPECT_EQ(123u, context);
500   mp1->RemoveWaiter(1, &waiter);
501 
502   // Read from MP 1, port 1.
503   char read_buffer[100] = { 0 };
504   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
505   DispatcherVector read_dispatchers;
506   uint32_t read_num_dispatchers = 10;  // Maximum to get.
507   EXPECT_EQ(MOJO_RESULT_OK,
508             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
509                              &read_dispatchers, &read_num_dispatchers,
510                              MOJO_READ_MESSAGE_FLAG_NONE));
511   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
512   EXPECT_STREQ(kHello, read_buffer);
513   EXPECT_EQ(1u, read_dispatchers.size());
514   EXPECT_EQ(1u, read_num_dispatchers);
515   ASSERT_TRUE(read_dispatchers[0]);
516   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
517 
518   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
519   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
520 
521   // Write to "local_mp", port 1.
522   EXPECT_EQ(MOJO_RESULT_OK,
523             local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL,
524                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
525 
526   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
527   // here. (We don't crash if I sleep and then close.)
528 
529   // Wait for the dispatcher to become readable.
530   waiter.Init();
531   EXPECT_EQ(MOJO_RESULT_OK,
532             dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
533   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
534   EXPECT_EQ(456u, context);
535   dispatcher->RemoveWaiter(&waiter);
536 
537   // Read from the dispatcher.
538   memset(read_buffer, 0, sizeof(read_buffer));
539   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
540   EXPECT_EQ(MOJO_RESULT_OK,
541             dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
542                                     MOJO_READ_MESSAGE_FLAG_NONE));
543   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
544   EXPECT_STREQ(kHello, read_buffer);
545 
546   // Prepare to wait on "local_mp", port 1.
547   waiter.Init();
548   EXPECT_EQ(MOJO_RESULT_OK,
549             local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
550 
551   // Write to the dispatcher.
552   EXPECT_EQ(MOJO_RESULT_OK,
553             dispatcher->WriteMessage(kHello, sizeof(kHello), NULL,
554                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
555 
556   // Wait.
557   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
558   EXPECT_EQ(789u, context);
559   local_mp->RemoveWaiter(1, &waiter);
560 
561   // Read from "local_mp", port 1.
562   memset(read_buffer, 0, sizeof(read_buffer));
563   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
564   EXPECT_EQ(MOJO_RESULT_OK,
565             local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
566                                   MOJO_READ_MESSAGE_FLAG_NONE));
567   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
568   EXPECT_STREQ(kHello, read_buffer);
569 
570   // TODO(vtl): Also test that messages queued up before the handle was sent are
571   // delivered properly.
572 
573   // Close everything that belongs to us.
574   mp0->Close(0);
575   mp1->Close(1);
576   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
577   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
578   local_mp->Close(1);
579 }
580 
581 #if defined(OS_POSIX)
582 #define MAYBE_SharedBufferPassing SharedBufferPassing
583 #else
584 // Not yet implemented (on Windows).
585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
586 #endif
TEST_F(RemoteMessagePipeTest,MAYBE_SharedBufferPassing)587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
588   static const char kHello[] = "hello";
589   Waiter waiter;
590   uint32_t context = 0;
591 
592   scoped_refptr<MessagePipe> mp0(new MessagePipe(
593       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
594       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
595   scoped_refptr<MessagePipe> mp1(new MessagePipe(
596       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
597       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
598   ConnectMessagePipes(mp0, mp1);
599 
600   // We'll try to pass this dispatcher.
601   scoped_refptr<SharedBufferDispatcher> dispatcher;
602   EXPECT_EQ(MOJO_RESULT_OK,
603             SharedBufferDispatcher::Create(
604                 SharedBufferDispatcher::kDefaultCreateOptions, 100,
605                 &dispatcher));
606   ASSERT_TRUE(dispatcher);
607 
608   // Make a mapping.
609   scoped_ptr<RawSharedBufferMapping> mapping0;
610   EXPECT_EQ(MOJO_RESULT_OK,
611             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
612                                   &mapping0));
613   ASSERT_TRUE(mapping0);
614   ASSERT_TRUE(mapping0->base());
615   ASSERT_EQ(100u, mapping0->length());
616   static_cast<char*>(mapping0->base())[0] = 'A';
617   static_cast<char*>(mapping0->base())[50] = 'B';
618   static_cast<char*>(mapping0->base())[99] = 'C';
619 
620   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
621   // it later, it might already be readable.)
622   waiter.Init();
623   EXPECT_EQ(MOJO_RESULT_OK,
624             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
625 
626   // Write to MP 0, port 0.
627   {
628     DispatcherTransport
629         transport(test::DispatcherTryStartTransport(dispatcher.get()));
630     EXPECT_TRUE(transport.is_valid());
631 
632     std::vector<DispatcherTransport> transports;
633     transports.push_back(transport);
634     EXPECT_EQ(MOJO_RESULT_OK,
635               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
636                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
637     transport.End();
638 
639     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
640     // |dispatcher| is destroyed.
641     EXPECT_TRUE(dispatcher->HasOneRef());
642     dispatcher = NULL;
643   }
644 
645   // Wait.
646   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
647   EXPECT_EQ(123u, context);
648   mp1->RemoveWaiter(1, &waiter);
649 
650   // Read from MP 1, port 1.
651   char read_buffer[100] = { 0 };
652   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
653   DispatcherVector read_dispatchers;
654   uint32_t read_num_dispatchers = 10;  // Maximum to get.
655   EXPECT_EQ(MOJO_RESULT_OK,
656             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
657                              &read_dispatchers, &read_num_dispatchers,
658                              MOJO_READ_MESSAGE_FLAG_NONE));
659   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
660   EXPECT_STREQ(kHello, read_buffer);
661   EXPECT_EQ(1u, read_dispatchers.size());
662   EXPECT_EQ(1u, read_num_dispatchers);
663   ASSERT_TRUE(read_dispatchers[0]);
664   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
665 
666   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
667   dispatcher =
668       static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
669 
670   // Make another mapping.
671   scoped_ptr<RawSharedBufferMapping> mapping1;
672   EXPECT_EQ(MOJO_RESULT_OK,
673             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
674                                   &mapping1));
675   ASSERT_TRUE(mapping1);
676   ASSERT_TRUE(mapping1->base());
677   ASSERT_EQ(100u, mapping1->length());
678   EXPECT_NE(mapping1->base(), mapping0->base());
679   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
680   EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]);
681   EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]);
682 
683   // Write stuff either way.
684   static_cast<char*>(mapping1->base())[1] = 'x';
685   EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]);
686   static_cast<char*>(mapping0->base())[2] = 'y';
687   EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]);
688 
689   // Kill the first mapping; the second should still be valid.
690   mapping0.reset();
691   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
692 
693   // Close everything that belongs to us.
694   mp0->Close(0);
695   mp1->Close(1);
696   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
697 
698   // The second mapping should still be good.
699   EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]);
700 }
701 
702 #if defined(OS_POSIX)
703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
704 #else
705 // Not yet implemented (on Windows).
706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
707 #endif
TEST_F(RemoteMessagePipeTest,MAYBE_PlatformHandlePassing)708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
709   static const char kHello[] = "hello";
710   static const char kWorld[] = "world";
711   Waiter waiter;
712   uint32_t context = 0;
713 
714   scoped_refptr<MessagePipe> mp0(new MessagePipe(
715       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
716       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
717   scoped_refptr<MessagePipe> mp1(new MessagePipe(
718       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
719       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
720   ConnectMessagePipes(mp0, mp1);
721 
722   base::FilePath unused;
723   base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
724   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
725   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
726   // be passed.
727   scoped_refptr<PlatformHandleDispatcher> dispatcher(
728       new PlatformHandleDispatcher(
729           mojo::test::PlatformHandleFromFILE(fp.Pass())));
730 
731   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
732   // it later, it might already be readable.)
733   waiter.Init();
734   EXPECT_EQ(MOJO_RESULT_OK,
735             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
736 
737   // Write to MP 0, port 0.
738   {
739     DispatcherTransport
740         transport(test::DispatcherTryStartTransport(dispatcher.get()));
741     EXPECT_TRUE(transport.is_valid());
742 
743     std::vector<DispatcherTransport> transports;
744     transports.push_back(transport);
745     EXPECT_EQ(MOJO_RESULT_OK,
746               mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports,
747                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
748     transport.End();
749 
750     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
751     // |dispatcher| is destroyed.
752     EXPECT_TRUE(dispatcher->HasOneRef());
753     dispatcher = NULL;
754   }
755 
756   // Wait.
757   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
758   EXPECT_EQ(123u, context);
759   mp1->RemoveWaiter(1, &waiter);
760 
761   // Read from MP 1, port 1.
762   char read_buffer[100] = { 0 };
763   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
764   DispatcherVector read_dispatchers;
765   uint32_t read_num_dispatchers = 10;  // Maximum to get.
766   EXPECT_EQ(MOJO_RESULT_OK,
767             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
768                              &read_dispatchers, &read_num_dispatchers,
769                              MOJO_READ_MESSAGE_FLAG_NONE));
770   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
771   EXPECT_STREQ(kWorld, read_buffer);
772   EXPECT_EQ(1u, read_dispatchers.size());
773   EXPECT_EQ(1u, read_num_dispatchers);
774   ASSERT_TRUE(read_dispatchers[0]);
775   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
776 
777   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
778   dispatcher =
779       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
780 
781   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
782   EXPECT_TRUE(h.is_valid());
783 
784   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
785   EXPECT_FALSE(h.is_valid());
786   EXPECT_TRUE(fp);
787 
788   rewind(fp.get());
789   memset(read_buffer, 0, sizeof(read_buffer));
790   EXPECT_EQ(sizeof(kHello),
791             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
792   EXPECT_STREQ(kHello, read_buffer);
793 
794   // Close everything that belongs to us.
795   mp0->Close(0);
796   mp1->Close(1);
797   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
798 }
799 
800 // Test racing closes (on each end).
801 // Note: A flaky failure would almost certainly indicate a problem in the code
802 // itself (not in the test). Also, any logged warnings/errors would also
803 // probably be indicative of bugs.
TEST_F(RemoteMessagePipeTest,RacingClosesStress)804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
805   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
806 
807   for (unsigned i = 0; i < 256; i++) {
808     DVLOG(2) << "---------------------------------------- " << i;
809     scoped_refptr<MessagePipe> mp0(new MessagePipe(
810         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
811         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
812     BootstrapMessagePipeNoWait(0, mp0);
813 
814     scoped_refptr<MessagePipe> mp1(new MessagePipe(
815         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
816         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
817     BootstrapMessagePipeNoWait(1, mp1);
818 
819     if (i & 1u) {
820       io_thread()->task_runner()->PostTask(
821           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
822     }
823     if (i & 2u)
824       base::PlatformThread::Sleep(delay);
825 
826     mp0->Close(0);
827 
828     if (i & 4u) {
829       io_thread()->task_runner()->PostTask(
830           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
831     }
832     if (i & 8u)
833       base::PlatformThread::Sleep(delay);
834 
835     mp1->Close(1);
836 
837     RestoreInitialState();
838   }
839 }
840 
841 }  // namespace
842 }  // namespace system
843 }  // namespace mojo
844