1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // TODO(vtl): The POSIX-specific bits have been factored out. Apply this test to
6 // non-POSIX once we have a non-POSIX implementation.
7
8 #include <stdint.h>
9 #include <string.h>
10
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/thread.h"
18 #include "mojo/system/channel.h"
19 #include "mojo/system/local_message_pipe_endpoint.h"
20 #include "mojo/system/message_pipe.h"
21 #include "mojo/system/platform_channel.h"
22 #include "mojo/system/proxy_message_pipe_endpoint.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "testing/gtest/include/gtest/gtest.h"
26
27 namespace mojo {
28 namespace system {
29 namespace {
30
31 class RemoteMessagePipeTest : public testing::Test {
32 public:
RemoteMessagePipeTest()33 RemoteMessagePipeTest() : io_thread_("io_thread") {
34 }
35
~RemoteMessagePipeTest()36 virtual ~RemoteMessagePipeTest() {
37 }
38
SetUp()39 virtual void SetUp() OVERRIDE {
40 io_thread_.StartWithOptions(
41 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
42
43 test::PostTaskAndWait(io_thread_task_runner(),
44 FROM_HERE,
45 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
46 base::Unretained(this)));
47 }
48
TearDown()49 virtual void TearDown() OVERRIDE {
50 test::PostTaskAndWait(io_thread_task_runner(),
51 FROM_HERE,
52 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
53 base::Unretained(this)));
54 io_thread_.Stop();
55 }
56
57 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
58 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
59 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
ConnectMessagePipes(scoped_refptr<MessagePipe> mp_0,scoped_refptr<MessagePipe> mp_1)60 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp_0,
61 scoped_refptr<MessagePipe> mp_1) {
62 test::PostTaskAndWait(
63 io_thread_task_runner(),
64 FROM_HERE,
65 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
66 base::Unretained(this), mp_0, mp_1));
67 }
68
69 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
70 // It assumes/requires that this is the bootstrap case, i.e., that the
71 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
72 // returns *without* waiting for it to finish connecting.
BootstrapMessagePipeNoWait(unsigned channel_index,scoped_refptr<MessagePipe> mp)73 void BootstrapMessagePipeNoWait(unsigned channel_index,
74 scoped_refptr<MessagePipe> mp) {
75 io_thread_task_runner()->PostTask(
76 FROM_HERE,
77 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
78 base::Unretained(this), channel_index, mp));
79 }
80
81 protected:
io_thread_message_loop()82 base::MessageLoop* io_thread_message_loop() {
83 return io_thread_.message_loop();
84 }
85
io_thread_task_runner()86 scoped_refptr<base::TaskRunner> io_thread_task_runner() {
87 return io_thread_message_loop()->message_loop_proxy();
88 }
89
90 private:
SetUpOnIOThread()91 void SetUpOnIOThread() {
92 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
93
94 scoped_ptr<PlatformServerChannel> server_channel(
95 PlatformServerChannel::Create("channel"));
96 CHECK(server_channel.get());
97 CHECK(server_channel->is_valid());
98 scoped_ptr<PlatformClientChannel> client_channel(
99 server_channel->CreateClientChannel());
100 CHECK(client_channel.get());
101 CHECK(client_channel->is_valid());
102
103 platform_channels_[0] = server_channel.PassAs<PlatformChannel>();
104 platform_channels_[1] = client_channel.PassAs<PlatformChannel>();
105 }
106
CreateAndInitChannel(unsigned channel_index)107 void CreateAndInitChannel(unsigned channel_index) {
108 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
109 CHECK(channel_index == 0 || channel_index == 1);
110 CHECK(!channels_[channel_index].get());
111
112 channels_[channel_index] = new Channel();
113 CHECK(channels_[channel_index]->Init(
114 platform_channels_[channel_index]->PassHandle()));
115 }
116
ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,scoped_refptr<MessagePipe> mp_1)117 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,
118 scoped_refptr<MessagePipe> mp_1) {
119 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
120
121 if (!channels_[0].get())
122 CreateAndInitChannel(0);
123 if (!channels_[1].get())
124 CreateAndInitChannel(1);
125
126 MessageInTransit::EndpointId local_id_0 =
127 channels_[0]->AttachMessagePipeEndpoint(mp_0, 1);
128 MessageInTransit::EndpointId local_id_1 =
129 channels_[1]->AttachMessagePipeEndpoint(mp_1, 0);
130
131 channels_[0]->RunMessagePipeEndpoint(local_id_0, local_id_1);
132 channels_[1]->RunMessagePipeEndpoint(local_id_1, local_id_0);
133 }
134
BootstrapMessagePipeOnIOThread(unsigned channel_index,scoped_refptr<MessagePipe> mp)135 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
136 scoped_refptr<MessagePipe> mp) {
137 CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
138 CHECK(channel_index == 0 || channel_index == 1);
139
140 unsigned port = channel_index ^ 1u;
141
142 // Important: If we don't boot
143 CreateAndInitChannel(channel_index);
144 CHECK_EQ(channels_[channel_index]->AttachMessagePipeEndpoint(mp, port),
145 Channel::kBootstrapEndpointId);
146 channels_[channel_index]->RunMessagePipeEndpoint(
147 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId);
148 }
149
TearDownOnIOThread()150 void TearDownOnIOThread() {
151 if (channels_[0].get()) {
152 channels_[0]->Shutdown();
153 channels_[0] = NULL;
154 }
155 if (channels_[1].get()) {
156 channels_[1]->Shutdown();
157 channels_[1] = NULL;
158 }
159 }
160
161 base::Thread io_thread_;
162 scoped_ptr<PlatformChannel> platform_channels_[2];
163 scoped_refptr<Channel> channels_[2];
164
165 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
166 };
167
TEST_F(RemoteMessagePipeTest,Basic)168 TEST_F(RemoteMessagePipeTest, Basic) {
169 const char hello[] = "hello";
170 const char world[] = "world!!!1!!!1!";
171 char buffer[100] = { 0 };
172 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
173 Waiter waiter;
174
175 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
176 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
177 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
178
179 scoped_refptr<MessagePipe> mp_0(new MessagePipe(
180 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
181 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
182 scoped_refptr<MessagePipe> mp_1(new MessagePipe(
183 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
184 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
185 ConnectMessagePipes(mp_0, mp_1);
186
187 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
188
189 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
190 // it later, it might already be readable.)
191 waiter.Init();
192 EXPECT_EQ(MOJO_RESULT_OK,
193 mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
194
195 // Write to MP 0, port 0.
196 EXPECT_EQ(MOJO_RESULT_OK,
197 mp_0->WriteMessage(0,
198 hello, sizeof(hello),
199 NULL,
200 MOJO_WRITE_MESSAGE_FLAG_NONE));
201
202 // Wait.
203 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
204 mp_1->RemoveWaiter(1, &waiter);
205
206 // Read from MP 1, port 1.
207 EXPECT_EQ(MOJO_RESULT_OK,
208 mp_1->ReadMessage(1,
209 buffer, &buffer_size,
210 NULL, NULL,
211 MOJO_READ_MESSAGE_FLAG_NONE));
212 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
213 EXPECT_EQ(0, strcmp(buffer, hello));
214
215 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
216
217 waiter.Init();
218 EXPECT_EQ(MOJO_RESULT_OK,
219 mp_0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
220
221 EXPECT_EQ(MOJO_RESULT_OK,
222 mp_1->WriteMessage(1,
223 world, sizeof(world),
224 NULL,
225 MOJO_WRITE_MESSAGE_FLAG_NONE));
226
227 EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
228 mp_0->RemoveWaiter(0, &waiter);
229
230 buffer_size = static_cast<uint32_t>(sizeof(buffer));
231 EXPECT_EQ(MOJO_RESULT_OK,
232 mp_0->ReadMessage(0,
233 buffer, &buffer_size,
234 NULL, NULL,
235 MOJO_READ_MESSAGE_FLAG_NONE));
236 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
237 EXPECT_EQ(0, strcmp(buffer, world));
238
239 // Close MP 0, port 0.
240 mp_0->Close(0);
241
242 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
243 // when it realizes that MP 0, port 0 has been closed. (It may also fail
244 // immediately.)
245 waiter.Init();
246 MojoResult result = mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
247 if (result == MOJO_RESULT_OK) {
248 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
249 waiter.Wait(MOJO_DEADLINE_INDEFINITE));
250 mp_1->RemoveWaiter(1, &waiter);
251 } else {
252 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
253 }
254
255 // And MP 1, port 1.
256 mp_1->Close(1);
257 }
258
TEST_F(RemoteMessagePipeTest,Multiplex)259 TEST_F(RemoteMessagePipeTest, Multiplex) {
260 const char hello[] = "hello";
261 const char world[] = "world!!!1!!!1!";
262 char buffer[100] = { 0 };
263 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
264 Waiter waiter;
265
266 // Connect message pipes as in the |Basic| test.
267
268 scoped_refptr<MessagePipe> mp_0(new MessagePipe(
269 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
270 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
271 scoped_refptr<MessagePipe> mp_1(new MessagePipe(
272 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
273 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
274 ConnectMessagePipes(mp_0, mp_1);
275
276 // Now put another message pipe on the channel.
277
278 scoped_refptr<MessagePipe> mp_2(new MessagePipe(
279 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
280 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
281 scoped_refptr<MessagePipe> mp_3(new MessagePipe(
282 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
283 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
284 ConnectMessagePipes(mp_2, mp_3);
285
286 // Write: MP 2, port 0 -> MP 3, port 1.
287
288 waiter.Init();
289 EXPECT_EQ(MOJO_RESULT_OK,
290 mp_3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
291
292 EXPECT_EQ(MOJO_RESULT_OK,
293 mp_2->WriteMessage(0,
294 hello, sizeof(hello),
295 NULL,
296 MOJO_WRITE_MESSAGE_FLAG_NONE));
297
298 EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
299 mp_3->RemoveWaiter(1, &waiter);
300
301 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
302 buffer_size = static_cast<uint32_t>(sizeof(buffer));
303 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
304 mp_0->ReadMessage(0,
305 buffer, &buffer_size,
306 NULL, NULL,
307 MOJO_READ_MESSAGE_FLAG_NONE));
308 buffer_size = static_cast<uint32_t>(sizeof(buffer));
309 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
310 mp_1->ReadMessage(1,
311 buffer, &buffer_size,
312 NULL, NULL,
313 MOJO_READ_MESSAGE_FLAG_NONE));
314 buffer_size = static_cast<uint32_t>(sizeof(buffer));
315 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
316 mp_2->ReadMessage(0,
317 buffer, &buffer_size,
318 NULL, NULL,
319 MOJO_READ_MESSAGE_FLAG_NONE));
320
321 // Read from MP 3, port 1.
322 buffer_size = static_cast<uint32_t>(sizeof(buffer));
323 EXPECT_EQ(MOJO_RESULT_OK,
324 mp_3->ReadMessage(1,
325 buffer, &buffer_size,
326 NULL, NULL,
327 MOJO_READ_MESSAGE_FLAG_NONE));
328 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
329 EXPECT_EQ(0, strcmp(buffer, hello));
330
331 // Write: MP 0, port 0 -> MP 1, port 1 again.
332
333 waiter.Init();
334 EXPECT_EQ(MOJO_RESULT_OK,
335 mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
336
337 EXPECT_EQ(MOJO_RESULT_OK,
338 mp_0->WriteMessage(0,
339 world, sizeof(world),
340 NULL,
341 MOJO_WRITE_MESSAGE_FLAG_NONE));
342
343 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
344 mp_1->RemoveWaiter(1, &waiter);
345
346 // Make sure there's nothing on the other ports.
347 buffer_size = static_cast<uint32_t>(sizeof(buffer));
348 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
349 mp_0->ReadMessage(0,
350 buffer, &buffer_size,
351 NULL, NULL,
352 MOJO_READ_MESSAGE_FLAG_NONE));
353 buffer_size = static_cast<uint32_t>(sizeof(buffer));
354 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
355 mp_2->ReadMessage(0,
356 buffer, &buffer_size,
357 NULL, NULL,
358 MOJO_READ_MESSAGE_FLAG_NONE));
359 buffer_size = static_cast<uint32_t>(sizeof(buffer));
360 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
361 mp_3->ReadMessage(1,
362 buffer, &buffer_size,
363 NULL, NULL,
364 MOJO_READ_MESSAGE_FLAG_NONE));
365
366 buffer_size = static_cast<uint32_t>(sizeof(buffer));
367 EXPECT_EQ(MOJO_RESULT_OK,
368 mp_1->ReadMessage(1,
369 buffer, &buffer_size,
370 NULL, NULL,
371 MOJO_READ_MESSAGE_FLAG_NONE));
372 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
373 EXPECT_EQ(0, strcmp(buffer, world));
374 }
375
TEST_F(RemoteMessagePipeTest,CloseBeforeConnect)376 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
377 const char hello[] = "hello";
378 char buffer[100] = { 0 };
379 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
380 Waiter waiter;
381
382 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
383 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
384 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
385
386 scoped_refptr<MessagePipe> mp_0(new MessagePipe(
387 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
388 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
389
390 // Write to MP 0, port 0.
391 EXPECT_EQ(MOJO_RESULT_OK,
392 mp_0->WriteMessage(0,
393 hello, sizeof(hello),
394 NULL,
395 MOJO_WRITE_MESSAGE_FLAG_NONE));
396
397 BootstrapMessagePipeNoWait(0, mp_0);
398
399
400 // Close MP 0, port 0 before channel 1 is even connected.
401 mp_0->Close(0);
402
403 scoped_refptr<MessagePipe> mp_1(new MessagePipe(
404 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
405 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
406
407 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
408 // it later, it might already be readable.)
409 waiter.Init();
410 EXPECT_EQ(MOJO_RESULT_OK,
411 mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
412
413 BootstrapMessagePipeNoWait(1, mp_1);
414
415 // Wait.
416 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
417 mp_1->RemoveWaiter(1, &waiter);
418
419 // Read from MP 1, port 1.
420 EXPECT_EQ(MOJO_RESULT_OK,
421 mp_1->ReadMessage(1,
422 buffer, &buffer_size,
423 NULL, NULL,
424 MOJO_READ_MESSAGE_FLAG_NONE));
425 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
426 EXPECT_EQ(0, strcmp(buffer, hello));
427
428 // And MP 1, port 1.
429 mp_1->Close(1);
430 }
431 } // namespace
432 } // namespace system
433 } // namespace mojo
434