• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/lazy_instance.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h"
19 #include "base/sequenced_task_runner.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/threading/thread_local.h"
22 #include "base/threading/thread_task_runner_handle.h"
23 #include "base/trace_event/trace_event.h"
24 #include "ipc/ipc_channel_factory.h"
25 #include "ipc/ipc_logging.h"
26 #include "ipc/ipc_message_macros.h"
27 #include "ipc/ipc_sync_message.h"
28 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
29 
30 using base::WaitableEvent;
31 
32 namespace IPC {
33 
34 namespace {
35 
36 // A generic callback used when watching handles synchronously. Sets |*signal|
37 // to true.
OnEventReady(bool * signal)38 void OnEventReady(bool* signal) {
39   *signal = true;
40 }
41 
42 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
43     g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
44 
45 }  // namespace
46 
47 // When we're blocked in a Send(), we need to process incoming synchronous
48 // messages right away because it could be blocking our reply (either
49 // directly from the same object we're calling, or indirectly through one or
50 // more other channels).  That means that in SyncContext's OnMessageReceived,
51 // we need to process sync message right away if we're blocked.  However a
52 // simple check isn't sufficient, because the listener thread can be in the
53 // process of calling Send.
54 // To work around this, when SyncChannel filters a sync message, it sets
55 // an event that the listener thread waits on during its Send() call.  This
56 // allows us to dispatch incoming sync messages when blocked.  The race
57 // condition is handled because if Send is in the process of being called, it
58 // will check the event.  In case the listener thread isn't sending a message,
59 // we queue a task on the listener thread to dispatch the received messages.
60 // The messages are stored in this queue object that's shared among all
61 // SyncChannel objects on the same thread (since one object can receive a
62 // sync message while another one is blocked).
63 
64 class SyncChannel::ReceivedSyncMsgQueue :
65     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
66  public:
67   // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
68   // may nest waiting message loops arbitrarily deep on the SyncChannel's
69   // thread. Every such operation has a corresponding WaitableEvent to be
70   // watched which, when signalled for IPC completion, breaks out of the loop.
71   // A reference to the innermost (i.e. topmost) watcher is held in
72   // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
73   //
74   // NestedSendDoneWatcher provides a simple scoper which is used by
75   // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
76   // event, preserving the previous topmost state on the local stack until the
77   // new inner loop is broken. If yet another subsequent nested loop is started
78   // therein the process is repeated again in the new inner stack frame, and so
79   // on.
80   //
81   // When this object is destroyed on stack unwind, the previous topmost state
82   // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
83   // and its watch is resumed immediately.
84   class NestedSendDoneWatcher {
85    public:
NestedSendDoneWatcher(SyncChannel::SyncContext * context,base::RunLoop * run_loop,scoped_refptr<base::SequencedTaskRunner> task_runner)86     NestedSendDoneWatcher(SyncChannel::SyncContext* context,
87                           base::RunLoop* run_loop,
88                           scoped_refptr<base::SequencedTaskRunner> task_runner)
89         : sync_msg_queue_(context->received_sync_msgs()),
90           outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
91           event_(context->GetSendDoneEvent()),
92           callback_(
93               base::BindOnce(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
94                              context,
95                              run_loop)),
96           task_runner_(std::move(task_runner)) {
97       sync_msg_queue_->top_send_done_event_watcher_ = this;
98       if (outer_state_)
99         outer_state_->StopWatching();
100       StartWatching();
101     }
102 
~NestedSendDoneWatcher()103     ~NestedSendDoneWatcher() {
104       sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
105       if (outer_state_)
106         outer_state_->StartWatching();
107     }
108 
109    private:
Run(WaitableEvent * event)110     void Run(WaitableEvent* event) {
111       DCHECK(callback_);
112       std::move(callback_).Run(event);
113     }
114 
StartWatching()115     void StartWatching() {
116       watcher_.StartWatching(
117           event_,
118           base::BindOnce(&NestedSendDoneWatcher::Run, base::Unretained(this)),
119           task_runner_);
120     }
121 
StopWatching()122     void StopWatching() { watcher_.StopWatching(); }
123 
124     ReceivedSyncMsgQueue* const sync_msg_queue_;
125     NestedSendDoneWatcher* const outer_state_;
126 
127     base::WaitableEvent* const event_;
128     base::WaitableEventWatcher::EventCallback callback_;
129     base::WaitableEventWatcher watcher_;
130     scoped_refptr<base::SequencedTaskRunner> task_runner_;
131 
132     DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
133   };
134 
135   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
136   // if necessary.  Call RemoveContext on the same thread when done.
AddContext()137   static ReceivedSyncMsgQueue* AddContext() {
138     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
139     // SyncChannel objects can block the same thread).
140     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
141     if (!rv) {
142       rv = new ReceivedSyncMsgQueue();
143       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
144     }
145     rv->listener_count_++;
146     return rv;
147   }
148 
149   // Prevents messages from being dispatched immediately when the dispatch event
150   // is signaled. Instead, |*dispatch_flag| will be set.
BlockDispatch(bool * dispatch_flag)151   void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; }
152 
153   // Allows messages to be dispatched immediately when the dispatch event is
154   // signaled.
UnblockDispatch()155   void UnblockDispatch() { dispatch_flag_ = nullptr; }
156 
157   // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)158   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
159     bool was_task_pending;
160     {
161       base::AutoLock auto_lock(message_lock_);
162 
163       was_task_pending = task_pending_;
164       task_pending_ = true;
165 
166       // We set the event in case the listener thread is blocked (or is about
167       // to). In case it's not, the PostTask dispatches the messages.
168       message_queue_.push_back(QueuedMessage(new Message(msg), context));
169       message_queue_version_++;
170     }
171 
172     dispatch_event_.Signal();
173     if (!was_task_pending) {
174       listener_task_runner_->PostTask(
175           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
176                                 this, base::RetainedRef(context)));
177     }
178   }
179 
QueueReply(const Message & msg,SyncChannel::SyncContext * context)180   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
181     received_replies_.push_back(QueuedMessage(new Message(msg), context));
182   }
183 
184   // Called on the listener's thread to process any queues synchronous
185   // messages.
DispatchMessagesTask(SyncContext * context)186   void DispatchMessagesTask(SyncContext* context) {
187     {
188       base::AutoLock auto_lock(message_lock_);
189       task_pending_ = false;
190     }
191     context->DispatchMessages();
192   }
193 
194   // Dispatches any queued incoming sync messages. If |dispatching_context| is
195   // not null, messages which target a restricted dispatch channel will only be
196   // dispatched if |dispatching_context| belongs to the same restricted dispatch
197   // group as that channel. If |dispatching_context| is null, all queued
198   // messages are dispatched.
DispatchMessages(SyncContext * dispatching_context)199   void DispatchMessages(SyncContext* dispatching_context) {
200     bool first_time = true;
201     uint32_t expected_version = 0;
202     SyncMessageQueue::iterator it;
203     while (true) {
204       Message* message = nullptr;
205       scoped_refptr<SyncChannel::SyncContext> context;
206       {
207         base::AutoLock auto_lock(message_lock_);
208         if (first_time || message_queue_version_ != expected_version) {
209           it = message_queue_.begin();
210           first_time = false;
211         }
212         for (; it != message_queue_.end(); it++) {
213           int message_group = it->context->restrict_dispatch_group();
214           if (message_group == kRestrictDispatchGroup_None ||
215               (dispatching_context &&
216                message_group ==
217                    dispatching_context->restrict_dispatch_group())) {
218             message = it->message;
219             context = it->context;
220             it = message_queue_.erase(it);
221             message_queue_version_++;
222             expected_version = message_queue_version_;
223             break;
224           }
225         }
226       }
227 
228       if (message == nullptr)
229         break;
230       context->OnDispatchMessage(*message);
231       delete message;
232     }
233   }
234 
235   // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)236   void RemoveContext(SyncContext* context) {
237     base::AutoLock auto_lock(message_lock_);
238 
239     SyncMessageQueue::iterator iter = message_queue_.begin();
240     while (iter != message_queue_.end()) {
241       if (iter->context.get() == context) {
242         delete iter->message;
243         iter = message_queue_.erase(iter);
244         message_queue_version_++;
245       } else {
246         iter++;
247       }
248     }
249 
250     if (--listener_count_ == 0) {
251       DCHECK(lazy_tls_ptr_.Pointer()->Get());
252       lazy_tls_ptr_.Pointer()->Set(nullptr);
253       sync_dispatch_watcher_.reset();
254     }
255   }
256 
dispatch_event()257   base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()258   base::SingleThreadTaskRunner* listener_task_runner() {
259     return listener_task_runner_.get();
260   }
261 
262   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
263   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>::
264       DestructorAtExit lazy_tls_ptr_;
265 
266   // Called on the ipc thread to check if we can unblock any current Send()
267   // calls based on a queued reply.
DispatchReplies()268   void DispatchReplies() {
269     for (size_t i = 0; i < received_replies_.size(); ++i) {
270       Message* message = received_replies_[i].message;
271       if (received_replies_[i].context->TryToUnblockListener(message)) {
272         delete message;
273         received_replies_.erase(received_replies_.begin() + i);
274         return;
275       }
276     }
277   }
278 
279  private:
280   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
281 
282   // See the comment in SyncChannel::SyncChannel for why this event is created
283   // as manual reset.
ReceivedSyncMsgQueue()284   ReceivedSyncMsgQueue()
285       : message_queue_version_(0),
286         dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
287                         base::WaitableEvent::InitialState::NOT_SIGNALED),
288         listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
289         sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
290             &dispatch_event_,
291             base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
292                        base::Unretained(this)))) {
293     sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
294   }
295 
296   ~ReceivedSyncMsgQueue() = default;
297 
OnDispatchEventReady()298   void OnDispatchEventReady() {
299     if (dispatch_flag_) {
300       *dispatch_flag_ = true;
301       return;
302     }
303 
304     // We were woken up during a sync wait, but no specific SyncChannel is
305     // currently waiting. i.e., some other Mojo interface on this thread is
306     // waiting for a response. Since we don't support anything analogous to
307     // restricted dispatch on Mojo interfaces, in this case it's safe to
308     // dispatch sync messages for any context.
309     DispatchMessages(nullptr);
310   }
311 
312   // Holds information about a queued synchronous message or reply.
313   struct QueuedMessage {
QueuedMessageIPC::SyncChannel::ReceivedSyncMsgQueue::QueuedMessage314     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
315     Message* message;
316     scoped_refptr<SyncChannel::SyncContext> context;
317   };
318 
319   typedef std::list<QueuedMessage> SyncMessageQueue;
320   SyncMessageQueue message_queue_;
321 
322   // Used to signal DispatchMessages to rescan
323   uint32_t message_queue_version_ = 0;
324 
325   std::vector<QueuedMessage> received_replies_;
326 
327   // Signaled when we get a synchronous message that we must respond to, as the
328   // sender needs its reply before it can reply to our original synchronous
329   // message.
330   base::WaitableEvent dispatch_event_;
331   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
332   base::Lock message_lock_;
333   bool task_pending_ = false;
334   int listener_count_ = 0;
335 
336   // The current NestedSendDoneWatcher for this thread, if we're currently
337   // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
338   // NestedSendDoneWatcher comments for more details.
339   NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
340 
341   // If not null, the address of a flag to set when the dispatch event signals,
342   // in lieu of actually dispatching messages. This is used by
343   // SyncChannel::WaitForReply to restrict the scope of queued messages we're
344   // allowed to process while it's waiting.
345   bool* dispatch_flag_ = nullptr;
346 
347   // Watches |dispatch_event_| during all sync handle watches on this thread.
348   std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
349 };
350 
351 base::LazyInstance<base::ThreadLocalPointer<
352     SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit
353     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
354         LAZY_INSTANCE_INITIALIZER;
355 
SyncContext(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)356 SyncChannel::SyncContext::SyncContext(
357     Listener* listener,
358     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
359     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
360     WaitableEvent* shutdown_event)
361     : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
362       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
363       shutdown_event_(shutdown_event),
364       restrict_dispatch_group_(kRestrictDispatchGroup_None) {}
365 
OnSendDoneEventSignaled(base::RunLoop * nested_loop,base::WaitableEvent * event)366 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
367     base::RunLoop* nested_loop,
368     base::WaitableEvent* event) {
369   DCHECK_EQ(GetSendDoneEvent(), event);
370   nested_loop->Quit();
371 }
372 
~SyncContext()373 SyncChannel::SyncContext::~SyncContext() {
374   while (!deserializers_.empty())
375     Pop();
376 }
377 
378 // Adds information about an outgoing sync message to the context so that
379 // we know how to deserialize the reply. Returns |true| if the message was added
380 // to the context or |false| if it was rejected (e.g. due to shutdown.)
Push(SyncMessage * sync_msg)381 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
382   // Create the tracking information for this message. This object is stored
383   // by value since all members are pointers that are cheap to copy. These
384   // pointers are cleaned up in the Pop() function.
385   //
386   // The event is created as manual reset because in between Signal and
387   // OnObjectSignalled, another Send can happen which would stop the watcher
388   // from being called.  The event would get watched later, when the nested
389   // Send completes, so the event will need to remain set.
390   base::AutoLock auto_lock(deserializers_lock_);
391   if (reject_new_deserializers_)
392     return false;
393   PendingSyncMsg pending(
394       SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
395       new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
396                               base::WaitableEvent::InitialState::NOT_SIGNALED));
397   deserializers_.push_back(pending);
398   return true;
399 }
400 
Pop()401 bool SyncChannel::SyncContext::Pop() {
402   bool result;
403   {
404     base::AutoLock auto_lock(deserializers_lock_);
405     PendingSyncMsg msg = deserializers_.back();
406     delete msg.deserializer;
407     delete msg.done_event;
408     msg.done_event = nullptr;
409     deserializers_.pop_back();
410     result = msg.send_result;
411   }
412 
413   // We got a reply to a synchronous Send() call that's blocking the listener
414   // thread.  However, further down the call stack there could be another
415   // blocking Send() call, whose reply we received after we made this last
416   // Send() call.  So check if we have any queued replies available that
417   // can now unblock the listener thread.
418   ipc_task_runner()->PostTask(
419       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
420                             received_sync_msgs_));
421 
422   return result;
423 }
424 
GetSendDoneEvent()425 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
426   base::AutoLock auto_lock(deserializers_lock_);
427   return deserializers_.back().done_event;
428 }
429 
GetDispatchEvent()430 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
431   return received_sync_msgs_->dispatch_event();
432 }
433 
DispatchMessages()434 void SyncChannel::SyncContext::DispatchMessages() {
435   received_sync_msgs_->DispatchMessages(this);
436 }
437 
TryToUnblockListener(const Message * msg)438 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
439   base::AutoLock auto_lock(deserializers_lock_);
440   if (deserializers_.empty() ||
441       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
442     return false;
443   }
444 
445   if (!msg->is_reply_error()) {
446     bool send_result = deserializers_.back().deserializer->
447         SerializeOutputParameters(*msg);
448     deserializers_.back().send_result = send_result;
449     DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
450   } else {
451     DVLOG(1) << "Received error reply";
452   }
453 
454   base::WaitableEvent* done_event = deserializers_.back().done_event;
455   TRACE_EVENT_FLOW_BEGIN0(
456       TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
457       "SyncChannel::SyncContext::TryToUnblockListener", done_event);
458 
459   done_event->Signal();
460 
461   return true;
462 }
463 
Clear()464 void SyncChannel::SyncContext::Clear() {
465   CancelPendingSends();
466   received_sync_msgs_->RemoveContext(this);
467   Context::Clear();
468 }
469 
OnMessageReceived(const Message & msg)470 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
471   // Give the filters a chance at processing this message.
472   if (TryFilters(msg))
473     return true;
474 
475   if (TryToUnblockListener(&msg))
476     return true;
477 
478   if (msg.is_reply()) {
479     received_sync_msgs_->QueueReply(msg, this);
480     return true;
481   }
482 
483   if (msg.should_unblock()) {
484     received_sync_msgs_->QueueMessage(msg, this);
485     return true;
486   }
487 
488   return Context::OnMessageReceivedNoFilter(msg);
489 }
490 
OnChannelError()491 void SyncChannel::SyncContext::OnChannelError() {
492   CancelPendingSends();
493   shutdown_watcher_.StopWatching();
494   Context::OnChannelError();
495 }
496 
OnChannelOpened()497 void SyncChannel::SyncContext::OnChannelOpened() {
498   shutdown_watcher_.StartWatching(
499       shutdown_event_,
500       base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
501                  base::Unretained(this)),
502       base::SequencedTaskRunnerHandle::Get());
503   Context::OnChannelOpened();
504 }
505 
OnChannelClosed()506 void SyncChannel::SyncContext::OnChannelClosed() {
507   CancelPendingSends();
508   shutdown_watcher_.StopWatching();
509   Context::OnChannelClosed();
510 }
511 
CancelPendingSends()512 void SyncChannel::SyncContext::CancelPendingSends() {
513   base::AutoLock auto_lock(deserializers_lock_);
514   reject_new_deserializers_ = true;
515   PendingSyncMessageQueue::iterator iter;
516   DVLOG(1) << "Canceling pending sends";
517   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
518     TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
519                             "SyncChannel::SyncContext::CancelPendingSends",
520                             iter->done_event);
521     iter->done_event->Signal();
522   }
523 }
524 
OnShutdownEventSignaled(WaitableEvent * event)525 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
526   DCHECK_EQ(event, shutdown_event_);
527 
528   // Process shut down before we can get a reply to a synchronous message.
529   // Cancel pending Send calls, which will end up setting the send done event.
530   CancelPendingSends();
531 }
532 
533 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)534 std::unique_ptr<SyncChannel> SyncChannel::Create(
535     const IPC::ChannelHandle& channel_handle,
536     Channel::Mode mode,
537     Listener* listener,
538     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
539     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
540     bool create_pipe_now,
541     base::WaitableEvent* shutdown_event) {
542   std::unique_ptr<SyncChannel> channel =
543       Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
544   channel->Init(channel_handle, mode, create_pipe_now);
545   return channel;
546 }
547 
548 // static
Create(std::unique_ptr<ChannelFactory> factory,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)549 std::unique_ptr<SyncChannel> SyncChannel::Create(
550     std::unique_ptr<ChannelFactory> factory,
551     Listener* listener,
552     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
553     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
554     bool create_pipe_now,
555     base::WaitableEvent* shutdown_event) {
556   std::unique_ptr<SyncChannel> channel =
557       Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
558   channel->Init(std::move(factory), create_pipe_now);
559   return channel;
560 }
561 
562 // static
Create(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)563 std::unique_ptr<SyncChannel> SyncChannel::Create(
564     Listener* listener,
565     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
566     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
567     WaitableEvent* shutdown_event) {
568   return base::WrapUnique(new SyncChannel(
569       listener, ipc_task_runner, listener_task_runner, shutdown_event));
570 }
571 
SyncChannel(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)572 SyncChannel::SyncChannel(
573     Listener* listener,
574     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
575     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
576     WaitableEvent* shutdown_event)
577     : ChannelProxy(new SyncContext(listener,
578                                    ipc_task_runner,
579                                    listener_task_runner,
580                                    shutdown_event)),
581       sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
582   // The current (listener) thread must be distinct from the IPC thread, or else
583   // sending synchronous messages will deadlock.
584   DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
585   StartWatching();
586 }
587 
588 SyncChannel::~SyncChannel() = default;
589 
SetRestrictDispatchChannelGroup(int group)590 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
591   sync_context()->set_restrict_dispatch_group(group);
592 }
593 
CreateSyncMessageFilter()594 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() {
595   scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter(
596       sync_context()->shutdown_event());
597   AddFilter(filter.get());
598   if (!did_init())
599     pre_init_sync_message_filters_.push_back(filter);
600   return filter;
601 }
602 
Send(Message * message)603 bool SyncChannel::Send(Message* message) {
604 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
605   std::string name;
606   Logging::GetInstance()->GetMessageText(
607       message->type(), &name, message, nullptr);
608   TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
609 #else
610   TRACE_EVENT2("ipc", "SyncChannel::Send",
611                "class", IPC_MESSAGE_ID_CLASS(message->type()),
612                "line", IPC_MESSAGE_ID_LINE(message->type()));
613 #endif
614   if (!message->is_sync()) {
615     ChannelProxy::SendInternal(message);
616     return true;
617   }
618 
619   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
620   bool pump_messages = sync_msg->ShouldPumpMessages();
621 
622   // *this* might get deleted in WaitForReply.
623   scoped_refptr<SyncContext> context(sync_context());
624   if (!context->Push(sync_msg)) {
625     DVLOG(1) << "Channel is shutting down. Dropping sync message.";
626     delete message;
627     return false;
628   }
629 
630   ChannelProxy::SendInternal(message);
631 
632   // Wait for reply, or for any other incoming synchronous messages.
633   // |this| might get deleted, so only call static functions at this point.
634   scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
635   WaitForReply(registry.get(), context.get(), pump_messages);
636 
637   TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
638                         "SyncChannel::Send", context->GetSendDoneEvent());
639 
640   return context->Pop();
641 }
642 
WaitForReply(mojo::SyncHandleRegistry * registry,SyncContext * context,bool pump_messages)643 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
644                                SyncContext* context,
645                                bool pump_messages) {
646   context->DispatchMessages();
647 
648   base::WaitableEvent* pump_messages_event = nullptr;
649   if (pump_messages) {
650     if (!g_pump_messages_event.Get()) {
651       g_pump_messages_event.Get() = std::make_unique<base::WaitableEvent>(
652           base::WaitableEvent::ResetPolicy::MANUAL,
653           base::WaitableEvent::InitialState::SIGNALED);
654     }
655     pump_messages_event = g_pump_messages_event.Get().get();
656   }
657 
658   while (true) {
659     bool dispatch = false;
660     bool send_done = false;
661     bool should_pump_messages = false;
662     base::Closure on_send_done_callback = base::Bind(&OnEventReady, &send_done);
663     registry->RegisterEvent(context->GetSendDoneEvent(), on_send_done_callback);
664 
665     base::Closure on_pump_messages_callback;
666     if (pump_messages_event) {
667       on_pump_messages_callback =
668           base::Bind(&OnEventReady, &should_pump_messages);
669       registry->RegisterEvent(pump_messages_event, on_pump_messages_callback);
670     }
671 
672     const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
673     context->received_sync_msgs()->BlockDispatch(&dispatch);
674     registry->Wait(stop_flags, 3);
675     context->received_sync_msgs()->UnblockDispatch();
676 
677     registry->UnregisterEvent(context->GetSendDoneEvent(),
678                               on_send_done_callback);
679     if (pump_messages_event)
680       registry->UnregisterEvent(pump_messages_event, on_pump_messages_callback);
681 
682     if (dispatch) {
683       // We're waiting for a reply, but we received a blocking synchronous call.
684       // We must process it to avoid potential deadlocks.
685       context->GetDispatchEvent()->Reset();
686       context->DispatchMessages();
687       continue;
688     }
689 
690     if (should_pump_messages)
691       WaitForReplyWithNestedMessageLoop(context);  // Run a nested run loop.
692 
693     break;
694   }
695 }
696 
WaitForReplyWithNestedMessageLoop(SyncContext * context)697 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
698   base::RunLoop nested_loop(base::RunLoop::Type::kNestableTasksAllowed);
699   ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(
700       context, &nested_loop, context->listener_task_runner());
701   nested_loop.Run();
702 }
703 
OnDispatchEventSignaled(base::WaitableEvent * event)704 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
705   DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
706   sync_context()->GetDispatchEvent()->Reset();
707 
708   StartWatching();
709 
710   // NOTE: May delete |this|.
711   sync_context()->DispatchMessages();
712 }
713 
StartWatching()714 void SyncChannel::StartWatching() {
715   // |dispatch_watcher_| watches the event asynchronously, only dispatching
716   // messages once the listener thread is unblocked and pumping its task queue.
717   // The ReceivedSyncMsgQueue also watches this event and may dispatch
718   // immediately if woken up by a message which it's allowed to dispatch.
719   dispatch_watcher_.StartWatching(
720       sync_context()->GetDispatchEvent(),
721       base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
722                      base::Unretained(this)),
723       sync_context()->listener_task_runner());
724 }
725 
OnChannelInit()726 void SyncChannel::OnChannelInit() {
727   pre_init_sync_message_filters_.clear();
728 }
729 
730 }  // namespace IPC
731