• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stddef.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/callback_helpers.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/run_loop.h"
17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/lib/message_builder.h"
19 #include "mojo/public/cpp/bindings/tests/message_queue.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21 
22 namespace mojo {
23 namespace test {
24 namespace {
25 
26 class MessageAccumulator : public MessageReceiver {
27  public:
MessageAccumulator()28   MessageAccumulator() {}
MessageAccumulator(const base::Closure & closure)29   explicit MessageAccumulator(const base::Closure& closure)
30       : closure_(closure) {}
31 
Accept(Message * message)32   bool Accept(Message* message) override {
33     queue_.Push(message);
34     if (!closure_.is_null())
35       base::ResetAndReturn(&closure_).Run();
36     return true;
37   }
38 
IsEmpty() const39   bool IsEmpty() const { return queue_.IsEmpty(); }
40 
Pop(Message * message)41   void Pop(Message* message) { queue_.Pop(message); }
42 
set_closure(const base::Closure & closure)43   void set_closure(const base::Closure& closure) { closure_ = closure; }
44 
size() const45   size_t size() const { return queue_.size(); }
46 
47  private:
48   MessageQueue queue_;
49   base::Closure closure_;
50 };
51 
52 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
53  public:
ConnectorDeletingMessageAccumulator(Connector ** connector)54   ConnectorDeletingMessageAccumulator(Connector** connector)
55       : connector_(connector) {}
56 
Accept(Message * message)57   bool Accept(Message* message) override {
58     delete *connector_;
59     *connector_ = nullptr;
60     return MessageAccumulator::Accept(message);
61   }
62 
63  private:
64   Connector** connector_;
65 };
66 
67 class ReentrantMessageAccumulator : public MessageAccumulator {
68  public:
ReentrantMessageAccumulator(Connector * connector)69   ReentrantMessageAccumulator(Connector* connector)
70       : connector_(connector), number_of_calls_(0) {}
71 
Accept(Message * message)72   bool Accept(Message* message) override {
73     if (!MessageAccumulator::Accept(message))
74       return false;
75     number_of_calls_++;
76     if (number_of_calls_ == 1) {
77       return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
78     }
79     return true;
80   }
81 
number_of_calls()82   int number_of_calls() { return number_of_calls_; }
83 
84  private:
85   Connector* connector_;
86   int number_of_calls_;
87 };
88 
89 class ConnectorTest : public testing::Test {
90  public:
ConnectorTest()91   ConnectorTest() {}
92 
SetUp()93   void SetUp() override {
94     CreateMessagePipe(nullptr, &handle0_, &handle1_);
95   }
96 
TearDown()97   void TearDown() override {}
98 
AllocMessage(const char * text,Message * message)99   void AllocMessage(const char* text, Message* message) {
100     size_t payload_size = strlen(text) + 1;  // Plus null terminator.
101     internal::MessageBuilder builder(1, payload_size);
102     memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
103 
104     builder.message()->MoveTo(message);
105   }
106 
107  protected:
108   ScopedMessagePipeHandle handle0_;
109   ScopedMessagePipeHandle handle1_;
110 
111  private:
112   base::MessageLoop loop_;
113 };
114 
TEST_F(ConnectorTest,Basic)115 TEST_F(ConnectorTest, Basic) {
116   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
117                        base::ThreadTaskRunnerHandle::Get());
118   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
119                        base::ThreadTaskRunnerHandle::Get());
120 
121   const char kText[] = "hello world";
122 
123   Message message;
124   AllocMessage(kText, &message);
125 
126   connector0.Accept(&message);
127 
128   base::RunLoop run_loop;
129   MessageAccumulator accumulator(run_loop.QuitClosure());
130   connector1.set_incoming_receiver(&accumulator);
131 
132   run_loop.Run();
133 
134   ASSERT_FALSE(accumulator.IsEmpty());
135 
136   Message message_received;
137   accumulator.Pop(&message_received);
138 
139   EXPECT_EQ(
140       std::string(kText),
141       std::string(reinterpret_cast<const char*>(message_received.payload())));
142 }
143 
TEST_F(ConnectorTest,Basic_Synchronous)144 TEST_F(ConnectorTest, Basic_Synchronous) {
145   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
146                        base::ThreadTaskRunnerHandle::Get());
147   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
148                        base::ThreadTaskRunnerHandle::Get());
149 
150   const char kText[] = "hello world";
151 
152   Message message;
153   AllocMessage(kText, &message);
154 
155   connector0.Accept(&message);
156 
157   MessageAccumulator accumulator;
158   connector1.set_incoming_receiver(&accumulator);
159 
160   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
161 
162   ASSERT_FALSE(accumulator.IsEmpty());
163 
164   Message message_received;
165   accumulator.Pop(&message_received);
166 
167   EXPECT_EQ(
168       std::string(kText),
169       std::string(reinterpret_cast<const char*>(message_received.payload())));
170 }
171 
TEST_F(ConnectorTest,Basic_EarlyIncomingReceiver)172 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
173   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
174                        base::ThreadTaskRunnerHandle::Get());
175   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
176                        base::ThreadTaskRunnerHandle::Get());
177 
178   base::RunLoop run_loop;
179   MessageAccumulator accumulator(run_loop.QuitClosure());
180   connector1.set_incoming_receiver(&accumulator);
181 
182   const char kText[] = "hello world";
183 
184   Message message;
185   AllocMessage(kText, &message);
186 
187   connector0.Accept(&message);
188 
189   run_loop.Run();
190 
191   ASSERT_FALSE(accumulator.IsEmpty());
192 
193   Message message_received;
194   accumulator.Pop(&message_received);
195 
196   EXPECT_EQ(
197       std::string(kText),
198       std::string(reinterpret_cast<const char*>(message_received.payload())));
199 }
200 
TEST_F(ConnectorTest,Basic_TwoMessages)201 TEST_F(ConnectorTest, Basic_TwoMessages) {
202   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
203                        base::ThreadTaskRunnerHandle::Get());
204   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
205                        base::ThreadTaskRunnerHandle::Get());
206 
207   const char* kText[] = {"hello", "world"};
208 
209   for (size_t i = 0; i < arraysize(kText); ++i) {
210     Message message;
211     AllocMessage(kText[i], &message);
212 
213     connector0.Accept(&message);
214   }
215 
216   MessageAccumulator accumulator;
217   connector1.set_incoming_receiver(&accumulator);
218 
219   for (size_t i = 0; i < arraysize(kText); ++i) {
220     if (accumulator.IsEmpty()) {
221       base::RunLoop run_loop;
222       accumulator.set_closure(run_loop.QuitClosure());
223       run_loop.Run();
224     }
225     ASSERT_FALSE(accumulator.IsEmpty());
226 
227     Message message_received;
228     accumulator.Pop(&message_received);
229 
230     EXPECT_EQ(
231         std::string(kText[i]),
232         std::string(reinterpret_cast<const char*>(message_received.payload())));
233   }
234 }
235 
TEST_F(ConnectorTest,Basic_TwoMessages_Synchronous)236 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
237   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
238                        base::ThreadTaskRunnerHandle::Get());
239   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
240                        base::ThreadTaskRunnerHandle::Get());
241 
242   const char* kText[] = {"hello", "world"};
243 
244   for (size_t i = 0; i < arraysize(kText); ++i) {
245     Message message;
246     AllocMessage(kText[i], &message);
247 
248     connector0.Accept(&message);
249   }
250 
251   MessageAccumulator accumulator;
252   connector1.set_incoming_receiver(&accumulator);
253 
254   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
255 
256   ASSERT_FALSE(accumulator.IsEmpty());
257 
258   Message message_received;
259   accumulator.Pop(&message_received);
260 
261   EXPECT_EQ(
262       std::string(kText[0]),
263       std::string(reinterpret_cast<const char*>(message_received.payload())));
264 
265   ASSERT_TRUE(accumulator.IsEmpty());
266 }
267 
TEST_F(ConnectorTest,WriteToClosedPipe)268 TEST_F(ConnectorTest, WriteToClosedPipe) {
269   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
270                        base::ThreadTaskRunnerHandle::Get());
271 
272   const char kText[] = "hello world";
273 
274   Message message;
275   AllocMessage(kText, &message);
276 
277   // Close the other end of the pipe.
278   handle1_.reset();
279 
280   // Not observed yet because we haven't spun the message loop yet.
281   EXPECT_FALSE(connector0.encountered_error());
282 
283   // Write failures are not reported.
284   bool ok = connector0.Accept(&message);
285   EXPECT_TRUE(ok);
286 
287   // Still not observed.
288   EXPECT_FALSE(connector0.encountered_error());
289 
290   // Spin the message loop, and then we should start observing the closed pipe.
291   base::RunLoop run_loop;
292   connector0.set_connection_error_handler(run_loop.QuitClosure());
293   run_loop.Run();
294 
295   EXPECT_TRUE(connector0.encountered_error());
296 }
297 
TEST_F(ConnectorTest,MessageWithHandles)298 TEST_F(ConnectorTest, MessageWithHandles) {
299   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
300                        base::ThreadTaskRunnerHandle::Get());
301   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
302                        base::ThreadTaskRunnerHandle::Get());
303 
304   const char kText[] = "hello world";
305 
306   Message message1;
307   AllocMessage(kText, &message1);
308 
309   MessagePipe pipe;
310   message1.mutable_handles()->push_back(pipe.handle0.release());
311 
312   connector0.Accept(&message1);
313 
314   // The message should have been transferred, releasing the handles.
315   EXPECT_TRUE(message1.handles()->empty());
316 
317   base::RunLoop run_loop;
318   MessageAccumulator accumulator(run_loop.QuitClosure());
319   connector1.set_incoming_receiver(&accumulator);
320 
321   run_loop.Run();
322 
323   ASSERT_FALSE(accumulator.IsEmpty());
324 
325   Message message_received;
326   accumulator.Pop(&message_received);
327 
328   EXPECT_EQ(
329       std::string(kText),
330       std::string(reinterpret_cast<const char*>(message_received.payload())));
331   ASSERT_EQ(1U, message_received.handles()->size());
332 
333   // Now send a message to the transferred handle and confirm it's sent through
334   // to the orginal pipe.
335   // TODO(vtl): Do we need a better way of "downcasting" the handle types?
336   ScopedMessagePipeHandle smph;
337   smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
338   message_received.mutable_handles()->front() = Handle();
339   // |smph| now owns this handle.
340 
341   Connector connector_received(std::move(smph), Connector::SINGLE_THREADED_SEND,
342                                base::ThreadTaskRunnerHandle::Get());
343   Connector connector_original(std::move(pipe.handle1),
344                                Connector::SINGLE_THREADED_SEND,
345                                base::ThreadTaskRunnerHandle::Get());
346 
347   Message message2;
348   AllocMessage(kText, &message2);
349 
350   connector_received.Accept(&message2);
351   base::RunLoop run_loop2;
352   MessageAccumulator accumulator2(run_loop2.QuitClosure());
353   connector_original.set_incoming_receiver(&accumulator2);
354   run_loop2.Run();
355 
356   ASSERT_FALSE(accumulator2.IsEmpty());
357 
358   accumulator2.Pop(&message_received);
359 
360   EXPECT_EQ(
361       std::string(kText),
362       std::string(reinterpret_cast<const char*>(message_received.payload())));
363 }
364 
TEST_F(ConnectorTest,WaitForIncomingMessageWithError)365 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
366   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
367                        base::ThreadTaskRunnerHandle::Get());
368   // Close the other end of the pipe.
369   handle1_.reset();
370   ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
371 }
372 
TEST_F(ConnectorTest,WaitForIncomingMessageWithDeletion)373 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
374   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
375                        base::ThreadTaskRunnerHandle::Get());
376   Connector* connector1 =
377       new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
378                     base::ThreadTaskRunnerHandle::Get());
379 
380   const char kText[] = "hello world";
381 
382   Message message;
383   AllocMessage(kText, &message);
384 
385   connector0.Accept(&message);
386 
387   ConnectorDeletingMessageAccumulator accumulator(&connector1);
388   connector1->set_incoming_receiver(&accumulator);
389 
390   connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
391 
392   ASSERT_FALSE(connector1);
393   ASSERT_FALSE(accumulator.IsEmpty());
394 
395   Message message_received;
396   accumulator.Pop(&message_received);
397 
398   EXPECT_EQ(
399       std::string(kText),
400       std::string(reinterpret_cast<const char*>(message_received.payload())));
401 }
402 
TEST_F(ConnectorTest,WaitForIncomingMessageWithReentrancy)403 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
404   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
405                        base::ThreadTaskRunnerHandle::Get());
406   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
407                        base::ThreadTaskRunnerHandle::Get());
408 
409   const char* kText[] = {"hello", "world"};
410 
411   for (size_t i = 0; i < arraysize(kText); ++i) {
412     Message message;
413     AllocMessage(kText[i], &message);
414 
415     connector0.Accept(&message);
416   }
417 
418   ReentrantMessageAccumulator accumulator(&connector1);
419   connector1.set_incoming_receiver(&accumulator);
420 
421   for (size_t i = 0; i < arraysize(kText); ++i) {
422     if (accumulator.IsEmpty()) {
423       base::RunLoop run_loop;
424       accumulator.set_closure(run_loop.QuitClosure());
425       run_loop.Run();
426     }
427     ASSERT_FALSE(accumulator.IsEmpty());
428 
429     Message message_received;
430     accumulator.Pop(&message_received);
431 
432     EXPECT_EQ(
433         std::string(kText[i]),
434         std::string(reinterpret_cast<const char*>(message_received.payload())));
435   }
436 
437   ASSERT_EQ(2, accumulator.number_of_calls());
438 }
439 
ForwardErrorHandler(bool * called,const base::Closure & callback)440 void ForwardErrorHandler(bool* called, const base::Closure& callback) {
441   *called = true;
442   callback.Run();
443 }
444 
TEST_F(ConnectorTest,RaiseError)445 TEST_F(ConnectorTest, RaiseError) {
446   base::RunLoop run_loop, run_loop2;
447   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
448                        base::ThreadTaskRunnerHandle::Get());
449   bool error_handler_called0 = false;
450   connector0.set_connection_error_handler(
451       base::Bind(&ForwardErrorHandler, &error_handler_called0,
452                  run_loop.QuitClosure()));
453 
454   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
455                        base::ThreadTaskRunnerHandle::Get());
456   bool error_handler_called1 = false;
457   connector1.set_connection_error_handler(
458       base::Bind(&ForwardErrorHandler, &error_handler_called1,
459                  run_loop2.QuitClosure()));
460 
461   const char kText[] = "hello world";
462 
463   Message message;
464   AllocMessage(kText, &message);
465 
466   connector0.Accept(&message);
467   connector0.RaiseError();
468 
469   base::RunLoop run_loop3;
470   MessageAccumulator accumulator(run_loop3.QuitClosure());
471   connector1.set_incoming_receiver(&accumulator);
472 
473   run_loop3.Run();
474 
475   // Messages sent prior to RaiseError() still arrive at the other end.
476   ASSERT_FALSE(accumulator.IsEmpty());
477 
478   Message message_received;
479   accumulator.Pop(&message_received);
480 
481   EXPECT_EQ(
482       std::string(kText),
483       std::string(reinterpret_cast<const char*>(message_received.payload())));
484 
485   run_loop.Run();
486   run_loop2.Run();
487 
488   // Connection error handler is called at both sides.
489   EXPECT_TRUE(error_handler_called0);
490   EXPECT_TRUE(error_handler_called1);
491 
492   // The error flag is set at both sides.
493   EXPECT_TRUE(connector0.encountered_error());
494   EXPECT_TRUE(connector1.encountered_error());
495 
496   // The message pipe handle is valid at both sides.
497   EXPECT_TRUE(connector0.is_valid());
498   EXPECT_TRUE(connector1.is_valid());
499 }
500 
PauseConnectorAndRunClosure(Connector * connector,const base::Closure & closure)501 void PauseConnectorAndRunClosure(Connector* connector,
502                                  const base::Closure& closure) {
503   connector->PauseIncomingMethodCallProcessing();
504   closure.Run();
505 }
506 
TEST_F(ConnectorTest,PauseWithQueuedMessages)507 TEST_F(ConnectorTest, PauseWithQueuedMessages) {
508   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
509                        base::ThreadTaskRunnerHandle::Get());
510   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
511                        base::ThreadTaskRunnerHandle::Get());
512 
513   const char kText[] = "hello world";
514 
515   // Queue up two messages.
516   Message message;
517   AllocMessage(kText, &message);
518   connector0.Accept(&message);
519   AllocMessage(kText, &message);
520   connector0.Accept(&message);
521 
522   base::RunLoop run_loop;
523   // Configure the accumulator such that it pauses after the first message is
524   // received.
525   MessageAccumulator accumulator(
526       base::Bind(&PauseConnectorAndRunClosure, &connector1,
527                  run_loop.QuitClosure()));
528   connector1.set_incoming_receiver(&accumulator);
529 
530   run_loop.Run();
531 
532   // As we paused after the first message we should only have gotten one
533   // message.
534   ASSERT_EQ(1u, accumulator.size());
535 }
536 
AccumulateWithNestedLoop(MessageAccumulator * accumulator,const base::Closure & closure)537 void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
538                               const base::Closure& closure) {
539   base::RunLoop nested_run_loop;
540   base::MessageLoop::ScopedNestableTaskAllower allow(
541       base::MessageLoop::current());
542   accumulator->set_closure(nested_run_loop.QuitClosure());
543   nested_run_loop.Run();
544   closure.Run();
545 }
546 
TEST_F(ConnectorTest,ProcessWhenNested)547 TEST_F(ConnectorTest, ProcessWhenNested) {
548   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
549                        base::ThreadTaskRunnerHandle::Get());
550   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
551                        base::ThreadTaskRunnerHandle::Get());
552 
553   const char kText[] = "hello world";
554 
555   // Queue up two messages.
556   Message message;
557   AllocMessage(kText, &message);
558   connector0.Accept(&message);
559   AllocMessage(kText, &message);
560   connector0.Accept(&message);
561 
562   base::RunLoop run_loop;
563   MessageAccumulator accumulator;
564   // When the accumulator gets the first message it spins a nested message
565   // loop. The loop is quit when another message is received.
566   accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
567                                      run_loop.QuitClosure()));
568   connector1.set_incoming_receiver(&accumulator);
569 
570   run_loop.Run();
571 
572   ASSERT_EQ(2u, accumulator.size());
573 }
574 
575 }  // namespace
576 }  // namespace test
577 }  // namespace mojo
578