• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include <stddef.h>
8 
9 #include <memory>
10 #include <string>
11 #include <utility>
12 #include <vector>
13 
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/process/process_handle.h"
20 #include "base/run_loop.h"
21 #include "base/single_thread_task_runner.h"
22 #include "base/strings/string_util.h"
23 #include "base/synchronization/waitable_event.h"
24 #include "base/threading/platform_thread.h"
25 #include "base/threading/thread.h"
26 #include "base/threading/thread_task_runner_handle.h"
27 #include "build/build_config.h"
28 #include "ipc/ipc_listener.h"
29 #include "ipc/ipc_message.h"
30 #include "ipc/ipc_sender.h"
31 #include "ipc/ipc_sync_message_filter.h"
32 #include "ipc/ipc_sync_message_unittest.h"
33 #include "mojo/public/cpp/system/message_pipe.h"
34 #include "testing/gtest/include/gtest/gtest.h"
35 
36 using base::WaitableEvent;
37 
38 namespace IPC {
39 namespace {
40 
41 // Base class for a "process" with listener and IPC threads.
42 class Worker : public Listener, public Sender {
43  public:
44   // Will create a channel without a name.
Worker(Channel::Mode mode,const std::string & thread_name,mojo::ScopedMessagePipeHandle channel_handle)45   Worker(Channel::Mode mode,
46          const std::string& thread_name,
47          mojo::ScopedMessagePipeHandle channel_handle)
48       : done_(
49             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
50                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
51         channel_created_(
52             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
53                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
54         channel_handle_(std::move(channel_handle)),
55         mode_(mode),
56         ipc_thread_((thread_name + "_ipc").c_str()),
57         listener_thread_((thread_name + "_listener").c_str()),
58         overrided_thread_(NULL),
59         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
60                         base::WaitableEvent::InitialState::NOT_SIGNALED),
61         is_shutdown_(false) {}
62 
63   // Will create a named channel and use this name for the threads' name.
Worker(mojo::ScopedMessagePipeHandle channel_handle,Channel::Mode mode)64   Worker(mojo::ScopedMessagePipeHandle channel_handle, Channel::Mode mode)
65       : done_(
66             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
67                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
68         channel_created_(
69             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
70                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
71         channel_handle_(std::move(channel_handle)),
72         mode_(mode),
73         ipc_thread_("ipc thread"),
74         listener_thread_("listener thread"),
75         overrided_thread_(NULL),
76         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
77                         base::WaitableEvent::InitialState::NOT_SIGNALED),
78         is_shutdown_(false) {}
79 
~Worker()80   ~Worker() override {
81     // Shutdown() must be called before destruction.
82     CHECK(is_shutdown_);
83   }
Send(Message * msg)84   bool Send(Message* msg) override { return channel_->Send(msg); }
WaitForChannelCreation()85   void WaitForChannelCreation() { channel_created_->Wait(); }
CloseChannel()86   void CloseChannel() {
87     DCHECK(ListenerThread()->task_runner()->BelongsToCurrentThread());
88     channel_->Close();
89   }
Start()90   void Start() {
91     StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT);
92     ListenerThread()->task_runner()->PostTask(
93         FROM_HERE, base::Bind(&Worker::OnStart, base::Unretained(this)));
94   }
Shutdown()95   void Shutdown() {
96     // The IPC thread needs to outlive SyncChannel. We can't do this in
97     // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
98     // may result in a race conditions. See http://crbug.com/25841.
99     WaitableEvent listener_done(
100         base::WaitableEvent::ResetPolicy::AUTOMATIC,
101         base::WaitableEvent::InitialState::NOT_SIGNALED),
102         ipc_done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
103                  base::WaitableEvent::InitialState::NOT_SIGNALED);
104     ListenerThread()->task_runner()->PostTask(
105         FROM_HERE,
106         base::Bind(&Worker::OnListenerThreadShutdown1, base::Unretained(this),
107                    &listener_done, &ipc_done));
108     listener_done.Wait();
109     ipc_done.Wait();
110     ipc_thread_.Stop();
111     listener_thread_.Stop();
112     is_shutdown_ = true;
113   }
OverrideThread(base::Thread * overrided_thread)114   void OverrideThread(base::Thread* overrided_thread) {
115     DCHECK(overrided_thread_ == NULL);
116     overrided_thread_ = overrided_thread;
117   }
SendAnswerToLife(bool pump,bool succeed)118   bool SendAnswerToLife(bool pump, bool succeed) {
119     int answer = 0;
120     SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
121     if (pump)
122       msg->EnableMessagePumping();
123     bool result = Send(msg);
124     DCHECK_EQ(result, succeed);
125     DCHECK_EQ(answer, (succeed ? 42 : 0));
126     return result;
127   }
SendDouble(bool pump,bool succeed)128   bool SendDouble(bool pump, bool succeed) {
129     int answer = 0;
130     SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
131     if (pump)
132       msg->EnableMessagePumping();
133     bool result = Send(msg);
134     DCHECK_EQ(result, succeed);
135     DCHECK_EQ(answer, (succeed ? 10 : 0));
136     return result;
137   }
TakeChannelHandle()138   mojo::MessagePipeHandle TakeChannelHandle() {
139     DCHECK(channel_handle_.is_valid());
140     return channel_handle_.release();
141   }
mode()142   Channel::Mode mode() { return mode_; }
done_event()143   WaitableEvent* done_event() { return done_.get(); }
shutdown_event()144   WaitableEvent* shutdown_event() { return &shutdown_event_; }
ResetChannel()145   void ResetChannel() { channel_.reset(); }
146   // Derived classes need to call this when they've completed their part of
147   // the test.
Done()148   void Done() { done_->Signal(); }
149 
150  protected:
channel()151   SyncChannel* channel() { return channel_.get(); }
152   // Functions for derived classes to implement if they wish.
Run()153   virtual void Run() { }
OnAnswer(int * answer)154   virtual void OnAnswer(int* answer) { NOTREACHED(); }
OnAnswerDelay(Message * reply_msg)155   virtual void OnAnswerDelay(Message* reply_msg) {
156     // The message handler map below can only take one entry for
157     // SyncChannelTestMsg_AnswerToLife, so since some classes want
158     // the normal version while other want the delayed reply, we
159     // call the normal version if the derived class didn't override
160     // this function.
161     int answer;
162     OnAnswer(&answer);
163     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
164     Send(reply_msg);
165   }
OnDouble(int in,int * out)166   virtual void OnDouble(int in, int* out) { NOTREACHED(); }
OnDoubleDelay(int in,Message * reply_msg)167   virtual void OnDoubleDelay(int in, Message* reply_msg) {
168     int result;
169     OnDouble(in, &result);
170     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
171     Send(reply_msg);
172   }
173 
OnNestedTestMsg(Message * reply_msg)174   virtual void OnNestedTestMsg(Message* reply_msg) {
175     NOTREACHED();
176   }
177 
CreateChannel()178   virtual SyncChannel* CreateChannel() {
179     std::unique_ptr<SyncChannel> channel = SyncChannel::Create(
180         TakeChannelHandle(), mode_, this, ipc_thread_.task_runner(),
181         base::ThreadTaskRunnerHandle::Get(), true, &shutdown_event_);
182     return channel.release();
183   }
184 
ListenerThread()185   base::Thread* ListenerThread() {
186     return overrided_thread_ ? overrided_thread_ : &listener_thread_;
187   }
188 
ipc_thread() const189   const base::Thread& ipc_thread() const { return ipc_thread_; }
190 
191  private:
192   // Called on the listener thread to create the sync channel.
OnStart()193   void OnStart() {
194     // Link ipc_thread_, listener_thread_ and channel_ altogether.
195     StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO);
196     channel_.reset(CreateChannel());
197     channel_created_->Signal();
198     Run();
199   }
200 
OnListenerThreadShutdown1(WaitableEvent * listener_event,WaitableEvent * ipc_event)201   void OnListenerThreadShutdown1(WaitableEvent* listener_event,
202                                  WaitableEvent* ipc_event) {
203     // SyncChannel needs to be destructed on the thread that it was created on.
204     channel_.reset();
205 
206     base::RunLoop().RunUntilIdle();
207 
208     ipc_thread_.task_runner()->PostTask(
209         FROM_HERE,
210         base::Bind(&Worker::OnIPCThreadShutdown, base::Unretained(this),
211                    listener_event, ipc_event));
212   }
213 
OnIPCThreadShutdown(WaitableEvent * listener_event,WaitableEvent * ipc_event)214   void OnIPCThreadShutdown(WaitableEvent* listener_event,
215                            WaitableEvent* ipc_event) {
216     base::RunLoop().RunUntilIdle();
217     ipc_event->Signal();
218 
219     listener_thread_.task_runner()->PostTask(
220         FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2,
221                               base::Unretained(this), listener_event));
222   }
223 
OnListenerThreadShutdown2(WaitableEvent * listener_event)224   void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
225     base::RunLoop().RunUntilIdle();
226     listener_event->Signal();
227   }
228 
OnMessageReceived(const Message & message)229   bool OnMessageReceived(const Message& message) override {
230     IPC_BEGIN_MESSAGE_MAP(Worker, message)
231      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
232      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
233                                      OnAnswerDelay)
234      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
235                                      OnNestedTestMsg)
236     IPC_END_MESSAGE_MAP()
237     return true;
238   }
239 
StartThread(base::Thread * thread,base::MessageLoop::Type type)240   void StartThread(base::Thread* thread, base::MessageLoop::Type type) {
241     base::Thread::Options options;
242     options.message_loop_type = type;
243     thread->StartWithOptions(options);
244   }
245 
246   std::unique_ptr<WaitableEvent> done_;
247   std::unique_ptr<WaitableEvent> channel_created_;
248   mojo::ScopedMessagePipeHandle channel_handle_;
249   Channel::Mode mode_;
250   std::unique_ptr<SyncChannel> channel_;
251   base::Thread ipc_thread_;
252   base::Thread listener_thread_;
253   base::Thread* overrided_thread_;
254 
255   base::WaitableEvent shutdown_event_;
256 
257   bool is_shutdown_;
258 
259   DISALLOW_COPY_AND_ASSIGN(Worker);
260 };
261 
262 
263 // Starts the test with the given workers.  This function deletes the workers
264 // when it's done.
RunTest(std::vector<Worker * > workers)265 void RunTest(std::vector<Worker*> workers) {
266   // First we create the workers that are channel servers, or else the other
267   // workers' channel initialization might fail because the pipe isn't created..
268   for (size_t i = 0; i < workers.size(); ++i) {
269     if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
270       workers[i]->Start();
271       workers[i]->WaitForChannelCreation();
272     }
273   }
274 
275   // now create the clients
276   for (size_t i = 0; i < workers.size(); ++i) {
277     if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
278       workers[i]->Start();
279   }
280 
281   // wait for all the workers to finish
282   for (size_t i = 0; i < workers.size(); ++i)
283     workers[i]->done_event()->Wait();
284 
285   for (size_t i = 0; i < workers.size(); ++i) {
286     workers[i]->Shutdown();
287     delete workers[i];
288   }
289 }
290 
291 class IPCSyncChannelTest : public testing::Test {
292  private:
293   base::MessageLoop message_loop_;
294 };
295 
296 //------------------------------------------------------------------------------
297 
298 class SimpleServer : public Worker {
299  public:
SimpleServer(bool pump_during_send,mojo::ScopedMessagePipeHandle channel_handle)300   SimpleServer(bool pump_during_send,
301                mojo::ScopedMessagePipeHandle channel_handle)
302       : Worker(Channel::MODE_SERVER,
303                "simpler_server",
304                std::move(channel_handle)),
305         pump_during_send_(pump_during_send) {}
Run()306   void Run() override {
307     SendAnswerToLife(pump_during_send_, true);
308     Done();
309   }
310 
311   bool pump_during_send_;
312 };
313 
314 class SimpleClient : public Worker {
315  public:
SimpleClient(mojo::ScopedMessagePipeHandle channel_handle)316   explicit SimpleClient(mojo::ScopedMessagePipeHandle channel_handle)
317       : Worker(Channel::MODE_CLIENT,
318                "simple_client",
319                std::move(channel_handle)) {}
320 
OnAnswer(int * answer)321   void OnAnswer(int* answer) override {
322     *answer = 42;
323     Done();
324   }
325 };
326 
Simple(bool pump_during_send)327 void Simple(bool pump_during_send) {
328   std::vector<Worker*> workers;
329   mojo::MessagePipe pipe;
330   workers.push_back(
331       new SimpleServer(pump_during_send, std::move(pipe.handle0)));
332   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
333   RunTest(workers);
334 }
335 
336 #if defined(OS_ANDROID)
337 #define MAYBE_Simple DISABLED_Simple
338 #else
339 #define MAYBE_Simple Simple
340 #endif
341 // Tests basic synchronous call
TEST_F(IPCSyncChannelTest,MAYBE_Simple)342 TEST_F(IPCSyncChannelTest, MAYBE_Simple) {
343   Simple(false);
344   Simple(true);
345 }
346 
347 //------------------------------------------------------------------------------
348 
349 // Worker classes which override how the sync channel is created to use the
350 // two-step initialization (calling the lightweight constructor and then
351 // ChannelProxy::Init separately) process.
352 class TwoStepServer : public Worker {
353  public:
TwoStepServer(bool create_pipe_now,mojo::ScopedMessagePipeHandle channel_handle)354   TwoStepServer(bool create_pipe_now,
355                 mojo::ScopedMessagePipeHandle channel_handle)
356       : Worker(Channel::MODE_SERVER,
357                "simpler_server",
358                std::move(channel_handle)),
359         create_pipe_now_(create_pipe_now) {}
360 
Run()361   void Run() override {
362     SendAnswerToLife(false, true);
363     Done();
364   }
365 
CreateChannel()366   SyncChannel* CreateChannel() override {
367     SyncChannel* channel =
368         SyncChannel::Create(TakeChannelHandle(), mode(), this,
369                             ipc_thread().task_runner(),
370                             base::ThreadTaskRunnerHandle::Get(),
371                             create_pipe_now_, shutdown_event())
372             .release();
373     return channel;
374   }
375 
376   bool create_pipe_now_;
377 };
378 
379 class TwoStepClient : public Worker {
380  public:
TwoStepClient(bool create_pipe_now,mojo::ScopedMessagePipeHandle channel_handle)381   TwoStepClient(bool create_pipe_now,
382                 mojo::ScopedMessagePipeHandle channel_handle)
383       : Worker(Channel::MODE_CLIENT,
384                "simple_client",
385                std::move(channel_handle)),
386         create_pipe_now_(create_pipe_now) {}
387 
OnAnswer(int * answer)388   void OnAnswer(int* answer) override {
389     *answer = 42;
390     Done();
391   }
392 
CreateChannel()393   SyncChannel* CreateChannel() override {
394     SyncChannel* channel =
395         SyncChannel::Create(TakeChannelHandle(), mode(), this,
396                             ipc_thread().task_runner(),
397                             base::ThreadTaskRunnerHandle::Get(),
398                             create_pipe_now_, shutdown_event())
399             .release();
400     return channel;
401   }
402 
403   bool create_pipe_now_;
404 };
405 
TwoStep(bool create_server_pipe_now,bool create_client_pipe_now)406 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
407   std::vector<Worker*> workers;
408   mojo::MessagePipe pipe;
409   workers.push_back(
410       new TwoStepServer(create_server_pipe_now, std::move(pipe.handle0)));
411   workers.push_back(
412       new TwoStepClient(create_client_pipe_now, std::move(pipe.handle1)));
413   RunTest(workers);
414 }
415 
416 // Tests basic two-step initialization, where you call the lightweight
417 // constructor then Init.
TEST_F(IPCSyncChannelTest,TwoStepInitialization)418 TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
419   TwoStep(false, false);
420   TwoStep(false, true);
421   TwoStep(true, false);
422   TwoStep(true, true);
423 }
424 
425 //------------------------------------------------------------------------------
426 
427 class DelayClient : public Worker {
428  public:
DelayClient(mojo::ScopedMessagePipeHandle channel_handle)429   explicit DelayClient(mojo::ScopedMessagePipeHandle channel_handle)
430       : Worker(Channel::MODE_CLIENT,
431                "delay_client",
432                std::move(channel_handle)) {}
433 
OnAnswerDelay(Message * reply_msg)434   void OnAnswerDelay(Message* reply_msg) override {
435     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
436     Send(reply_msg);
437     Done();
438   }
439 };
440 
DelayReply(bool pump_during_send)441 void DelayReply(bool pump_during_send) {
442   std::vector<Worker*> workers;
443   mojo::MessagePipe pipe;
444   workers.push_back(
445       new SimpleServer(pump_during_send, std::move(pipe.handle0)));
446   workers.push_back(new DelayClient(std::move(pipe.handle1)));
447   RunTest(workers);
448 }
449 
450 // Tests that asynchronous replies work
TEST_F(IPCSyncChannelTest,DelayReply)451 TEST_F(IPCSyncChannelTest, DelayReply) {
452   DelayReply(false);
453   DelayReply(true);
454 }
455 
456 //------------------------------------------------------------------------------
457 
458 class NoHangServer : public Worker {
459  public:
NoHangServer(WaitableEvent * got_first_reply,bool pump_during_send,mojo::ScopedMessagePipeHandle channel_handle)460   NoHangServer(WaitableEvent* got_first_reply,
461                bool pump_during_send,
462                mojo::ScopedMessagePipeHandle channel_handle)
463       : Worker(Channel::MODE_SERVER,
464                "no_hang_server",
465                std::move(channel_handle)),
466         got_first_reply_(got_first_reply),
467         pump_during_send_(pump_during_send) {}
Run()468   void Run() override {
469     SendAnswerToLife(pump_during_send_, true);
470     got_first_reply_->Signal();
471 
472     SendAnswerToLife(pump_during_send_, false);
473     Done();
474   }
475 
476   WaitableEvent* got_first_reply_;
477   bool pump_during_send_;
478 };
479 
480 class NoHangClient : public Worker {
481  public:
NoHangClient(WaitableEvent * got_first_reply,mojo::ScopedMessagePipeHandle channel_handle)482   NoHangClient(WaitableEvent* got_first_reply,
483                mojo::ScopedMessagePipeHandle channel_handle)
484       : Worker(Channel::MODE_CLIENT,
485                "no_hang_client",
486                std::move(channel_handle)),
487         got_first_reply_(got_first_reply) {}
488 
OnAnswerDelay(Message * reply_msg)489   void OnAnswerDelay(Message* reply_msg) override {
490     // Use the DELAY_REPLY macro so that we can force the reply to be sent
491     // before this function returns (when the channel will be reset).
492     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
493     Send(reply_msg);
494     got_first_reply_->Wait();
495     CloseChannel();
496     Done();
497   }
498 
499   WaitableEvent* got_first_reply_;
500 };
501 
NoHang(bool pump_during_send)502 void NoHang(bool pump_during_send) {
503   WaitableEvent got_first_reply(
504       base::WaitableEvent::ResetPolicy::AUTOMATIC,
505       base::WaitableEvent::InitialState::NOT_SIGNALED);
506   std::vector<Worker*> workers;
507   mojo::MessagePipe pipe;
508   workers.push_back(new NoHangServer(&got_first_reply, pump_during_send,
509                                      std::move(pipe.handle0)));
510   workers.push_back(
511       new NoHangClient(&got_first_reply, std::move(pipe.handle1)));
512   RunTest(workers);
513 }
514 
515 // Tests that caller doesn't hang if receiver dies
TEST_F(IPCSyncChannelTest,NoHang)516 TEST_F(IPCSyncChannelTest, NoHang) {
517   NoHang(false);
518   NoHang(true);
519 }
520 
521 //------------------------------------------------------------------------------
522 
523 class UnblockServer : public Worker {
524  public:
UnblockServer(bool pump_during_send,bool delete_during_send,mojo::ScopedMessagePipeHandle channel_handle)525   UnblockServer(bool pump_during_send,
526                 bool delete_during_send,
527                 mojo::ScopedMessagePipeHandle channel_handle)
528       : Worker(Channel::MODE_SERVER,
529                "unblock_server",
530                std::move(channel_handle)),
531         pump_during_send_(pump_during_send),
532         delete_during_send_(delete_during_send) {}
Run()533   void Run() override {
534     if (delete_during_send_) {
535       // Use custom code since race conditions mean the answer may or may not be
536       // available.
537       int answer = 0;
538       SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
539       if (pump_during_send_)
540         msg->EnableMessagePumping();
541       Send(msg);
542     } else {
543       SendAnswerToLife(pump_during_send_, true);
544     }
545     Done();
546   }
547 
OnDoubleDelay(int in,Message * reply_msg)548   void OnDoubleDelay(int in, Message* reply_msg) override {
549     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
550     Send(reply_msg);
551     if (delete_during_send_)
552       ResetChannel();
553   }
554 
555   bool pump_during_send_;
556   bool delete_during_send_;
557 };
558 
559 class UnblockClient : public Worker {
560  public:
UnblockClient(bool pump_during_send,mojo::ScopedMessagePipeHandle channel_handle)561   UnblockClient(bool pump_during_send,
562                 mojo::ScopedMessagePipeHandle channel_handle)
563       : Worker(Channel::MODE_CLIENT,
564                "unblock_client",
565                std::move(channel_handle)),
566         pump_during_send_(pump_during_send) {}
567 
OnAnswer(int * answer)568   void OnAnswer(int* answer) override {
569     SendDouble(pump_during_send_, true);
570     *answer = 42;
571     Done();
572   }
573 
574   bool pump_during_send_;
575 };
576 
Unblock(bool server_pump,bool client_pump,bool delete_during_send)577 void Unblock(bool server_pump, bool client_pump, bool delete_during_send) {
578   std::vector<Worker*> workers;
579   mojo::MessagePipe pipe;
580   workers.push_back(new UnblockServer(server_pump, delete_during_send,
581                                       std::move(pipe.handle0)));
582   workers.push_back(new UnblockClient(client_pump, std::move(pipe.handle1)));
583   RunTest(workers);
584 }
585 
586 // Tests that the caller unblocks to answer a sync message from the receiver.
TEST_F(IPCSyncChannelTest,Unblock)587 TEST_F(IPCSyncChannelTest, Unblock) {
588   Unblock(false, false, false);
589   Unblock(false, true, false);
590   Unblock(true, false, false);
591   Unblock(true, true, false);
592 }
593 
594 //------------------------------------------------------------------------------
595 
596 #if defined(OS_ANDROID)
597 #define MAYBE_ChannelDeleteDuringSend DISABLED_ChannelDeleteDuringSend
598 #else
599 #define MAYBE_ChannelDeleteDuringSend ChannelDeleteDuringSend
600 #endif
601 // Tests that the the SyncChannel object can be deleted during a Send.
TEST_F(IPCSyncChannelTest,MAYBE_ChannelDeleteDuringSend)602 TEST_F(IPCSyncChannelTest, MAYBE_ChannelDeleteDuringSend) {
603   Unblock(false, false, true);
604   Unblock(false, true, true);
605   Unblock(true, false, true);
606   Unblock(true, true, true);
607 }
608 
609 //------------------------------------------------------------------------------
610 
611 class RecursiveServer : public Worker {
612  public:
RecursiveServer(bool expected_send_result,bool pump_first,bool pump_second,mojo::ScopedMessagePipeHandle channel_handle)613   RecursiveServer(bool expected_send_result,
614                   bool pump_first,
615                   bool pump_second,
616                   mojo::ScopedMessagePipeHandle channel_handle)
617       : Worker(Channel::MODE_SERVER,
618                "recursive_server",
619                std::move(channel_handle)),
620         expected_send_result_(expected_send_result),
621         pump_first_(pump_first),
622         pump_second_(pump_second) {}
Run()623   void Run() override {
624     SendDouble(pump_first_, expected_send_result_);
625     Done();
626   }
627 
OnDouble(int in,int * out)628   void OnDouble(int in, int* out) override {
629     *out = in * 2;
630     SendAnswerToLife(pump_second_, expected_send_result_);
631   }
632 
633   bool expected_send_result_, pump_first_, pump_second_;
634 };
635 
636 class RecursiveClient : public Worker {
637  public:
RecursiveClient(bool pump_during_send,bool close_channel,mojo::ScopedMessagePipeHandle channel_handle)638   RecursiveClient(bool pump_during_send,
639                   bool close_channel,
640                   mojo::ScopedMessagePipeHandle channel_handle)
641       : Worker(Channel::MODE_CLIENT,
642                "recursive_client",
643                std::move(channel_handle)),
644         pump_during_send_(pump_during_send),
645         close_channel_(close_channel) {}
646 
OnDoubleDelay(int in,Message * reply_msg)647   void OnDoubleDelay(int in, Message* reply_msg) override {
648     SendDouble(pump_during_send_, !close_channel_);
649     if (close_channel_) {
650       delete reply_msg;
651     } else {
652       SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
653       Send(reply_msg);
654     }
655     Done();
656   }
657 
OnAnswerDelay(Message * reply_msg)658   void OnAnswerDelay(Message* reply_msg) override {
659     if (close_channel_) {
660       delete reply_msg;
661       CloseChannel();
662     } else {
663       SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
664       Send(reply_msg);
665     }
666   }
667 
668   bool pump_during_send_, close_channel_;
669 };
670 
Recursive(bool server_pump_first,bool server_pump_second,bool client_pump)671 void Recursive(
672     bool server_pump_first, bool server_pump_second, bool client_pump) {
673   std::vector<Worker*> workers;
674   mojo::MessagePipe pipe;
675   workers.push_back(new RecursiveServer(
676       true, server_pump_first, server_pump_second, std::move(pipe.handle0)));
677   workers.push_back(
678       new RecursiveClient(client_pump, false, std::move(pipe.handle1)));
679   RunTest(workers);
680 }
681 
682 // Tests a server calling Send while another Send is pending.
TEST_F(IPCSyncChannelTest,Recursive)683 TEST_F(IPCSyncChannelTest, Recursive) {
684   Recursive(false, false, false);
685   Recursive(false, false, true);
686   Recursive(false, true, false);
687   Recursive(false, true, true);
688   Recursive(true, false, false);
689   Recursive(true, false, true);
690   Recursive(true, true, false);
691   Recursive(true, true, true);
692 }
693 
694 //------------------------------------------------------------------------------
695 
RecursiveNoHang(bool server_pump_first,bool server_pump_second,bool client_pump)696 void RecursiveNoHang(
697     bool server_pump_first, bool server_pump_second, bool client_pump) {
698   std::vector<Worker*> workers;
699   mojo::MessagePipe pipe;
700   workers.push_back(new RecursiveServer(
701       false, server_pump_first, server_pump_second, std::move(pipe.handle0)));
702   workers.push_back(
703       new RecursiveClient(client_pump, true, std::move(pipe.handle1)));
704   RunTest(workers);
705 }
706 
707 // Tests that if a caller makes a sync call during an existing sync call and
708 // the receiver dies, neither of the Send() calls hang.
TEST_F(IPCSyncChannelTest,RecursiveNoHang)709 TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
710   RecursiveNoHang(false, false, false);
711   RecursiveNoHang(false, false, true);
712   RecursiveNoHang(false, true, false);
713   RecursiveNoHang(false, true, true);
714   RecursiveNoHang(true, false, false);
715   RecursiveNoHang(true, false, true);
716   RecursiveNoHang(true, true, false);
717   RecursiveNoHang(true, true, true);
718 }
719 
720 //------------------------------------------------------------------------------
721 
722 class MultipleServer1 : public Worker {
723  public:
MultipleServer1(bool pump_during_send,mojo::ScopedMessagePipeHandle channel_handle)724   MultipleServer1(bool pump_during_send,
725                   mojo::ScopedMessagePipeHandle channel_handle)
726       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
727         pump_during_send_(pump_during_send) {}
728 
Run()729   void Run() override {
730     SendDouble(pump_during_send_, true);
731     Done();
732   }
733 
734   bool pump_during_send_;
735 };
736 
737 class MultipleClient1 : public Worker {
738  public:
MultipleClient1(WaitableEvent * client1_msg_received,WaitableEvent * client1_can_reply,mojo::ScopedMessagePipeHandle channel_handle)739   MultipleClient1(WaitableEvent* client1_msg_received,
740                   WaitableEvent* client1_can_reply,
741                   mojo::ScopedMessagePipeHandle channel_handle)
742       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
743         client1_msg_received_(client1_msg_received),
744         client1_can_reply_(client1_can_reply) {}
745 
OnDouble(int in,int * out)746   void OnDouble(int in, int* out) override {
747     client1_msg_received_->Signal();
748     *out = in * 2;
749     client1_can_reply_->Wait();
750     Done();
751   }
752 
753  private:
754   WaitableEvent *client1_msg_received_, *client1_can_reply_;
755 };
756 
757 class MultipleServer2 : public Worker {
758  public:
MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle)759   explicit MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle)
760       : Worker(std::move(channel_handle), Channel::MODE_SERVER) {}
761 
OnAnswer(int * result)762   void OnAnswer(int* result) override {
763     *result = 42;
764     Done();
765   }
766 };
767 
768 class MultipleClient2 : public Worker {
769  public:
MultipleClient2(WaitableEvent * client1_msg_received,WaitableEvent * client1_can_reply,bool pump_during_send,mojo::ScopedMessagePipeHandle channel_handle)770   MultipleClient2(WaitableEvent* client1_msg_received,
771                   WaitableEvent* client1_can_reply,
772                   bool pump_during_send,
773                   mojo::ScopedMessagePipeHandle channel_handle)
774       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
775         client1_msg_received_(client1_msg_received),
776         client1_can_reply_(client1_can_reply),
777         pump_during_send_(pump_during_send) {}
778 
Run()779   void Run() override {
780     client1_msg_received_->Wait();
781     SendAnswerToLife(pump_during_send_, true);
782     client1_can_reply_->Signal();
783     Done();
784   }
785 
786  private:
787   WaitableEvent *client1_msg_received_, *client1_can_reply_;
788   bool pump_during_send_;
789 };
790 
Multiple(bool server_pump,bool client_pump)791 void Multiple(bool server_pump, bool client_pump) {
792   std::vector<Worker*> workers;
793 
794   // A shared worker thread so that server1 and server2 run on one thread.
795   base::Thread worker_thread("Multiple");
796   ASSERT_TRUE(worker_thread.Start());
797 
798   // Server1 sends a sync msg to client1, which blocks the reply until
799   // server2 (which runs on the same worker thread as server1) responds
800   // to a sync msg from client2.
801   WaitableEvent client1_msg_received(
802       base::WaitableEvent::ResetPolicy::AUTOMATIC,
803       base::WaitableEvent::InitialState::NOT_SIGNALED);
804   WaitableEvent client1_can_reply(
805       base::WaitableEvent::ResetPolicy::AUTOMATIC,
806       base::WaitableEvent::InitialState::NOT_SIGNALED);
807 
808   Worker* worker;
809 
810   mojo::MessagePipe pipe1, pipe2;
811   worker = new MultipleServer2(std::move(pipe2.handle0));
812   worker->OverrideThread(&worker_thread);
813   workers.push_back(worker);
814 
815   worker = new MultipleClient2(&client1_msg_received, &client1_can_reply,
816                                client_pump, std::move(pipe2.handle1));
817   workers.push_back(worker);
818 
819   worker = new MultipleServer1(server_pump, std::move(pipe1.handle0));
820   worker->OverrideThread(&worker_thread);
821   workers.push_back(worker);
822 
823   worker = new MultipleClient1(&client1_msg_received, &client1_can_reply,
824                                std::move(pipe1.handle1));
825   workers.push_back(worker);
826 
827   RunTest(workers);
828 }
829 
830 // Tests that multiple SyncObjects on the same listener thread can unblock each
831 // other.
TEST_F(IPCSyncChannelTest,Multiple)832 TEST_F(IPCSyncChannelTest, Multiple) {
833   Multiple(false, false);
834   Multiple(false, true);
835   Multiple(true, false);
836   Multiple(true, true);
837 }
838 
839 //------------------------------------------------------------------------------
840 
841 // This class provides server side functionality to test the case where
842 // multiple sync channels are in use on the same thread on the client and
843 // nested calls are issued.
844 class QueuedReplyServer : public Worker {
845  public:
QueuedReplyServer(base::Thread * listener_thread,mojo::ScopedMessagePipeHandle channel_handle,const std::string & reply_text)846   QueuedReplyServer(base::Thread* listener_thread,
847                     mojo::ScopedMessagePipeHandle channel_handle,
848                     const std::string& reply_text)
849       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
850         reply_text_(reply_text) {
851     Worker::OverrideThread(listener_thread);
852   }
853 
OnNestedTestMsg(Message * reply_msg)854   void OnNestedTestMsg(Message* reply_msg) override {
855     VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
856     SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
857     Send(reply_msg);
858     Done();
859   }
860 
861  private:
862   std::string reply_text_;
863 };
864 
865 // The QueuedReplyClient class provides functionality to test the case where
866 // multiple sync channels are in use on the same thread and they make nested
867 // sync calls, i.e. while the first channel waits for a response it makes a
868 // sync call on another channel.
869 // The callstack should unwind correctly, i.e. the outermost call should
870 // complete first, and so on.
871 class QueuedReplyClient : public Worker {
872  public:
QueuedReplyClient(base::Thread * listener_thread,mojo::ScopedMessagePipeHandle channel_handle,const std::string & expected_text,bool pump_during_send)873   QueuedReplyClient(base::Thread* listener_thread,
874                     mojo::ScopedMessagePipeHandle channel_handle,
875                     const std::string& expected_text,
876                     bool pump_during_send)
877       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
878         pump_during_send_(pump_during_send),
879         expected_text_(expected_text) {
880     Worker::OverrideThread(listener_thread);
881   }
882 
Run()883   void Run() override {
884     std::string response;
885     SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
886     if (pump_during_send_)
887       msg->EnableMessagePumping();
888     bool result = Send(msg);
889     DCHECK(result);
890     DCHECK_EQ(response, expected_text_);
891 
892     VLOG(1) << __FUNCTION__ << " Received reply: " << response;
893     Done();
894   }
895 
896  private:
897   bool pump_during_send_;
898   std::string expected_text_;
899 };
900 
QueuedReply(bool client_pump)901 void QueuedReply(bool client_pump) {
902   std::vector<Worker*> workers;
903 
904   // A shared worker thread for servers
905   base::Thread server_worker_thread("QueuedReply_ServerListener");
906   ASSERT_TRUE(server_worker_thread.Start());
907 
908   base::Thread client_worker_thread("QueuedReply_ClientListener");
909   ASSERT_TRUE(client_worker_thread.Start());
910 
911   Worker* worker;
912 
913   mojo::MessagePipe pipe1, pipe2;
914   worker = new QueuedReplyServer(&server_worker_thread,
915                                  std::move(pipe1.handle0), "Got first message");
916   workers.push_back(worker);
917 
918   worker = new QueuedReplyServer(
919       &server_worker_thread, std::move(pipe2.handle0), "Got second message");
920   workers.push_back(worker);
921 
922   worker =
923       new QueuedReplyClient(&client_worker_thread, std::move(pipe1.handle1),
924                             "Got first message", client_pump);
925   workers.push_back(worker);
926 
927   worker =
928       new QueuedReplyClient(&client_worker_thread, std::move(pipe2.handle1),
929                             "Got second message", client_pump);
930   workers.push_back(worker);
931 
932   RunTest(workers);
933 }
934 
935 // While a blocking send is in progress, the listener thread might answer other
936 // synchronous messages.  This tests that if during the response to another
937 // message the reply to the original messages comes, it is queued up correctly
938 // and the original Send is unblocked later.
939 // We also test that the send call stacks unwind correctly when the channel
940 // pumps messages while waiting for a response.
TEST_F(IPCSyncChannelTest,QueuedReply)941 TEST_F(IPCSyncChannelTest, QueuedReply) {
942   QueuedReply(false);
943   QueuedReply(true);
944 }
945 
946 //------------------------------------------------------------------------------
947 
948 class ChattyClient : public Worker {
949  public:
ChattyClient(mojo::ScopedMessagePipeHandle channel_handle)950   explicit ChattyClient(mojo::ScopedMessagePipeHandle channel_handle)
951       : Worker(Channel::MODE_CLIENT,
952                "chatty_client",
953                std::move(channel_handle)) {}
954 
OnAnswer(int * answer)955   void OnAnswer(int* answer) override {
956     // The PostMessage limit is 10k.  Send 20% more than that.
957     const int kMessageLimit = 10000;
958     const int kMessagesToSend = kMessageLimit * 120 / 100;
959     for (int i = 0; i < kMessagesToSend; ++i) {
960       if (!SendDouble(false, true))
961         break;
962     }
963     *answer = 42;
964     Done();
965   }
966 };
967 
ChattyServer(bool pump_during_send)968 void ChattyServer(bool pump_during_send) {
969   std::vector<Worker*> workers;
970   mojo::MessagePipe pipe;
971   workers.push_back(
972       new UnblockServer(pump_during_send, false, std::move(pipe.handle0)));
973   workers.push_back(new ChattyClient(std::move(pipe.handle1)));
974   RunTest(workers);
975 }
976 
977 #if defined(OS_ANDROID)
978 // Times out.
979 #define MAYBE_ChattyServer DISABLED_ChattyServer
980 #else
981 #define MAYBE_ChattyServer ChattyServer
982 #endif
983 // Tests http://b/1093251 - that sending lots of sync messages while
984 // the receiver is waiting for a sync reply does not overflow the PostMessage
985 // queue.
TEST_F(IPCSyncChannelTest,MAYBE_ChattyServer)986 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServer) {
987   ChattyServer(false);
988 }
989 
990 #if defined(OS_ANDROID)
991 // Times out.
992 #define MAYBE_ChattyServerPumpDuringSend DISABLED_ChattyServerPumpDuringSend
993 #else
994 #define MAYBE_ChattyServerPumpDuringSend ChattyServerPumpDuringSend
995 #endif
TEST_F(IPCSyncChannelTest,MAYBE_ChattyServerPumpDuringSend)996 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServerPumpDuringSend) {
997   ChattyServer(true);
998 }
999 
1000 //------------------------------------------------------------------------------
1001 
NestedCallback(Worker * server)1002 void NestedCallback(Worker* server) {
1003   // Sleep a bit so that we wake up after the reply has been received.
1004   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250));
1005   server->SendAnswerToLife(true, true);
1006 }
1007 
// Set to true by TimeoutCallback(); DoneEventRaceServer checks that it is
// still false once its send has returned.
bool timeout_occurred = false;

