1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stddef.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/callback_helpers.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/run_loop.h"
17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/lib/message_builder.h"
19 #include "mojo/public/cpp/bindings/tests/message_queue.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21
22 namespace mojo {
23 namespace test {
24 namespace {
25
26 class MessageAccumulator : public MessageReceiver {
27 public:
MessageAccumulator()28 MessageAccumulator() {}
MessageAccumulator(const base::Closure & closure)29 explicit MessageAccumulator(const base::Closure& closure)
30 : closure_(closure) {}
31
Accept(Message * message)32 bool Accept(Message* message) override {
33 queue_.Push(message);
34 if (!closure_.is_null())
35 base::ResetAndReturn(&closure_).Run();
36 return true;
37 }
38
IsEmpty() const39 bool IsEmpty() const { return queue_.IsEmpty(); }
40
Pop(Message * message)41 void Pop(Message* message) { queue_.Pop(message); }
42
set_closure(const base::Closure & closure)43 void set_closure(const base::Closure& closure) { closure_ = closure; }
44
size() const45 size_t size() const { return queue_.size(); }
46
47 private:
48 MessageQueue queue_;
49 base::Closure closure_;
50 };
51
52 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
53 public:
ConnectorDeletingMessageAccumulator(Connector ** connector)54 ConnectorDeletingMessageAccumulator(Connector** connector)
55 : connector_(connector) {}
56
Accept(Message * message)57 bool Accept(Message* message) override {
58 delete *connector_;
59 *connector_ = nullptr;
60 return MessageAccumulator::Accept(message);
61 }
62
63 private:
64 Connector** connector_;
65 };
66
67 class ReentrantMessageAccumulator : public MessageAccumulator {
68 public:
ReentrantMessageAccumulator(Connector * connector)69 ReentrantMessageAccumulator(Connector* connector)
70 : connector_(connector), number_of_calls_(0) {}
71
Accept(Message * message)72 bool Accept(Message* message) override {
73 if (!MessageAccumulator::Accept(message))
74 return false;
75 number_of_calls_++;
76 if (number_of_calls_ == 1) {
77 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
78 }
79 return true;
80 }
81
number_of_calls()82 int number_of_calls() { return number_of_calls_; }
83
84 private:
85 Connector* connector_;
86 int number_of_calls_;
87 };
88
89 class ConnectorTest : public testing::Test {
90 public:
ConnectorTest()91 ConnectorTest() {}
92
SetUp()93 void SetUp() override {
94 CreateMessagePipe(nullptr, &handle0_, &handle1_);
95 }
96
TearDown()97 void TearDown() override {}
98
AllocMessage(const char * text,Message * message)99 void AllocMessage(const char* text, Message* message) {
100 size_t payload_size = strlen(text) + 1; // Plus null terminator.
101 internal::MessageBuilder builder(1, payload_size);
102 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
103
104 builder.message()->MoveTo(message);
105 }
106
107 protected:
108 ScopedMessagePipeHandle handle0_;
109 ScopedMessagePipeHandle handle1_;
110
111 private:
112 base::MessageLoop loop_;
113 };
114
// Writes one message on connector0 before connector1 has a receiver, then
// installs the receiver, runs the loop, and checks the payload round-trips.
TEST_F(ConnectorTest, Basic) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  // The accumulator's closure quits |run_loop| when a message is accepted.
  base::RunLoop run_loop;
  MessageAccumulator accumulator(run_loop.QuitClosure());
  connector1.set_incoming_receiver(&accumulator);
  run_loop.Run();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);
}
143
// Same as Basic, but the receiving side pulls the message with a blocking
// WaitForIncomingMessage() call instead of running the message loop.
TEST_F(ConnectorTest, Basic_Synchronous) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);
  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);
}
171
// Variant of Basic in which the incoming receiver is installed on connector1
// before the message is written on connector0.
TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  // Receiver first...
  base::RunLoop run_loop;
  MessageAccumulator accumulator(run_loop.QuitClosure());
  connector1.set_incoming_receiver(&accumulator);

  // ...then the write.
  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  run_loop.Run();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);
}
200
// Writes two messages back to back, then reads them one at a time through the
// message loop and checks they arrive in the order they were sent.
TEST_F(ConnectorTest, Basic_TwoMessages) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  for (const char* expected : kText) {
    // Only spin the loop when nothing has been accumulated yet.
    if (accumulator.IsEmpty()) {
      base::RunLoop run_loop;
      accumulator.set_closure(run_loop.QuitClosure());
      run_loop.Run();
    }
    ASSERT_FALSE(accumulator.IsEmpty());

    Message incoming;
    accumulator.Pop(&incoming);

    const std::string expected_text(expected);
    const std::string received_text(
        reinterpret_cast<const char*>(incoming.payload()));
    EXPECT_EQ(expected_text, received_text);
  }
}
235
// Writes two messages, then performs a single WaitForIncomingMessage() and
// checks that exactly one message (the first) was handed to the receiver.
TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);
  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText[0]);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);

  // Nothing else was accumulated by the single wait.
  ASSERT_TRUE(accumulator.IsEmpty());
}
267
// Writes to a pipe whose peer has been closed: the write itself reports
// success, and the error only becomes visible after the message loop runs.
TEST_F(ConnectorTest, WriteToClosedPipe) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);

  // Drop the peer end of the pipe.
  handle1_.reset();

  // The message loop has not been spun yet, so nothing has been noticed.
  EXPECT_FALSE(connector0.encountered_error());

  // A failed write is not surfaced through Accept()'s return value.
  EXPECT_TRUE(connector0.Accept(&outgoing));

  // The error flag is still clear.
  EXPECT_FALSE(connector0.encountered_error());

  // Run the loop until the connection error handler fires; from then on the
  // closed pipe is observable.
  base::RunLoop run_loop;
  connector0.set_connection_error_handler(run_loop.QuitClosure());
  run_loop.Run();

  EXPECT_TRUE(connector0.encountered_error());
}
297
// Sends a message that carries one end of a second message pipe, then checks
// that the transferred handle is usable: a message written to it shows up on
// the other end of that second pipe.
TEST_F(ConnectorTest, MessageWithHandles) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  Message message_with_handle;
  AllocMessage(kText, &message_with_handle);

  MessagePipe pipe;
  message_with_handle.mutable_handles()->push_back(pipe.handle0.release());

  connector0.Accept(&message_with_handle);

  // Sending transfers the message, so its handle list must now be empty.
  EXPECT_TRUE(message_with_handle.handles()->empty());

  base::RunLoop run_loop;
  MessageAccumulator accumulator(run_loop.QuitClosure());
  connector1.set_incoming_receiver(&accumulator);
  run_loop.Run();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message message_received;
  accumulator.Pop(&message_received);

  const std::string expected_text(kText);
  const std::string first_received_text(
      reinterpret_cast<const char*>(message_received.payload()));
  EXPECT_EQ(expected_text, first_received_text);
  ASSERT_EQ(1U, message_received.handles()->size());

  // Next, write a message to the transferred handle and confirm that it comes
  // out of the original pipe.
  // TODO(vtl): Do we need a better way of "downcasting" the handle types?
  ScopedMessagePipeHandle smph;
  smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
  // Clear the entry in the message so that |smph| is the handle's only owner.
  message_received.mutable_handles()->front() = Handle();

  Connector connector_received(std::move(smph), Connector::SINGLE_THREADED_SEND,
                               base::ThreadTaskRunnerHandle::Get());
  Connector connector_original(std::move(pipe.handle1),
                               Connector::SINGLE_THREADED_SEND,
                               base::ThreadTaskRunnerHandle::Get());

  Message follow_up;
  AllocMessage(kText, &follow_up);
  connector_received.Accept(&follow_up);

  base::RunLoop run_loop2;
  MessageAccumulator accumulator2(run_loop2.QuitClosure());
  connector_original.set_incoming_receiver(&accumulator2);
  run_loop2.Run();

  ASSERT_FALSE(accumulator2.IsEmpty());

  // |message_received| is reused for the second message.
  accumulator2.Pop(&message_received);

  const std::string second_received_text(
      reinterpret_cast<const char*>(message_received.payload()));
  EXPECT_EQ(expected_text, second_received_text);
}
364
// With the peer end closed, a blocking wait for an incoming message must
// report failure.
TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  // Drop the peer end of the pipe before waiting.
  handle1_.reset();

  ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
}
372
// The receiver deletes the connector from inside Accept() while the connector
// is in WaitForIncomingMessage(); the message must still be accumulated.
TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  // Heap-allocated because ConnectorDeletingMessageAccumulator deletes it.
  Connector* connector1 =
      new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                    base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  ConnectorDeletingMessageAccumulator accumulator(&connector1);
  connector1->set_incoming_receiver(&accumulator);
  connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  // The accumulator nulls |connector1| after deleting it.
  ASSERT_FALSE(connector1);
  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);
}
402
// The receiver calls WaitForIncomingMessage() again while handling the first
// message. Both messages must be delivered, in order, with Accept() invoked
// exactly twice.
TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  ReentrantMessageAccumulator accumulator(&connector1);
  connector1.set_incoming_receiver(&accumulator);

  for (const char* expected : kText) {
    // Only spin the loop when nothing has been accumulated yet.
    if (accumulator.IsEmpty()) {
      base::RunLoop run_loop;
      accumulator.set_closure(run_loop.QuitClosure());
      run_loop.Run();
    }
    ASSERT_FALSE(accumulator.IsEmpty());

    Message incoming;
    accumulator.Pop(&incoming);

    const std::string expected_text(expected);
    const std::string received_text(
        reinterpret_cast<const char*>(incoming.payload()));
    EXPECT_EQ(expected_text, received_text);
  }

  ASSERT_EQ(2, accumulator.number_of_calls());
}
439
ForwardErrorHandler(bool * called,const base::Closure & callback)440 void ForwardErrorHandler(bool* called, const base::Closure& callback) {
441 *called = true;
442 callback.Run();
443 }
444
// Calls RaiseError() on one side after sending a message. The message must
// still be delivered, both sides must see their connection error handler run
// and their error flag set, and both handles must remain valid.
TEST_F(ConnectorTest, RaiseError) {
  // One loop per connector; each is quit from that side's error handler.
  base::RunLoop error_loop0;
  base::RunLoop error_loop1;

  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  bool saw_error0 = false;
  connector0.set_connection_error_handler(base::Bind(
      &ForwardErrorHandler, &saw_error0, error_loop0.QuitClosure()));

  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  bool saw_error1 = false;
  connector1.set_connection_error_handler(base::Bind(
      &ForwardErrorHandler, &saw_error1, error_loop1.QuitClosure()));

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);

  connector0.Accept(&outgoing);
  connector0.RaiseError();

  base::RunLoop delivery_loop;
  MessageAccumulator accumulator(delivery_loop.QuitClosure());
  connector1.set_incoming_receiver(&accumulator);
  delivery_loop.Run();

  // A message sent before RaiseError() still reaches the other end.
  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string expected_text(kText);
  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(expected_text, received_text);

  error_loop0.Run();
  error_loop1.Run();

  // Both sides had their connection error handler invoked.
  EXPECT_TRUE(saw_error0);
  EXPECT_TRUE(saw_error1);

  // Both sides report the error.
  EXPECT_TRUE(connector0.encountered_error());
  EXPECT_TRUE(connector1.encountered_error());

  // Both sides still hold a valid message pipe handle.
  EXPECT_TRUE(connector0.is_valid());
  EXPECT_TRUE(connector1.is_valid());
}
500
PauseConnectorAndRunClosure(Connector * connector,const base::Closure & closure)501 void PauseConnectorAndRunClosure(Connector* connector,
502 const base::Closure& closure) {
503 connector->PauseIncomingMethodCallProcessing();
504 closure.Run();
505 }
506
// With two messages already written, pausing the receiving connector while it
// handles the first one must prevent the second from being dispatched.
TEST_F(ConnectorTest, PauseWithQueuedMessages) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  // Write two messages, reusing the same Message object for both.
  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  base::RunLoop run_loop;
  // On the first accepted message the accumulator pauses |connector1| and
  // quits |run_loop|.
  MessageAccumulator accumulator(base::Bind(
      &PauseConnectorAndRunClosure, &connector1, run_loop.QuitClosure()));
  connector1.set_incoming_receiver(&accumulator);

  run_loop.Run();

  // Because of the pause, only the first message should have been received.
  ASSERT_EQ(1u, accumulator.size());
}
536
AccumulateWithNestedLoop(MessageAccumulator * accumulator,const base::Closure & closure)537 void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
538 const base::Closure& closure) {
539 base::RunLoop nested_run_loop;
540 base::MessageLoop::ScopedNestableTaskAllower allow(
541 base::MessageLoop::current());
542 accumulator->set_closure(nested_run_loop.QuitClosure());
543 nested_run_loop.Run();
544 closure.Run();
545 }
546
// Messages must keep being dispatched while a nested message loop is running
// inside the receiver's handling of an earlier message.
TEST_F(ConnectorTest, ProcessWhenNested) {
  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());
  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
                       base::ThreadTaskRunnerHandle::Get());

  const char kText[] = "hello world";

  // Write two messages, reusing the same Message object for both.
  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  base::RunLoop run_loop;
  MessageAccumulator accumulator;
  // The first message makes the accumulator spin a nested message loop, which
  // is quit when the next message is received; |run_loop| is quit afterwards.
  accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
                                     run_loop.QuitClosure()));
  connector1.set_incoming_receiver(&accumulator);

  run_loop.Run();

  ASSERT_EQ(2u, accumulator.size());
}
574
575 } // namespace
576 } // namespace test
577 } // namespace mojo
578