• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <memory>
11 #include <utility>
12 
13 #include "base/functional/bind.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/task/sequenced_task_runner.h"
21 #include "base/task/single_thread_task_runner.h"
22 #include "base/trace_event/trace_event.h"
23 #include "build/build_config.h"
24 #include "ipc/ipc_channel_factory.h"
25 #include "ipc/ipc_logging.h"
26 #include "ipc/ipc_message.h"
27 #include "ipc/ipc_message_macros.h"
28 #include "ipc/ipc_sync_message.h"
29 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
30 
31 #if !BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
32 #include "ipc/trace_ipc_message.h"
33 #endif
34 
35 using base::WaitableEvent;
36 
37 namespace IPC {
38 
39 namespace {
40 
41 // A generic callback used when watching handles synchronously. Sets |*signal|
42 // to true.
OnEventReady(bool * signal)43 void OnEventReady(bool* signal) {
44   *signal = true;
45 }
46 
47 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
48 constinit thread_local SyncChannel::ReceivedSyncMsgQueue* received_queue =
49     nullptr;
50 
51 }  // namespace
52 
53 // When we're blocked in a Send(), we need to process incoming synchronous
54 // messages right away because it could be blocking our reply (either
55 // directly from the same object we're calling, or indirectly through one or
56 // more other channels).  That means that in SyncContext's OnMessageReceived,
57 // we need to process sync message right away if we're blocked.  However a
58 // simple check isn't sufficient, because the listener thread can be in the
59 // process of calling Send.
60 // To work around this, when SyncChannel filters a sync message, it sets
61 // an event that the listener thread waits on during its Send() call.  This
62 // allows us to dispatch incoming sync messages when blocked.  The race
63 // condition is handled because if Send is in the process of being called, it
64 // will check the event.  In case the listener thread isn't sending a message,
65 // we queue a task on the listener thread to dispatch the received messages.
66 // The messages are stored in this queue object that's shared among all
67 // SyncChannel objects on the same thread (since one object can receive a
68 // sync message while another one is blocked).
69 
70 class SyncChannel::ReceivedSyncMsgQueue :
71     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
72  public:
73   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
74   // if necessary.  Call RemoveContext on the same thread when done.
AddContext()75   static ReceivedSyncMsgQueue* AddContext() {
76     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
77     // SyncChannel objects can block the same thread).
78     if (!received_queue) {
79       received_queue = new ReceivedSyncMsgQueue();
80     }
81     ++received_queue->listener_count_;
82     return received_queue;
83   }
84 
85   // Prevents messages from being dispatched immediately when the dispatch event
86   // is signaled. Instead, |*dispatch_flag| will be set.
BlockDispatch(bool * dispatch_flag)87   void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; }
88 
89   // Allows messages to be dispatched immediately when the dispatch event is
90   // signaled.
UnblockDispatch()91   void UnblockDispatch() { dispatch_flag_ = nullptr; }
92 
93   // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)94   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
95     bool was_task_pending;
96     {
97       base::AutoLock auto_lock(message_lock_);
98 
99       was_task_pending = task_pending_;
100       task_pending_ = true;
101 
102       // We set the event in case the listener thread is blocked (or is about
103       // to). In case it's not, the PostTask dispatches the messages.
104       message_queue_.push_back({std::make_unique<Message>(msg), context});
105       message_queue_version_++;
106     }
107 
108     dispatch_event_.Signal();
109     if (!was_task_pending) {
110       listener_task_runner_->PostTask(
111           FROM_HERE, base::BindOnce(&ReceivedSyncMsgQueue::DispatchMessagesTask,
112                                     this, base::RetainedRef(context)));
113     }
114   }
115 
QueueReply(const Message & msg,SyncChannel::SyncContext * context)116   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
117     received_replies_.push_back({std::make_unique<Message>(msg), context});
118   }
119 
120   // Called on the listener's thread to process any queues synchronous
121   // messages.
DispatchMessagesTask(SyncContext * context)122   void DispatchMessagesTask(SyncContext* context) {
123     {
124       base::AutoLock auto_lock(message_lock_);
125       task_pending_ = false;
126     }
127     context->DispatchMessages();
128   }
129 
130   // Dispatches any queued incoming sync messages. If |dispatching_context| is
131   // not null, messages which target a restricted dispatch channel will only be
132   // dispatched if |dispatching_context| belongs to the same restricted dispatch
133   // group as that channel. If |dispatching_context| is null, all queued
134   // messages are dispatched.
DispatchMessages(SyncContext * dispatching_context)135   void DispatchMessages(SyncContext* dispatching_context) {
136     bool first_time = true;
137     uint32_t expected_version = 0;
138     SyncMessageQueue::iterator it;
139     while (true) {
140       std::unique_ptr<Message> message;
141       scoped_refptr<SyncChannel::SyncContext> context;
142       {
143         base::AutoLock auto_lock(message_lock_);
144         if (first_time || message_queue_version_ != expected_version) {
145           it = message_queue_.begin();
146           first_time = false;
147         }
148         for (; it != message_queue_.end(); it++) {
149           int message_group = it->context->restrict_dispatch_group();
150           if (message_group == kRestrictDispatchGroup_None ||
151               (dispatching_context &&
152                message_group ==
153                    dispatching_context->restrict_dispatch_group())) {
154             message = std::move(it->message);
155             context = std::move(it->context);
156             it = message_queue_.erase(it);
157             message_queue_version_++;
158             expected_version = message_queue_version_;
159             break;
160           }
161         }
162       }
163       if (!message) {
164         break;
165       }
166       context->OnDispatchMessage(*message);
167     }
168   }
169 
170   // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)171   void RemoveContext(SyncContext* context) {
172     base::AutoLock auto_lock(message_lock_);
173 
174     SyncMessageQueue::iterator iter = message_queue_.begin();
175     while (iter != message_queue_.end()) {
176       if (iter->context.get() == context) {
177         iter = message_queue_.erase(iter);
178         message_queue_version_++;
179       } else {
180         iter++;
181       }
182     }
183 
184     if (--listener_count_ == 0) {
185       DCHECK(received_queue);
186       received_queue = nullptr;
187       sync_dispatch_watcher_.reset();
188     }
189   }
190 
dispatch_event()191   base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()192   base::SingleThreadTaskRunner* listener_task_runner() {
193     return listener_task_runner_.get();
194   }
195 
196   // Called on the ipc thread to check if we can unblock any current Send()
197   // calls based on a queued reply.
DispatchReplies()198   void DispatchReplies() {
199     for (size_t i = 0; i < received_replies_.size(); ++i) {
200       Message* message = received_replies_[i].message.get();
201       if (received_replies_[i].context->TryToUnblockListener(message)) {
202         received_replies_.erase(received_replies_.begin() + i);
203         return;
204       }
205     }
206   }
207 
208  private:
209   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
210 
211   // See the comment in SyncChannel::SyncChannel for why this event is created
212   // as manual reset.
ReceivedSyncMsgQueue()213   ReceivedSyncMsgQueue()
214       : message_queue_version_(0),
215         dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
216                         base::WaitableEvent::InitialState::NOT_SIGNALED),
217         listener_task_runner_(
218             base::SingleThreadTaskRunner::GetCurrentDefault()),
219         sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
220             &dispatch_event_,
221             base::BindRepeating(&ReceivedSyncMsgQueue::OnDispatchEventReady,
222                                 base::Unretained(this)))) {
223     sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
224   }
225 
226   ~ReceivedSyncMsgQueue() = default;
227 
OnDispatchEventReady()228   void OnDispatchEventReady() {
229     if (dispatch_flag_) {
230       *dispatch_flag_ = true;
231       return;
232     }
233 
234     // We were woken up during a sync wait, but no specific SyncChannel is
235     // currently waiting. i.e., some other Mojo interface on this thread is
236     // waiting for a response. Since we don't support anything analogous to
237     // restricted dispatch on Mojo interfaces, in this case it's safe to
238     // dispatch sync messages for any context.
239     DispatchMessages(nullptr);
240   }
241 
242   // Holds information about a queued synchronous message or reply.
243   struct QueuedMessage {
244     std::unique_ptr<Message> message;
245     scoped_refptr<SyncChannel::SyncContext> context;
246   };
247 
248   typedef std::list<QueuedMessage> SyncMessageQueue;
249   SyncMessageQueue message_queue_;
250 
251   // Used to signal DispatchMessages to rescan
252   uint32_t message_queue_version_ = 0;
253 
254   std::vector<QueuedMessage> received_replies_;
255 
256   // Signaled when we get a synchronous message that we must respond to, as the
257   // sender needs its reply before it can reply to our original synchronous
258   // message.
259   base::WaitableEvent dispatch_event_;
260   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
261   base::Lock message_lock_;
262   bool task_pending_ = false;
263   int listener_count_ = 0;
264 
265   // If not null, the address of a flag to set when the dispatch event signals,
266   // in lieu of actually dispatching messages. This is used by
267   // SyncChannel::WaitForReply to restrict the scope of queued messages we're
268   // allowed to process while it's waiting.
269   raw_ptr<bool> dispatch_flag_ = nullptr;
270 
271   // Watches |dispatch_event_| during all sync handle watches on this thread.
272   std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
273 };
274 
SyncContext(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)275 SyncChannel::SyncContext::SyncContext(
276     Listener* listener,
277     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
278     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
279     WaitableEvent* shutdown_event)
280     : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
281       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
282       shutdown_event_(shutdown_event),
283       restrict_dispatch_group_(kRestrictDispatchGroup_None) {}
284 
OnSendDoneEventSignaled(base::RunLoop * nested_loop,base::WaitableEvent * event)285 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
286     base::RunLoop* nested_loop,
287     base::WaitableEvent* event) {
288   DCHECK_EQ(GetSendDoneEvent(), event);
289   nested_loop->Quit();
290 }
291 
~SyncContext()292 SyncChannel::SyncContext::~SyncContext() {
293   while (!deserializers_.empty())
294     Pop();
295 }
296 
297 // Adds information about an outgoing sync message to the context so that
298 // we know how to deserialize the reply. Returns |true| if the message was added
299 // to the context or |false| if it was rejected (e.g. due to shutdown.)
Push(SyncMessage * sync_msg)300 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
301   // Create the tracking information for this message. This object is stored
302   // by value since all members are pointers that are cheap to copy. These
303   // pointers are cleaned up in the Pop() function.
304   //
305   // The event is created as manual reset because in between Signal and
306   // OnObjectSignalled, another Send can happen which would stop the watcher
307   // from being called.  The event would get watched later, when the nested
308   // Send completes, so the event will need to remain set.
309   base::AutoLock auto_lock(deserializers_lock_);
310   if (reject_new_deserializers_)
311     return false;
312 
313   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
314                          sync_msg->TakeReplyDeserializer(),
315                          std::make_unique<base::WaitableEvent>(
316                              base::WaitableEvent::ResetPolicy::MANUAL,
317                              base::WaitableEvent::InitialState::NOT_SIGNALED));
318   deserializers_.push_back(std::move(pending));
319   return true;
320 }
321 
Pop()322 bool SyncChannel::SyncContext::Pop() {
323   bool result;
324   {
325     base::AutoLock auto_lock(deserializers_lock_);
326     result = deserializers_.back().send_result;
327     deserializers_.pop_back();
328   }
329 
330   // We got a reply to a synchronous Send() call that's blocking the listener
331   // thread.  However, further down the call stack there could be another
332   // blocking Send() call, whose reply we received after we made this last
333   // Send() call.  So check if we have any queued replies available that
334   // can now unblock the listener thread.
335   ipc_task_runner()->PostTask(
336       FROM_HERE, base::BindOnce(&ReceivedSyncMsgQueue::DispatchReplies,
337                                 received_sync_msgs_));
338 
339   return result;
340 }
341 
GetSendDoneEvent()342 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
343   base::AutoLock auto_lock(deserializers_lock_);
344   return deserializers_.back().done_event.get();
345 }
346 
GetDispatchEvent()347 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
348   return received_sync_msgs_->dispatch_event();
349 }
350 
DispatchMessages()351 void SyncChannel::SyncContext::DispatchMessages() {
352   received_sync_msgs_->DispatchMessages(this);
353 }
354 
TryToUnblockListener(const Message * msg)355 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
356   base::AutoLock auto_lock(deserializers_lock_);
357   if (deserializers_.empty() ||
358       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
359     return false;
360   }
361 
362   if (!msg->is_reply_error()) {
363     bool send_result = deserializers_.back().deserializer->
364         SerializeOutputParameters(*msg);
365     deserializers_.back().send_result = send_result;
366     DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
367   } else {
368     DVLOG(1) << "Received error reply";
369   }
370 
371   base::WaitableEvent* done_event = deserializers_.back().done_event.get();
372   TRACE_EVENT_WITH_FLOW0("toplevel.flow",
373                          "SyncChannel::SyncContext::TryToUnblockListener",
374                          done_event, TRACE_EVENT_FLAG_FLOW_OUT);
375 
376   done_event->Signal();
377 
378   return true;
379 }
380 
Clear()381 void SyncChannel::SyncContext::Clear() {
382   CancelPendingSends();
383   received_sync_msgs_->RemoveContext(this);
384   Context::Clear();
385 }
386 
OnMessageReceived(const Message & msg)387 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
388   // Give the filters a chance at processing this message.
389   if (TryFilters(msg))
390     return true;
391 
392   if (TryToUnblockListener(&msg))
393     return true;
394 
395   if (msg.is_reply()) {
396     received_sync_msgs_->QueueReply(msg, this);
397     return true;
398   }
399 
400   if (msg.should_unblock()) {
401     received_sync_msgs_->QueueMessage(msg, this);
402     return true;
403   }
404 
405   return Context::OnMessageReceivedNoFilter(msg);
406 }
407 
OnChannelError()408 void SyncChannel::SyncContext::OnChannelError() {
409   CancelPendingSends();
410   shutdown_watcher_.StopWatching();
411   Context::OnChannelError();
412 }
413 
OnChannelOpened()414 void SyncChannel::SyncContext::OnChannelOpened() {
415   if (shutdown_event_) {
416     shutdown_watcher_.StartWatching(
417         shutdown_event_,
418         base::BindOnce(&SyncChannel::SyncContext::OnShutdownEventSignaled,
419                        base::Unretained(this)),
420         base::SequencedTaskRunner::GetCurrentDefault());
421   }
422   Context::OnChannelOpened();
423 }
424 
OnChannelClosed()425 void SyncChannel::SyncContext::OnChannelClosed() {
426   CancelPendingSends();
427   shutdown_watcher_.StopWatching();
428   Context::OnChannelClosed();
429 }
430 
CancelPendingSends()431 void SyncChannel::SyncContext::CancelPendingSends() {
432   base::AutoLock auto_lock(deserializers_lock_);
433   reject_new_deserializers_ = true;
434   PendingSyncMessageQueue::iterator iter;
435   DVLOG(1) << "Canceling pending sends";
436   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
437     TRACE_EVENT_WITH_FLOW0("toplevel.flow",
438                            "SyncChannel::SyncContext::CancelPendingSends",
439                            iter->done_event.get(), TRACE_EVENT_FLAG_FLOW_OUT);
440     iter->done_event->Signal();
441   }
442 }
443 
OnShutdownEventSignaled(WaitableEvent * event)444 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
445   DCHECK_EQ(event, shutdown_event_);
446 
447   // Process shut down before we can get a reply to a synchronous message.
448   // Cancel pending Send calls, which will end up setting the send done event.
449   CancelPendingSends();
450 }
451 
452 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)453 std::unique_ptr<SyncChannel> SyncChannel::Create(
454     const IPC::ChannelHandle& channel_handle,
455     Channel::Mode mode,
456     Listener* listener,
457     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
458     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
459     bool create_pipe_now,
460     base::WaitableEvent* shutdown_event) {
461   // TODO(tobiasjs): The shutdown_event object is passed to a refcounted
462   // Context object, and as a result it is not easy to ensure that it
463   // outlives the Context.  There should be some way to either reset
464   // the shutdown_event when it is destroyed, or allow the Context to
465   // control the lifetime of shutdown_event.
466   std::unique_ptr<SyncChannel> channel =
467       Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
468   channel->Init(channel_handle, mode, create_pipe_now);
469   return channel;
470 }
471 
472 // static
Create(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)473 std::unique_ptr<SyncChannel> SyncChannel::Create(
474     Listener* listener,
475     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
476     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
477     WaitableEvent* shutdown_event) {
478   return base::WrapUnique(new SyncChannel(
479       listener, ipc_task_runner, listener_task_runner, shutdown_event));
480 }
481 
SyncChannel(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)482 SyncChannel::SyncChannel(
483     Listener* listener,
484     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
485     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
486     WaitableEvent* shutdown_event)
487     : ChannelProxy(new SyncContext(listener,
488                                    ipc_task_runner,
489                                    listener_task_runner,
490                                    shutdown_event)),
491       sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
492   // The current (listener) thread must be distinct from the IPC thread, or else
493   // sending synchronous messages will deadlock.
494   DCHECK_NE(ipc_task_runner.get(),
495             base::SingleThreadTaskRunner::GetCurrentDefault().get());
496   StartWatching();
497 }
498 
AddListenerTaskRunner(int32_t routing_id,scoped_refptr<base::SingleThreadTaskRunner> task_runner)499 void SyncChannel::AddListenerTaskRunner(
500     int32_t routing_id,
501     scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
502   context()->AddListenerTaskRunner(routing_id, std::move(task_runner));
503 }
504 
RemoveListenerTaskRunner(int32_t routing_id)505 void SyncChannel::RemoveListenerTaskRunner(int32_t routing_id) {
506   context()->RemoveListenerTaskRunner(routing_id);
507 }
508 
509 SyncChannel::~SyncChannel() = default;
510 
SetRestrictDispatchChannelGroup(int group)511 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
512   sync_context()->set_restrict_dispatch_group(group);
513 }
514 
CreateSyncMessageFilter()515 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() {
516   scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter(
517       sync_context()->shutdown_event());
518   AddFilter(filter.get());
519   if (!did_init())
520     pre_init_sync_message_filters_.push_back(filter);
521   return filter;
522 }
523 
Send(Message * message)524 bool SyncChannel::Send(Message* message) {
525 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
526   std::string name;
527   Logging::GetInstance()->GetMessageText(
528       message->type(), &name, message, nullptr);
529   TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
530 #else
531   TRACE_IPC_MESSAGE_SEND("ipc", "SyncChannel::Send", message);
532 #endif
533   if (!message->is_sync()) {
534     ChannelProxy::SendInternal(message);
535     return true;
536   }
537 
538   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
539 
540   // *this* might get deleted in WaitForReply.
541   scoped_refptr<SyncContext> context(sync_context());
542   if (!context->Push(sync_msg)) {
543     DVLOG(1) << "Channel is shutting down. Dropping sync message.";
544     delete message;
545     return false;
546   }
547 
548   ChannelProxy::SendInternal(message);
549 
550   // Wait for reply, or for any other incoming synchronous messages.
551   // |this| might get deleted, so only call static functions at this point.
552   scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
553   WaitForReply(registry.get(), context.get());
554 
555   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "SyncChannel::Send",
556                          context->GetSendDoneEvent(), TRACE_EVENT_FLAG_FLOW_IN);
557 
558   return context->Pop();
559 }
560 
WaitForReply(mojo::SyncHandleRegistry * registry,SyncContext * context)561 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
562                                SyncContext* context) {
563   context->DispatchMessages();
564 
565   while (true) {
566     bool dispatch = false;
567     {
568       bool send_done = false;
569       mojo::SyncHandleRegistry::EventCallbackSubscription
570           send_done_subscription = registry->RegisterEvent(
571               context->GetSendDoneEvent(),
572               base::BindRepeating(&OnEventReady, &send_done));
573 
574       const bool* stop_flags[] = {&dispatch, &send_done};
575       context->received_sync_msgs()->BlockDispatch(&dispatch);
576       registry->Wait(stop_flags, 2);
577       context->received_sync_msgs()->UnblockDispatch();
578     }
579 
580     if (dispatch) {
581       // We're waiting for a reply, but we received a blocking synchronous call.
582       // We must process it to avoid potential deadlocks.
583       context->GetDispatchEvent()->Reset();
584       context->DispatchMessages();
585       continue;
586     }
587     break;
588   }
589 }
590 
OnDispatchEventSignaled(base::WaitableEvent * event)591 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
592   DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
593   sync_context()->GetDispatchEvent()->Reset();
594 
595   StartWatching();
596 
597   // NOTE: May delete |this|.
598   sync_context()->DispatchMessages();
599 }
600 
StartWatching()601 void SyncChannel::StartWatching() {
602   // |dispatch_watcher_| watches the event asynchronously, only dispatching
603   // messages once the listener thread is unblocked and pumping its task queue.
604   // The ReceivedSyncMsgQueue also watches this event and may dispatch
605   // immediately if woken up by a message which it's allowed to dispatch.
606   dispatch_watcher_.StartWatching(
607       sync_context()->GetDispatchEvent(),
608       base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
609                      base::Unretained(this)),
610       sync_context()->listener_task_runner());
611 }
612 
OnChannelInit()613 void SyncChannel::OnChannelInit() {
614   pre_init_sync_message_filters_.clear();
615 }
616 
617 }  // namespace IPC
618