// Delayed-task body that records that the timeout fired.
void TimeoutCallback() {
  timeout_occurred = true;
}
1013 
1014 class DoneEventRaceServer : public Worker {
1015  public:
DoneEventRaceServer(mojo::ScopedMessagePipeHandle channel_handle)1016   explicit DoneEventRaceServer(mojo::ScopedMessagePipeHandle channel_handle)
1017       : Worker(Channel::MODE_SERVER,
1018                "done_event_race_server",
1019                std::move(channel_handle)) {}
1020 
Run()1021   void Run() override {
1022     base::ThreadTaskRunnerHandle::Get()->PostTask(
1023         FROM_HERE, base::Bind(&NestedCallback, base::Unretained(this)));
1024     base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
1025         FROM_HERE, base::Bind(&TimeoutCallback),
1026         base::TimeDelta::FromSeconds(9));
1027     // Even though we have a timeout on the Send, it will succeed since for this
1028     // bug, the reply message comes back and is deserialized, however the done
1029     // event wasn't set.  So we indirectly use the timeout task to notice if a
1030     // timeout occurred.
1031     SendAnswerToLife(true, true);
1032     DCHECK(!timeout_occurred);
1033     Done();
1034   }
1035 };
1036 
1037 #if defined(OS_ANDROID)
1038 #define MAYBE_DoneEventRace DISABLED_DoneEventRace
1039 #else
1040 #define MAYBE_DoneEventRace DoneEventRace
1041 #endif
1042 // Tests http://b/1474092 - that if after the done_event is set but before
1043 // OnObjectSignaled is called another message is sent out, then after its
1044 // reply comes back OnObjectSignaled will be called for the first message.
TEST_F(IPCSyncChannelTest,MAYBE_DoneEventRace)1045 TEST_F(IPCSyncChannelTest, MAYBE_DoneEventRace) {
1046   std::vector<Worker*> workers;
1047   mojo::MessagePipe pipe;
1048   workers.push_back(new DoneEventRaceServer(std::move(pipe.handle0)));
1049   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
1050   RunTest(workers);
1051 }
1052 
1053 //------------------------------------------------------------------------------
1054 
1055 class TestSyncMessageFilter : public SyncMessageFilter {
1056  public:
TestSyncMessageFilter(base::WaitableEvent * shutdown_event,Worker * worker,scoped_refptr<base::SingleThreadTaskRunner> task_runner)1057   TestSyncMessageFilter(
1058       base::WaitableEvent* shutdown_event,
1059       Worker* worker,
1060       scoped_refptr<base::SingleThreadTaskRunner> task_runner)
1061       : SyncMessageFilter(shutdown_event),
1062         worker_(worker),
1063         task_runner_(task_runner) {}
1064 
OnFilterAdded(Channel * channel)1065   void OnFilterAdded(Channel* channel) override {
1066     SyncMessageFilter::OnFilterAdded(channel);
1067     task_runner_->PostTask(
1068         FROM_HERE,
1069         base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this));
1070   }
1071 
SendMessageOnHelperThread()1072   void SendMessageOnHelperThread() {
1073     int answer = 0;
1074     bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
1075     DCHECK(result);
1076     DCHECK_EQ(answer, 42);
1077 
1078     worker_->Done();
1079   }
1080 
1081  private:
1082   ~TestSyncMessageFilter() override = default;
1083 
1084   Worker* worker_;
1085   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
1086 };
1087 
1088 class SyncMessageFilterServer : public Worker {
1089  public:
SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle)1090   explicit SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle)
1091       : Worker(Channel::MODE_SERVER,
1092                "sync_message_filter_server",
1093                std::move(channel_handle)),
1094         thread_("helper_thread") {
1095     base::Thread::Options options;
1096     options.message_loop_type = base::MessageLoop::TYPE_DEFAULT;
1097     thread_.StartWithOptions(options);
1098     filter_ = new TestSyncMessageFilter(shutdown_event(), this,
1099                                         thread_.task_runner());
1100   }
1101 
Run()1102   void Run() override {
1103     channel()->AddFilter(filter_.get());
1104   }
1105 
1106   base::Thread thread_;
1107   scoped_refptr<TestSyncMessageFilter> filter_;
1108 };
1109 
1110 // This class provides functionality to test the case that a Send on the sync
1111 // channel does not crash after the channel has been closed.
1112 class ServerSendAfterClose : public Worker {
1113  public:
ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle)1114   explicit ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle)
1115       : Worker(Channel::MODE_SERVER,
1116                "simpler_server",
1117                std::move(channel_handle)),
1118         send_result_(true) {}
1119 
SendDummy()1120   bool SendDummy() {
1121     ListenerThread()->task_runner()->PostTask(
1122         FROM_HERE,
1123         base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send),
1124                    base::Unretained(this), new SyncChannelTestMsg_NoArgs));
1125     return true;
1126   }
1127 
send_result() const1128   bool send_result() const {
1129     return send_result_;
1130   }
1131 
1132  private:
Run()1133   void Run() override {
1134     CloseChannel();
1135     Done();
1136   }
1137 
Send(Message * msg)1138   bool Send(Message* msg) override {
1139     send_result_ = Worker::Send(msg);
1140     Done();
1141     return send_result_;
1142   }
1143 
1144   bool send_result_;
1145 };
1146 
1147 // Tests basic synchronous call
TEST_F(IPCSyncChannelTest,SyncMessageFilter)1148 TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
1149   std::vector<Worker*> workers;
1150   mojo::MessagePipe pipe;
1151   workers.push_back(new SyncMessageFilterServer(std::move(pipe.handle0)));
1152   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
1153   RunTest(workers);
1154 }
1155 
1156 // Test the case when the channel is closed and a Send is attempted after that.
TEST_F(IPCSyncChannelTest,SendAfterClose)1157 TEST_F(IPCSyncChannelTest, SendAfterClose) {
1158   mojo::MessagePipe pipe;
1159   ServerSendAfterClose server(std::move(pipe.handle0));
1160   server.Start();
1161 
1162   server.done_event()->Wait();
1163   server.done_event()->Reset();
1164 
1165   server.SendDummy();
1166   server.done_event()->Wait();
1167 
1168   EXPECT_FALSE(server.send_result());
1169 
1170   server.Shutdown();
1171 }
1172 
1173 //------------------------------------------------------------------------------
1174 
1175 class RestrictedDispatchServer : public Worker {
1176  public:
RestrictedDispatchServer(WaitableEvent * sent_ping_event,WaitableEvent * wait_event,mojo::ScopedMessagePipeHandle channel_handle)1177   RestrictedDispatchServer(WaitableEvent* sent_ping_event,
1178                            WaitableEvent* wait_event,
1179                            mojo::ScopedMessagePipeHandle channel_handle)
1180       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1181         sent_ping_event_(sent_ping_event),
1182         wait_event_(wait_event) {}
1183 
OnDoPing(int ping)1184   void OnDoPing(int ping) {
1185     // Send an asynchronous message that unblocks the caller.
1186     Message* msg = new SyncChannelTestMsg_Ping(ping);
1187     msg->set_unblock(true);
1188     Send(msg);
1189     // Signal the event after the message has been sent on the channel, on the
1190     // IPC thread.
1191     ipc_thread().task_runner()->PostTask(
1192         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent,
1193                               base::Unretained(this)));
1194   }
1195 
OnPingTTL(int ping,int * out)1196   void OnPingTTL(int ping, int* out) {
1197     *out = ping;
1198     wait_event_->Wait();
1199   }
1200 
ListenerThread()1201   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1202 
1203  private:
OnMessageReceived(const Message & message)1204   bool OnMessageReceived(const Message& message) override {
1205     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
1206      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1207      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1208      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1209     IPC_END_MESSAGE_MAP()
1210     return true;
1211   }
1212 
OnPingSent()1213   void OnPingSent() {
1214     sent_ping_event_->Signal();
1215   }
1216 
OnNoArgs()1217   void OnNoArgs() { }
1218   WaitableEvent* sent_ping_event_;
1219   WaitableEvent* wait_event_;
1220 };
1221 
1222 class NonRestrictedDispatchServer : public Worker {
1223  public:
NonRestrictedDispatchServer(WaitableEvent * signal_event,mojo::ScopedMessagePipeHandle channel_handle)1224   NonRestrictedDispatchServer(WaitableEvent* signal_event,
1225                               mojo::ScopedMessagePipeHandle channel_handle)
1226       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1227         signal_event_(signal_event) {}
1228 
ListenerThread()1229   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1230 
OnDoPingTTL(int ping)1231   void OnDoPingTTL(int ping) {
1232     int value = 0;
1233     Send(new SyncChannelTestMsg_PingTTL(ping, &value));
1234     signal_event_->Signal();
1235   }
1236 
1237  private:
OnMessageReceived(const Message & message)1238   bool OnMessageReceived(const Message& message) override {
1239     IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
1240      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1241      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1242     IPC_END_MESSAGE_MAP()
1243     return true;
1244   }
1245 
OnNoArgs()1246   void OnNoArgs() { }
1247   WaitableEvent* signal_event_;
1248 };
1249 
1250 class RestrictedDispatchClient : public Worker {
1251  public:
RestrictedDispatchClient(WaitableEvent * sent_ping_event,RestrictedDispatchServer * server,NonRestrictedDispatchServer * server2,int * success,mojo::ScopedMessagePipeHandle restricted_channel_handle,mojo::ScopedMessagePipeHandle non_restricted_channel_handle)1252   RestrictedDispatchClient(
1253       WaitableEvent* sent_ping_event,
1254       RestrictedDispatchServer* server,
1255       NonRestrictedDispatchServer* server2,
1256       int* success,
1257       mojo::ScopedMessagePipeHandle restricted_channel_handle,
1258       mojo::ScopedMessagePipeHandle non_restricted_channel_handle)
1259       : Worker(std::move(restricted_channel_handle), Channel::MODE_CLIENT),
1260         ping_(0),
1261         server_(server),
1262         server2_(server2),
1263         success_(success),
1264         sent_ping_event_(sent_ping_event),
1265         non_restricted_channel_handle_(
1266             std::move(non_restricted_channel_handle)) {}
1267 
Run()1268   void Run() override {
1269     // Incoming messages from our channel should only be dispatched when we
1270     // send a message on that same channel.
1271     channel()->SetRestrictDispatchChannelGroup(1);
1272 
1273     server_->ListenerThread()->task_runner()->PostTask(
1274         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing,
1275                               base::Unretained(server_), 1));
1276     sent_ping_event_->Wait();
1277     Send(new SyncChannelTestMsg_NoArgs);
1278     if (ping_ == 1)
1279       ++*success_;
1280     else
1281       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1282 
1283     non_restricted_channel_ = SyncChannel::Create(
1284         non_restricted_channel_handle_.release(), IPC::Channel::MODE_CLIENT,
1285         this, ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(),
1286         true, shutdown_event());
1287 
1288     server_->ListenerThread()->task_runner()->PostTask(
1289         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing,
1290                               base::Unretained(server_), 2));
1291     sent_ping_event_->Wait();
1292     // Check that the incoming message is *not* dispatched when sending on the
1293     // non restricted channel.
1294     // TODO(piman): there is a possibility of a false positive race condition
1295     // here, if the message that was posted on the server-side end of the pipe
1296     // is not visible yet on the client side, but I don't know how to solve this
1297     // without hooking into the internals of SyncChannel. I haven't seen it in
1298     // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
1299     // the following to fail).
1300     non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
1301     if (ping_ == 1)
1302       ++*success_;
1303     else
1304       LOG(ERROR) << "Send dispatched message from restricted channel";
1305 
1306     Send(new SyncChannelTestMsg_NoArgs);
1307     if (ping_ == 2)
1308       ++*success_;
1309     else
1310       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1311 
1312     // Check that the incoming message on the non-restricted channel is
1313     // dispatched when sending on the restricted channel.
1314     server2_->ListenerThread()->task_runner()->PostTask(
1315         FROM_HERE, base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL,
1316                               base::Unretained(server2_), 3));
1317     int value = 0;
1318     Send(new SyncChannelTestMsg_PingTTL(4, &value));
1319     if (ping_ == 3 && value == 4)
1320       ++*success_;
1321     else
1322       LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
1323 
1324     non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
1325     non_restricted_channel_.reset();
1326     Send(new SyncChannelTestMsg_Done);
1327     Done();
1328   }
1329 
1330  private:
OnMessageReceived(const Message & message)1331   bool OnMessageReceived(const Message& message) override {
1332     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
1333      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
1334      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
1335     IPC_END_MESSAGE_MAP()
1336     return true;
1337   }
1338 
OnPing(int ping)1339   void OnPing(int ping) {
1340     ping_ = ping;
1341   }
1342 
OnPingTTL(int ping,IPC::Message * reply)1343   void OnPingTTL(int ping, IPC::Message* reply) {
1344     ping_ = ping;
1345     // This message comes from the NonRestrictedDispatchServer, we have to send
1346     // the reply back manually.
1347     SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
1348     non_restricted_channel_->Send(reply);
1349   }
1350 
1351   int ping_;
1352   RestrictedDispatchServer* server_;
1353   NonRestrictedDispatchServer* server2_;
1354   int* success_;
1355   WaitableEvent* sent_ping_event_;
1356   std::unique_ptr<SyncChannel> non_restricted_channel_;
1357   mojo::ScopedMessagePipeHandle non_restricted_channel_handle_;
1358 };
1359 
// Runs one restricted server, one non-restricted server and a client that is
// connected to both, and expects all four checks in
// RestrictedDispatchClient::Run() to pass.
TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
  WaitableEvent sent_ping_event(
      base::WaitableEvent::ResetPolicy::AUTOMATIC,
      base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent wait_event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                           base::WaitableEvent::InitialState::NOT_SIGNALED);

  mojo::MessagePipe restricted_pipe;
  mojo::MessagePipe non_restricted_pipe;

  RestrictedDispatchServer* server = new RestrictedDispatchServer(
      &sent_ping_event, &wait_event, std::move(restricted_pipe.handle0));
  // |wait_event| is shared: server2 signals it, server waits on it.
  NonRestrictedDispatchServer* server2 = new NonRestrictedDispatchServer(
      &wait_event, std::move(non_restricted_pipe.handle0));

  int success = 0;
  Worker* client =
      new RestrictedDispatchClient(&sent_ping_event, server, server2, &success,
                                   std::move(restricted_pipe.handle1),
                                   std::move(non_restricted_pipe.handle1));

  std::vector<Worker*> workers;
  workers.push_back(server);
  workers.push_back(server2);
  workers.push_back(client);
  RunTest(workers);

  EXPECT_EQ(4, success);
}
1383 
1384 //------------------------------------------------------------------------------
1385 
1386 // This test case inspired by crbug.com/108491
// We create two servers that use the same ListenerThread and that both call
// SetRestrictDispatchChannelGroup(1), i.e. both restrict dispatch to the same
// channel group.
1389 // We create clients, then use some specific WaitableEvent wait/signalling to
1390 // ensure that messages get dispatched in a way that causes a deadlock due to
1391 // a nested dispatch and an eligible message in a higher-level dispatch's
// delayed_queue. Specifically, we start with client1 about to send an
1393 // unblocking message to server1, while the shared listener thread for the
1394 // servers server1 and server2 is about to send a non-unblocking message to
1395 // client1. At the same time, client2 will be about to send an unblocking
1396 // message to server2. Server1 will handle the client1->server1 message by
1397 // telling server2 to send a non-unblocking message to client2.
1398 // What should happen is that the send to server2 should find the pending,
1399 // same-context client2->server2 message to dispatch, causing client2 to
1400 // unblock then handle the server2->client2 message, so that the shared
1401 // servers' listener thread can then respond to the client1->server1 message.
1402 // Then client1 can handle the non-unblocking server1->client1 message.
1403 // The old code would end up in a state where the server2->client2 message is
1404 // sent, but the client2->server2 message (which is eligible for dispatch, and
1405 // which is what client2 is waiting for) is stashed in a local delayed_queue
1406 // that has server1's channel context, causing a deadlock.
1407 // WaitableEvents in the events array are used to:
1408 //   event 0: indicate to client1 that server listener is in OnDoServerTask
1409 //   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
1410 //   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
1411 //   event 3: indicate to client2 that server listener is in OnDoServerTask
1412 
1413 class RestrictedDispatchDeadlockServer : public Worker {
1414  public:
RestrictedDispatchDeadlockServer(int server_num,WaitableEvent * server_ready_event,WaitableEvent ** events,RestrictedDispatchDeadlockServer * peer,mojo::ScopedMessagePipeHandle channel_handle)1415   RestrictedDispatchDeadlockServer(int server_num,
1416                                    WaitableEvent* server_ready_event,
1417                                    WaitableEvent** events,
1418                                    RestrictedDispatchDeadlockServer* peer,
1419                                    mojo::ScopedMessagePipeHandle channel_handle)
1420       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1421         server_num_(server_num),
1422         server_ready_event_(server_ready_event),
1423         events_(events),
1424         peer_(peer) {}
1425 
OnDoServerTask()1426   void OnDoServerTask() {
1427     events_[3]->Signal();
1428     events_[2]->Wait();
1429     events_[0]->Signal();
1430     SendMessageToClient();
1431   }
1432 
Run()1433   void Run() override {
1434     channel()->SetRestrictDispatchChannelGroup(1);
1435     server_ready_event_->Signal();
1436   }
1437 
ListenerThread()1438   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1439 
1440  private:
OnMessageReceived(const Message & message)1441   bool OnMessageReceived(const Message& message) override {
1442     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
1443      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1444      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1445     IPC_END_MESSAGE_MAP()
1446     return true;
1447   }
1448 
OnNoArgs()1449   void OnNoArgs() {
1450     if (server_num_ == 1) {
1451       DCHECK(peer_ != NULL);
1452       peer_->SendMessageToClient();
1453     }
1454   }
1455 
SendMessageToClient()1456   void SendMessageToClient() {
1457     Message* msg = new SyncChannelTestMsg_NoArgs;
1458     msg->set_unblock(false);
1459     DCHECK(!msg->should_unblock());
1460     Send(msg);
1461   }
1462 
1463   int server_num_;
1464   WaitableEvent* server_ready_event_;
1465   WaitableEvent** events_;
1466   RestrictedDispatchDeadlockServer* peer_;
1467 };
1468 
1469 class RestrictedDispatchDeadlockClient2 : public Worker {
1470  public:
RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer * server,WaitableEvent * server_ready_event,WaitableEvent ** events,mojo::ScopedMessagePipeHandle channel_handle)1471   RestrictedDispatchDeadlockClient2(
1472       RestrictedDispatchDeadlockServer* server,
1473       WaitableEvent* server_ready_event,
1474       WaitableEvent** events,
1475       mojo::ScopedMessagePipeHandle channel_handle)
1476       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1477         server_ready_event_(server_ready_event),
1478         events_(events),
1479         received_msg_(false),
1480         received_noarg_reply_(false),
1481         done_issued_(false) {}
1482 
Run()1483   void Run() override {
1484     server_ready_event_->Wait();
1485   }
1486 
OnDoClient2Task()1487   void OnDoClient2Task() {
1488     events_[3]->Wait();
1489     events_[1]->Signal();
1490     events_[2]->Signal();
1491     DCHECK(received_msg_ == false);
1492 
1493     Message* message = new SyncChannelTestMsg_NoArgs;
1494     message->set_unblock(true);
1495     Send(message);
1496     received_noarg_reply_ = true;
1497   }
1498 
ListenerThread()1499   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1500  private:
OnMessageReceived(const Message & message)1501   bool OnMessageReceived(const Message& message) override {
1502     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
1503      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1504     IPC_END_MESSAGE_MAP()
1505     return true;
1506   }
1507 
OnNoArgs()1508   void OnNoArgs() {
1509     received_msg_ = true;
1510     PossiblyDone();
1511   }
1512 
PossiblyDone()1513   void PossiblyDone() {
1514     if (received_noarg_reply_ && received_msg_) {
1515       DCHECK(done_issued_ == false);
1516       done_issued_ = true;
1517       Send(new SyncChannelTestMsg_Done);
1518       Done();
1519     }
1520   }
1521 
1522   WaitableEvent* server_ready_event_;
1523   WaitableEvent** events_;
1524   bool received_msg_;
1525   bool received_noarg_reply_;
1526   bool done_issued_;
1527 };
1528 
1529 class RestrictedDispatchDeadlockClient1 : public Worker {
1530  public:
RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer * server,RestrictedDispatchDeadlockClient2 * peer,WaitableEvent * server_ready_event,WaitableEvent ** events,mojo::ScopedMessagePipeHandle channel_handle)1531   RestrictedDispatchDeadlockClient1(
1532       RestrictedDispatchDeadlockServer* server,
1533       RestrictedDispatchDeadlockClient2* peer,
1534       WaitableEvent* server_ready_event,
1535       WaitableEvent** events,
1536       mojo::ScopedMessagePipeHandle channel_handle)
1537       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1538         server_(server),
1539         peer_(peer),
1540         server_ready_event_(server_ready_event),
1541         events_(events),
1542         received_msg_(false),
1543         received_noarg_reply_(false),
1544         done_issued_(false) {}
1545 
Run()1546   void Run() override {
1547     server_ready_event_->Wait();
1548     server_->ListenerThread()->task_runner()->PostTask(
1549         FROM_HERE, base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask,
1550                               base::Unretained(server_)));
1551     peer_->ListenerThread()->task_runner()->PostTask(
1552         FROM_HERE,
1553         base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task,
1554                    base::Unretained(peer_)));
1555     events_[0]->Wait();
1556     events_[1]->Wait();
1557     DCHECK(received_msg_ == false);
1558 
1559     Message* message = new SyncChannelTestMsg_NoArgs;
1560     message->set_unblock(true);
1561     Send(message);
1562     received_noarg_reply_ = true;
1563     PossiblyDone();
1564   }
1565 
1566  private:
OnMessageReceived(const Message & message)1567   bool OnMessageReceived(const Message& message) override {
1568     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
1569      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1570     IPC_END_MESSAGE_MAP()
1571     return true;
1572   }
1573 
OnNoArgs()1574   void OnNoArgs() {
1575     received_msg_ = true;
1576     PossiblyDone();
1577   }
1578 
PossiblyDone()1579   void PossiblyDone() {
1580     if (received_noarg_reply_ && received_msg_) {
1581       DCHECK(done_issued_ == false);
1582       done_issued_ = true;
1583       Send(new SyncChannelTestMsg_Done);
1584       Done();
1585     }
1586   }
1587 
1588   RestrictedDispatchDeadlockServer* server_;
1589   RestrictedDispatchDeadlockClient2* peer_;
1590   WaitableEvent* server_ready_event_;
1591   WaitableEvent** events_;
1592   bool received_msg_;
1593   bool received_noarg_reply_;
1594   bool done_issued_;
1595 };
1596 
// Builds two server/client pairs (one message pipe per pair), puts both
// servers on a single shared thread, and runs all four workers via RunTest().
TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
  std::vector<Worker*> workers;

  // A shared worker thread so that server1 and server2 run on one thread.
  base::Thread worker_thread("RestrictedDispatchDeadlock");
  ASSERT_TRUE(worker_thread.Start());

  WaitableEvent server1_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                              base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent server2_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                              base::WaitableEvent::InitialState::NOT_SIGNALED);

  // Four auto-reset events shared by all workers to sequence the test steps.
  WaitableEvent event0(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                       base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent event1(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                       base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent event2(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                       base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent event3(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                       base::WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};

  mojo::MessagePipe pipe1, pipe2;

  // Workers are created (and queued) in the order server2, client2, server1,
  // client1. server2 gets a null fourth argument while server1 gets server2
  // there (presumably the peer server; confirm against the constructor).
  auto* server2 = new RestrictedDispatchDeadlockServer(
      2, &server2_ready, events, nullptr, std::move(pipe2.handle0));
  server2->OverrideThread(&worker_thread);
  workers.push_back(server2);

  auto* client2 = new RestrictedDispatchDeadlockClient2(
      server2, &server2_ready, events, std::move(pipe2.handle1));
  workers.push_back(client2);

  auto* server1 = new RestrictedDispatchDeadlockServer(
      1, &server1_ready, events, server2, std::move(pipe1.handle0));
  server1->OverrideThread(&worker_thread);
  workers.push_back(server1);

  auto* client1 = new RestrictedDispatchDeadlockClient1(
      server1, client2, &server1_ready, events, std::move(pipe1.handle1));
  workers.push_back(client1);

  RunTest(workers);
}
1645 
1646 //------------------------------------------------------------------------------
1647 
// This test case is inspired by crbug.com/120530.
// We create 4 workers that pipe to each other (W1->W2->W3->W4->W1), then we
// send a message that recurses through 3, 4 or 5 steps to make sure that, say,
// W1 can be re-entered when called from W4 while it is sending a message to W2.
// The first worker drives the whole test, so it must be treated specially.
1653 
1654 class RestrictedDispatchPipeWorker : public Worker {
1655  public:
RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1,WaitableEvent * event1,mojo::ScopedMessagePipeHandle channel_handle2,WaitableEvent * event2,int group,int * success)1656   RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1,
1657                                WaitableEvent* event1,
1658                                mojo::ScopedMessagePipeHandle channel_handle2,
1659                                WaitableEvent* event2,
1660                                int group,
1661                                int* success)
1662       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
1663         event1_(event1),
1664         event2_(event2),
1665         other_channel_handle_(std::move(channel_handle2)),
1666         group_(group),
1667         success_(success) {}
1668 
OnPingTTL(int ping,int * ret)1669   void OnPingTTL(int ping, int* ret) {
1670     *ret = 0;
1671     if (!ping)
1672       return;
1673     other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
1674     ++*ret;
1675   }
1676 
OnDone()1677   void OnDone() {
1678     if (is_first())
1679       return;
1680     other_channel_->Send(new SyncChannelTestMsg_Done);
1681     other_channel_.reset();
1682     Done();
1683   }
1684 
Run()1685   void Run() override {
1686     channel()->SetRestrictDispatchChannelGroup(group_);
1687     if (is_first())
1688       event1_->Signal();
1689     event2_->Wait();
1690     other_channel_ = SyncChannel::Create(
1691         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
1692         ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true,
1693         shutdown_event());
1694     other_channel_->SetRestrictDispatchChannelGroup(group_);
1695     if (!is_first()) {
1696       event1_->Signal();
1697       return;
1698     }
1699     *success_ = 0;
1700     int value = 0;
1701     OnPingTTL(3, &value);
1702     *success_ += (value == 3);
1703     OnPingTTL(4, &value);
1704     *success_ += (value == 4);
1705     OnPingTTL(5, &value);
1706     *success_ += (value == 5);
1707     other_channel_->Send(new SyncChannelTestMsg_Done);
1708     other_channel_.reset();
1709     Done();
1710   }
1711 
is_first()1712   bool is_first() { return !!success_; }
1713 
1714  private:
OnMessageReceived(const Message & message)1715   bool OnMessageReceived(const Message& message) override {
1716     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
1717      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1718      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
1719     IPC_END_MESSAGE_MAP()
1720     return true;
1721   }
1722 
1723   std::unique_ptr<SyncChannel> other_channel_;
1724   WaitableEvent* event1_;
1725   WaitableEvent* event2_;
1726   mojo::ScopedMessagePipeHandle other_channel_handle_;
1727   int group_;
1728   int* success_;
1729 };
1730 
1731 #if defined(OS_ANDROID)
1732 #define MAYBE_RestrictedDispatch4WayDeadlock \
1733   DISABLED_RestrictedDispatch4WayDeadlock
1734 #else
1735 #define MAYBE_RestrictedDispatch4WayDeadlock RestrictedDispatch4WayDeadlock
1736 #endif
TEST_F(IPCSyncChannelTest,MAYBE_RestrictedDispatch4WayDeadlock)1737 TEST_F(IPCSyncChannelTest, MAYBE_RestrictedDispatch4WayDeadlock) {
1738   int success = 0;
1739   std::vector<Worker*> workers;
1740   WaitableEvent event0(base::WaitableEvent::ResetPolicy::MANUAL,
1741                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1742   WaitableEvent event1(base::WaitableEvent::ResetPolicy::MANUAL,
1743                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1744   WaitableEvent event2(base::WaitableEvent::ResetPolicy::MANUAL,
1745                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1746   WaitableEvent event3(base::WaitableEvent::ResetPolicy::MANUAL,
1747                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1748   mojo::MessagePipe pipe0, pipe1, pipe2, pipe3;
1749   workers.push_back(new RestrictedDispatchPipeWorker(
1750       std::move(pipe0.handle0), &event0, std::move(pipe1.handle1), &event1, 1,
1751       &success));
1752   workers.push_back(new RestrictedDispatchPipeWorker(
1753       std::move(pipe1.handle0), &event1, std::move(pipe2.handle1), &event2, 2,
1754       NULL));
1755   workers.push_back(new RestrictedDispatchPipeWorker(
1756       std::move(pipe2.handle0), &event2, std::move(pipe3.handle1), &event3, 3,
1757       NULL));
1758   workers.push_back(new RestrictedDispatchPipeWorker(
1759       std::move(pipe3.handle0), &event3, std::move(pipe0.handle1), &event0, 4,
1760       NULL));
1761   RunTest(workers);
1762   EXPECT_EQ(3, success);
1763 }
1764 
1765 //------------------------------------------------------------------------------
1766 
// This test case is inspired by crbug.com/122443.
// We want to make sure a reply message with the unblock flag set correctly
// behaves as a reply, not a regular message.
// We have 3 workers. Server1 will send a message to Server2 (which will block),
// during which it will dispatch a message coming from Client, at which point
// it will send another message to Server2. While sending that second message it
// will receive the reply to its first message from Server2, with the unblock
// flag set.
1774 
1775 class ReentrantReplyServer1 : public Worker {
1776  public:
ReentrantReplyServer1(WaitableEvent * server_ready,mojo::ScopedMessagePipeHandle channel_handle1,mojo::ScopedMessagePipeHandle channel_handle2)1777   ReentrantReplyServer1(WaitableEvent* server_ready,
1778                         mojo::ScopedMessagePipeHandle channel_handle1,
1779                         mojo::ScopedMessagePipeHandle channel_handle2)
1780       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
1781         server_ready_(server_ready),
1782         other_channel_handle_(std::move(channel_handle2)) {}
1783 
Run()1784   void Run() override {
1785     server2_channel_ = SyncChannel::Create(
1786         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
1787         ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true,
1788         shutdown_event());
1789     server_ready_->Signal();
1790     Message* msg = new SyncChannelTestMsg_Reentrant1();
1791     server2_channel_->Send(msg);
1792     server2_channel_.reset();
1793     Done();
1794   }
1795 
1796  private:
OnMessageReceived(const Message & message)1797   bool OnMessageReceived(const Message& message) override {
1798     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
1799      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
1800      IPC_REPLY_HANDLER(OnReply)
1801     IPC_END_MESSAGE_MAP()
1802     return true;
1803   }
1804 
OnReentrant2()1805   void OnReentrant2() {
1806     Message* msg = new SyncChannelTestMsg_Reentrant3();
1807     server2_channel_->Send(msg);
1808   }
1809 
OnReply(const Message & message)1810   void OnReply(const Message& message) {
1811     // If we get here, the Send() will never receive the reply (thus would
1812     // hang), so abort instead.
1813     LOG(FATAL) << "Reply message was dispatched";
1814   }
1815 
1816   WaitableEvent* server_ready_;
1817   std::unique_ptr<SyncChannel> server2_channel_;
1818   mojo::ScopedMessagePipeHandle other_channel_handle_;
1819 };
1820 
1821 class ReentrantReplyServer2 : public Worker {
1822  public:
ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle)1823   ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle)
1824       : Worker(std::move(channel_handle), Channel::MODE_SERVER), reply_(NULL) {}
1825 
1826  private:
OnMessageReceived(const Message & message)1827   bool OnMessageReceived(const Message& message) override {
1828     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
1829      IPC_MESSAGE_HANDLER_DELAY_REPLY(
1830          SyncChannelTestMsg_Reentrant1, OnReentrant1)
1831      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
1832     IPC_END_MESSAGE_MAP()
1833     return true;
1834   }
1835 
OnReentrant1(Message * reply)1836   void OnReentrant1(Message* reply) {
1837     DCHECK(!reply_);
1838     reply_ = reply;
1839   }
1840 
OnReentrant3()1841   void OnReentrant3() {
1842     DCHECK(reply_);
1843     Message* reply = reply_;
1844     reply_ = NULL;
1845     reply->set_unblock(true);
1846     Send(reply);
1847     Done();
1848   }
1849 
1850   Message* reply_;
1851 };
1852 
1853 class ReentrantReplyClient : public Worker {
1854  public:
ReentrantReplyClient(WaitableEvent * server_ready,mojo::ScopedMessagePipeHandle channel_handle)1855   ReentrantReplyClient(WaitableEvent* server_ready,
1856                        mojo::ScopedMessagePipeHandle channel_handle)
1857       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1858         server_ready_(server_ready) {}
1859 
Run()1860   void Run() override {
1861     server_ready_->Wait();
1862     Send(new SyncChannelTestMsg_Reentrant2());
1863     Done();
1864   }
1865 
1866  private:
1867   WaitableEvent* server_ready_;
1868 };
1869 
// Runs Server2, Server1 and Client together. pipe1 links Server1 (handle0)
// with Client (handle1); pipe2 links Server2 (handle0) with the client
// channel that Server1 opens (handle1).
TEST_F(IPCSyncChannelTest, ReentrantReply) {
  std::vector<Worker*> workers;
  WaitableEvent server_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
                             base::WaitableEvent::InitialState::NOT_SIGNALED);
  mojo::MessagePipe pipe1, pipe2;

  Worker* server2 = new ReentrantReplyServer2(std::move(pipe2.handle0));
  workers.push_back(server2);

  Worker* server1 = new ReentrantReplyServer1(
      &server_ready, std::move(pipe1.handle0), std::move(pipe2.handle1));
  workers.push_back(server1);

  Worker* client =
      new ReentrantReplyClient(&server_ready, std::move(pipe1.handle1));
  workers.push_back(client);

  RunTest(workers);
}
1882 
1883 }  // namespace
1884 }  // namespace IPC
1885