• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // TODO(vtl): The POSIX-specific bits have been factored out. Apply this test to
6 // non-POSIX once we have a non-POSIX implementation.
7 
8 #include <stdint.h>
9 #include <string.h>
10 
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/thread.h"
18 #include "mojo/system/channel.h"
19 #include "mojo/system/local_message_pipe_endpoint.h"
20 #include "mojo/system/message_pipe.h"
21 #include "mojo/system/platform_channel.h"
22 #include "mojo/system/proxy_message_pipe_endpoint.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "testing/gtest/include/gtest/gtest.h"
26 
27 namespace mojo {
28 namespace system {
29 namespace {
30 
31 class RemoteMessagePipeTest : public testing::Test {
32  public:
RemoteMessagePipeTest()33   RemoteMessagePipeTest() : io_thread_("io_thread") {
34   }
35 
~RemoteMessagePipeTest()36   virtual ~RemoteMessagePipeTest() {
37   }
38 
SetUp()39   virtual void SetUp() OVERRIDE {
40     io_thread_.StartWithOptions(
41         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
42 
43     test::PostTaskAndWait(io_thread_task_runner(),
44                           FROM_HERE,
45                           base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
46                                      base::Unretained(this)));
47   }
48 
TearDown()49   virtual void TearDown() OVERRIDE {
50     test::PostTaskAndWait(io_thread_task_runner(),
51                           FROM_HERE,
52                           base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
53                                      base::Unretained(this)));
54     io_thread_.Stop();
55   }
56 
57   // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
58   // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
59   // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
ConnectMessagePipes(scoped_refptr<MessagePipe> mp_0,scoped_refptr<MessagePipe> mp_1)60   void ConnectMessagePipes(scoped_refptr<MessagePipe> mp_0,
61                            scoped_refptr<MessagePipe> mp_1) {
62     test::PostTaskAndWait(
63         io_thread_task_runner(),
64         FROM_HERE,
65         base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
66                    base::Unretained(this), mp_0, mp_1));
67   }
68 
69   // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
70   // It assumes/requires that this is the bootstrap case, i.e., that the
71   // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
72   // returns *without* waiting for it to finish connecting.
BootstrapMessagePipeNoWait(unsigned channel_index,scoped_refptr<MessagePipe> mp)73   void BootstrapMessagePipeNoWait(unsigned channel_index,
74                                   scoped_refptr<MessagePipe> mp) {
75     io_thread_task_runner()->PostTask(
76         FROM_HERE,
77         base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
78                    base::Unretained(this), channel_index, mp));
79   }
80 
81  protected:
io_thread_message_loop()82   base::MessageLoop* io_thread_message_loop() {
83     return io_thread_.message_loop();
84   }
85 
io_thread_task_runner()86   scoped_refptr<base::TaskRunner> io_thread_task_runner() {
87     return io_thread_message_loop()->message_loop_proxy();
88   }
89 
90  private:
SetUpOnIOThread()91   void SetUpOnIOThread() {
92     CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
93 
94     scoped_ptr<PlatformServerChannel> server_channel(
95         PlatformServerChannel::Create("channel"));
96     CHECK(server_channel.get());
97     CHECK(server_channel->is_valid());
98     scoped_ptr<PlatformClientChannel> client_channel(
99         server_channel->CreateClientChannel());
100     CHECK(client_channel.get());
101     CHECK(client_channel->is_valid());
102 
103     platform_channels_[0] = server_channel.PassAs<PlatformChannel>();
104     platform_channels_[1] = client_channel.PassAs<PlatformChannel>();
105   }
106 
CreateAndInitChannel(unsigned channel_index)107   void CreateAndInitChannel(unsigned channel_index) {
108     CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
109     CHECK(channel_index == 0 || channel_index == 1);
110     CHECK(!channels_[channel_index].get());
111 
112     channels_[channel_index] = new Channel();
113     CHECK(channels_[channel_index]->Init(
114         platform_channels_[channel_index]->PassHandle()));
115   }
116 
ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,scoped_refptr<MessagePipe> mp_1)117   void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,
118                                      scoped_refptr<MessagePipe> mp_1) {
119     CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
120 
121     if (!channels_[0].get())
122       CreateAndInitChannel(0);
123     if (!channels_[1].get())
124       CreateAndInitChannel(1);
125 
126     MessageInTransit::EndpointId local_id_0 =
127         channels_[0]->AttachMessagePipeEndpoint(mp_0, 1);
128     MessageInTransit::EndpointId local_id_1 =
129         channels_[1]->AttachMessagePipeEndpoint(mp_1, 0);
130 
131     channels_[0]->RunMessagePipeEndpoint(local_id_0, local_id_1);
132     channels_[1]->RunMessagePipeEndpoint(local_id_1, local_id_0);
133   }
134 
BootstrapMessagePipeOnIOThread(unsigned channel_index,scoped_refptr<MessagePipe> mp)135   void BootstrapMessagePipeOnIOThread(unsigned channel_index,
136                                       scoped_refptr<MessagePipe> mp) {
137     CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
138     CHECK(channel_index == 0 || channel_index == 1);
139 
140     unsigned port = channel_index ^ 1u;
141 
142     // Important: If we don't boot
143     CreateAndInitChannel(channel_index);
144     CHECK_EQ(channels_[channel_index]->AttachMessagePipeEndpoint(mp, port),
145              Channel::kBootstrapEndpointId);
146     channels_[channel_index]->RunMessagePipeEndpoint(
147         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId);
148   }
149 
TearDownOnIOThread()150   void TearDownOnIOThread() {
151     if (channels_[0].get()) {
152       channels_[0]->Shutdown();
153       channels_[0] = NULL;
154     }
155     if (channels_[1].get()) {
156       channels_[1]->Shutdown();
157       channels_[1] = NULL;
158     }
159   }
160 
161   base::Thread io_thread_;
162   scoped_ptr<PlatformChannel> platform_channels_[2];
163   scoped_refptr<Channel> channels_[2];
164 
165   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
166 };
167 
TEST_F(RemoteMessagePipeTest,Basic)168 TEST_F(RemoteMessagePipeTest, Basic) {
169   const char hello[] = "hello";
170   const char world[] = "world!!!1!!!1!";
171   char buffer[100] = { 0 };
172   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
173   Waiter waiter;
174 
175   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
176   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
177   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
178 
179   scoped_refptr<MessagePipe> mp_0(new MessagePipe(
180       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
181       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
182   scoped_refptr<MessagePipe> mp_1(new MessagePipe(
183       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
184       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
185   ConnectMessagePipes(mp_0, mp_1);
186 
187   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
188 
189   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
190   // it later, it might already be readable.)
191   waiter.Init();
192   EXPECT_EQ(MOJO_RESULT_OK,
193             mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
194 
195   // Write to MP 0, port 0.
196   EXPECT_EQ(MOJO_RESULT_OK,
197             mp_0->WriteMessage(0,
198                                hello, sizeof(hello),
199                                NULL,
200                                MOJO_WRITE_MESSAGE_FLAG_NONE));
201 
202   // Wait.
203   EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
204   mp_1->RemoveWaiter(1, &waiter);
205 
206   // Read from MP 1, port 1.
207   EXPECT_EQ(MOJO_RESULT_OK,
208             mp_1->ReadMessage(1,
209                               buffer, &buffer_size,
210                               NULL, NULL,
211                               MOJO_READ_MESSAGE_FLAG_NONE));
212   EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
213   EXPECT_EQ(0, strcmp(buffer, hello));
214 
215   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
216 
217   waiter.Init();
218   EXPECT_EQ(MOJO_RESULT_OK,
219             mp_0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
220 
221   EXPECT_EQ(MOJO_RESULT_OK,
222             mp_1->WriteMessage(1,
223                                world, sizeof(world),
224                                NULL,
225                                MOJO_WRITE_MESSAGE_FLAG_NONE));
226 
227   EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
228   mp_0->RemoveWaiter(0, &waiter);
229 
230   buffer_size = static_cast<uint32_t>(sizeof(buffer));
231   EXPECT_EQ(MOJO_RESULT_OK,
232             mp_0->ReadMessage(0,
233                               buffer, &buffer_size,
234                               NULL, NULL,
235                               MOJO_READ_MESSAGE_FLAG_NONE));
236   EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
237   EXPECT_EQ(0, strcmp(buffer, world));
238 
239   // Close MP 0, port 0.
240   mp_0->Close(0);
241 
242   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
243   // when it realizes that MP 0, port 0 has been closed. (It may also fail
244   // immediately.)
245   waiter.Init();
246   MojoResult result = mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
247   if (result == MOJO_RESULT_OK) {
248     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
249               waiter.Wait(MOJO_DEADLINE_INDEFINITE));
250     mp_1->RemoveWaiter(1, &waiter);
251   } else {
252     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
253   }
254 
255   // And MP 1, port 1.
256   mp_1->Close(1);
257 }
258 
TEST_F(RemoteMessagePipeTest,Multiplex)259 TEST_F(RemoteMessagePipeTest, Multiplex) {
260   const char hello[] = "hello";
261   const char world[] = "world!!!1!!!1!";
262   char buffer[100] = { 0 };
263   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
264   Waiter waiter;
265 
266   // Connect message pipes as in the |Basic| test.
267 
268   scoped_refptr<MessagePipe> mp_0(new MessagePipe(
269       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
270       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
271   scoped_refptr<MessagePipe> mp_1(new MessagePipe(
272       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
273       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
274   ConnectMessagePipes(mp_0, mp_1);
275 
276   // Now put another message pipe on the channel.
277 
278   scoped_refptr<MessagePipe> mp_2(new MessagePipe(
279       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
280       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
281   scoped_refptr<MessagePipe> mp_3(new MessagePipe(
282       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
283       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
284   ConnectMessagePipes(mp_2, mp_3);
285 
286   // Write: MP 2, port 0 -> MP 3, port 1.
287 
288   waiter.Init();
289   EXPECT_EQ(MOJO_RESULT_OK,
290             mp_3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
291 
292   EXPECT_EQ(MOJO_RESULT_OK,
293             mp_2->WriteMessage(0,
294                                hello, sizeof(hello),
295                                NULL,
296                                MOJO_WRITE_MESSAGE_FLAG_NONE));
297 
298   EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
299   mp_3->RemoveWaiter(1, &waiter);
300 
301   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
302   buffer_size = static_cast<uint32_t>(sizeof(buffer));
303   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
304             mp_0->ReadMessage(0,
305                               buffer, &buffer_size,
306                               NULL, NULL,
307                               MOJO_READ_MESSAGE_FLAG_NONE));
308   buffer_size = static_cast<uint32_t>(sizeof(buffer));
309   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
310             mp_1->ReadMessage(1,
311                               buffer, &buffer_size,
312                               NULL, NULL,
313                               MOJO_READ_MESSAGE_FLAG_NONE));
314   buffer_size = static_cast<uint32_t>(sizeof(buffer));
315   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
316             mp_2->ReadMessage(0,
317                               buffer, &buffer_size,
318                               NULL, NULL,
319                               MOJO_READ_MESSAGE_FLAG_NONE));
320 
321   // Read from MP 3, port 1.
322   buffer_size = static_cast<uint32_t>(sizeof(buffer));
323   EXPECT_EQ(MOJO_RESULT_OK,
324             mp_3->ReadMessage(1,
325                               buffer, &buffer_size,
326                               NULL, NULL,
327                               MOJO_READ_MESSAGE_FLAG_NONE));
328   EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
329   EXPECT_EQ(0, strcmp(buffer, hello));
330 
331   // Write: MP 0, port 0 -> MP 1, port 1 again.
332 
333   waiter.Init();
334   EXPECT_EQ(MOJO_RESULT_OK,
335             mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
336 
337   EXPECT_EQ(MOJO_RESULT_OK,
338             mp_0->WriteMessage(0,
339                                world, sizeof(world),
340                                NULL,
341                                MOJO_WRITE_MESSAGE_FLAG_NONE));
342 
343   EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
344   mp_1->RemoveWaiter(1, &waiter);
345 
346   // Make sure there's nothing on the other ports.
347   buffer_size = static_cast<uint32_t>(sizeof(buffer));
348   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
349             mp_0->ReadMessage(0,
350                               buffer, &buffer_size,
351                               NULL, NULL,
352                               MOJO_READ_MESSAGE_FLAG_NONE));
353   buffer_size = static_cast<uint32_t>(sizeof(buffer));
354   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
355             mp_2->ReadMessage(0,
356                               buffer, &buffer_size,
357                               NULL, NULL,
358                               MOJO_READ_MESSAGE_FLAG_NONE));
359   buffer_size = static_cast<uint32_t>(sizeof(buffer));
360   EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
361             mp_3->ReadMessage(1,
362                               buffer, &buffer_size,
363                               NULL, NULL,
364                               MOJO_READ_MESSAGE_FLAG_NONE));
365 
366   buffer_size = static_cast<uint32_t>(sizeof(buffer));
367   EXPECT_EQ(MOJO_RESULT_OK,
368             mp_1->ReadMessage(1,
369                               buffer, &buffer_size,
370                               NULL, NULL,
371                               MOJO_READ_MESSAGE_FLAG_NONE));
372   EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
373   EXPECT_EQ(0, strcmp(buffer, world));
374 }
375 
TEST_F(RemoteMessagePipeTest,CloseBeforeConnect)376 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
377   const char hello[] = "hello";
378   char buffer[100] = { 0 };
379   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
380   Waiter waiter;
381 
382   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
383   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
384   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
385 
386   scoped_refptr<MessagePipe> mp_0(new MessagePipe(
387       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
388       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
389 
390   // Write to MP 0, port 0.
391   EXPECT_EQ(MOJO_RESULT_OK,
392             mp_0->WriteMessage(0,
393                                hello, sizeof(hello),
394                                NULL,
395                                MOJO_WRITE_MESSAGE_FLAG_NONE));
396 
397   BootstrapMessagePipeNoWait(0, mp_0);
398 
399 
400   // Close MP 0, port 0 before channel 1 is even connected.
401   mp_0->Close(0);
402 
403   scoped_refptr<MessagePipe> mp_1(new MessagePipe(
404       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
405       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
406 
407   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
408   // it later, it might already be readable.)
409   waiter.Init();
410   EXPECT_EQ(MOJO_RESULT_OK,
411             mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
412 
413   BootstrapMessagePipeNoWait(1, mp_1);
414 
415   // Wait.
416   EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
417   mp_1->RemoveWaiter(1, &waiter);
418 
419   // Read from MP 1, port 1.
420   EXPECT_EQ(MOJO_RESULT_OK,
421             mp_1->ReadMessage(1,
422                               buffer, &buffer_size,
423                               NULL, NULL,
424                               MOJO_READ_MESSAGE_FLAG_NONE));
425   EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
426   EXPECT_EQ(0, strcmp(buffer, hello));
427 
428   // And MP 1, port 1.
429   mp_1->Close(1);
430 }
431 }  // namespace
432 }  // namespace system
433 }  // namespace mojo
434