1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_sync_channel.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <memory>
11 #include <utility>
12
13 #include "base/functional/bind.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/task/sequenced_task_runner.h"
21 #include "base/task/single_thread_task_runner.h"
22 #include "base/trace_event/trace_event.h"
23 #include "build/build_config.h"
24 #include "ipc/ipc_channel_factory.h"
25 #include "ipc/ipc_logging.h"
26 #include "ipc/ipc_message.h"
27 #include "ipc/ipc_message_macros.h"
28 #include "ipc/ipc_sync_message.h"
29 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
30
31 #if !BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
32 #include "ipc/trace_ipc_message.h"
33 #endif
34
35 using base::WaitableEvent;
36
37 namespace IPC {
38
39 namespace {
40
// Generic callback for synchronous handle/event watching: raises the flag that
// |signal| points at so the waiter can see that the event fired.
void OnEventReady(bool* signal) {
  bool& flag = *signal;
  flag = true;
}
46
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object. Starts out
// null; ReceivedSyncMsgQueue::AddContext() allocates the queue on first use
// and ReceivedSyncMsgQueue::RemoveContext() resets this to null once the
// listener count drops to zero.
constinit thread_local SyncChannel::ReceivedSyncMsgQueue* received_queue =
    nullptr;
50
51 } // namespace
52
// When we're blocked in a Send(), we need to process incoming synchronous
// messages right away because they could be blocking our reply (either
// directly from the same object we're calling, or indirectly through one or
// more other channels). That means that in SyncContext's OnMessageReceived,
// we need to process sync messages right away if we're blocked. However, a
// simple check isn't sufficient, because the listener thread can be in the
// process of calling Send.
60 // To work around this, when SyncChannel filters a sync message, it sets
61 // an event that the listener thread waits on during its Send() call. This
62 // allows us to dispatch incoming sync messages when blocked. The race
63 // condition is handled because if Send is in the process of being called, it
64 // will check the event. In case the listener thread isn't sending a message,
65 // we queue a task on the listener thread to dispatch the received messages.
66 // The messages are stored in this queue object that's shared among all
67 // SyncChannel objects on the same thread (since one object can receive a
68 // sync message while another one is blocked).
69
70 class SyncChannel::ReceivedSyncMsgQueue :
71 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
72 public:
73 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
74 // if necessary. Call RemoveContext on the same thread when done.
AddContext()75 static ReceivedSyncMsgQueue* AddContext() {
76 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
77 // SyncChannel objects can block the same thread).
78 if (!received_queue) {
79 received_queue = new ReceivedSyncMsgQueue();
80 }
81 ++received_queue->listener_count_;
82 return received_queue;
83 }
84
85 // Prevents messages from being dispatched immediately when the dispatch event
86 // is signaled. Instead, |*dispatch_flag| will be set.
BlockDispatch(bool * dispatch_flag)87 void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; }
88
89 // Allows messages to be dispatched immediately when the dispatch event is
90 // signaled.
UnblockDispatch()91 void UnblockDispatch() { dispatch_flag_ = nullptr; }
92
93 // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)94 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
95 bool was_task_pending;
96 {
97 base::AutoLock auto_lock(message_lock_);
98
99 was_task_pending = task_pending_;
100 task_pending_ = true;
101
102 // We set the event in case the listener thread is blocked (or is about
103 // to). In case it's not, the PostTask dispatches the messages.
104 message_queue_.push_back({std::make_unique<Message>(msg), context});
105 message_queue_version_++;
106 }
107
108 dispatch_event_.Signal();
109 if (!was_task_pending) {
110 listener_task_runner_->PostTask(
111 FROM_HERE, base::BindOnce(&ReceivedSyncMsgQueue::DispatchMessagesTask,
112 this, base::RetainedRef(context)));
113 }
114 }
115
QueueReply(const Message & msg,SyncChannel::SyncContext * context)116 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
117 received_replies_.push_back({std::make_unique<Message>(msg), context});
118 }
119
120 // Called on the listener's thread to process any queues synchronous
121 // messages.
DispatchMessagesTask(SyncContext * context)122 void DispatchMessagesTask(SyncContext* context) {
123 {
124 base::AutoLock auto_lock(message_lock_);
125 task_pending_ = false;
126 }
127 context->DispatchMessages();
128 }
129
130 // Dispatches any queued incoming sync messages. If |dispatching_context| is
131 // not null, messages which target a restricted dispatch channel will only be
132 // dispatched if |dispatching_context| belongs to the same restricted dispatch
133 // group as that channel. If |dispatching_context| is null, all queued
134 // messages are dispatched.
DispatchMessages(SyncContext * dispatching_context)135 void DispatchMessages(SyncContext* dispatching_context) {
136 bool first_time = true;
137 uint32_t expected_version = 0;
138 SyncMessageQueue::iterator it;
139 while (true) {
140 std::unique_ptr<Message> message;
141 scoped_refptr<SyncChannel::SyncContext> context;
142 {
143 base::AutoLock auto_lock(message_lock_);
144 if (first_time || message_queue_version_ != expected_version) {
145 it = message_queue_.begin();
146 first_time = false;
147 }
148 for (; it != message_queue_.end(); it++) {
149 int message_group = it->context->restrict_dispatch_group();
150 if (message_group == kRestrictDispatchGroup_None ||
151 (dispatching_context &&
152 message_group ==
153 dispatching_context->restrict_dispatch_group())) {
154 message = std::move(it->message);
155 context = std::move(it->context);
156 it = message_queue_.erase(it);
157 message_queue_version_++;
158 expected_version = message_queue_version_;
159 break;
160 }
161 }
162 }
163 if (!message) {
164 break;
165 }
166 context->OnDispatchMessage(*message);
167 }
168 }
169
170 // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)171 void RemoveContext(SyncContext* context) {
172 base::AutoLock auto_lock(message_lock_);
173
174 SyncMessageQueue::iterator iter = message_queue_.begin();
175 while (iter != message_queue_.end()) {
176 if (iter->context.get() == context) {
177 iter = message_queue_.erase(iter);
178 message_queue_version_++;
179 } else {
180 iter++;
181 }
182 }
183
184 if (--listener_count_ == 0) {
185 DCHECK(received_queue);
186 received_queue = nullptr;
187 sync_dispatch_watcher_.reset();
188 }
189 }
190
dispatch_event()191 base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()192 base::SingleThreadTaskRunner* listener_task_runner() {
193 return listener_task_runner_.get();
194 }
195
196 // Called on the ipc thread to check if we can unblock any current Send()
197 // calls based on a queued reply.
DispatchReplies()198 void DispatchReplies() {
199 for (size_t i = 0; i < received_replies_.size(); ++i) {
200 Message* message = received_replies_[i].message.get();
201 if (received_replies_[i].context->TryToUnblockListener(message)) {
202 received_replies_.erase(received_replies_.begin() + i);
203 return;
204 }
205 }
206 }
207
208 private:
209 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
210
211 // See the comment in SyncChannel::SyncChannel for why this event is created
212 // as manual reset.
ReceivedSyncMsgQueue()213 ReceivedSyncMsgQueue()
214 : message_queue_version_(0),
215 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
216 base::WaitableEvent::InitialState::NOT_SIGNALED),
217 listener_task_runner_(
218 base::SingleThreadTaskRunner::GetCurrentDefault()),
219 sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
220 &dispatch_event_,
221 base::BindRepeating(&ReceivedSyncMsgQueue::OnDispatchEventReady,
222 base::Unretained(this)))) {
223 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
224 }
225
226 ~ReceivedSyncMsgQueue() = default;
227
OnDispatchEventReady()228 void OnDispatchEventReady() {
229 if (dispatch_flag_) {
230 *dispatch_flag_ = true;
231 return;
232 }
233
234 // We were woken up during a sync wait, but no specific SyncChannel is
235 // currently waiting. i.e., some other Mojo interface on this thread is
236 // waiting for a response. Since we don't support anything analogous to
237 // restricted dispatch on Mojo interfaces, in this case it's safe to
238 // dispatch sync messages for any context.
239 DispatchMessages(nullptr);
240 }
241
242 // Holds information about a queued synchronous message or reply.
243 struct QueuedMessage {
244 std::unique_ptr<Message> message;
245 scoped_refptr<SyncChannel::SyncContext> context;
246 };
247
248 typedef std::list<QueuedMessage> SyncMessageQueue;
249 SyncMessageQueue message_queue_;
250
251 // Used to signal DispatchMessages to rescan
252 uint32_t message_queue_version_ = 0;
253
254 std::vector<QueuedMessage> received_replies_;
255
256 // Signaled when we get a synchronous message that we must respond to, as the
257 // sender needs its reply before it can reply to our original synchronous
258 // message.
259 base::WaitableEvent dispatch_event_;
260 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
261 base::Lock message_lock_;
262 bool task_pending_ = false;
263 int listener_count_ = 0;
264
265 // If not null, the address of a flag to set when the dispatch event signals,
266 // in lieu of actually dispatching messages. This is used by
267 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
268 // allowed to process while it's waiting.
269 raw_ptr<bool> dispatch_flag_ = nullptr;
270
271 // Watches |dispatch_event_| during all sync handle watches on this thread.
272 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
273 };
274
// Builds the context on top of ChannelProxy::Context and registers it with the
// calling thread's ReceivedSyncMsgQueue through AddContext(). |shutdown_event|
// is stored as given; OnChannelOpened() checks it for null before watching it.
// The context starts out in no restricted dispatch group.
SyncChannel::SyncContext::SyncContext(
    Listener* listener,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
      received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
      shutdown_event_(shutdown_event),
      restrict_dispatch_group_(kRestrictDispatchGroup_None) {}
284
// Quits |nested_loop|. |event| is expected (DCHECK) to be the send-done event
// of the most recently pushed pending sync message.
void SyncChannel::SyncContext::OnSendDoneEventSignaled(
    base::RunLoop* nested_loop,
    base::WaitableEvent* event) {
  DCHECK_EQ(GetSendDoneEvent(), event);
  nested_loop->Quit();
}
291
~SyncContext()292 SyncChannel::SyncContext::~SyncContext() {
293 while (!deserializers_.empty())
294 Pop();
295 }
296
297 // Adds information about an outgoing sync message to the context so that
298 // we know how to deserialize the reply. Returns |true| if the message was added
299 // to the context or |false| if it was rejected (e.g. due to shutdown.)
Push(SyncMessage * sync_msg)300 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
301 // Create the tracking information for this message. This object is stored
302 // by value since all members are pointers that are cheap to copy. These
303 // pointers are cleaned up in the Pop() function.
304 //
305 // The event is created as manual reset because in between Signal and
306 // OnObjectSignalled, another Send can happen which would stop the watcher
307 // from being called. The event would get watched later, when the nested
308 // Send completes, so the event will need to remain set.
309 base::AutoLock auto_lock(deserializers_lock_);
310 if (reject_new_deserializers_)
311 return false;
312
313 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
314 sync_msg->TakeReplyDeserializer(),
315 std::make_unique<base::WaitableEvent>(
316 base::WaitableEvent::ResetPolicy::MANUAL,
317 base::WaitableEvent::InitialState::NOT_SIGNALED));
318 deserializers_.push_back(std::move(pending));
319 return true;
320 }
321
Pop()322 bool SyncChannel::SyncContext::Pop() {
323 bool result;
324 {
325 base::AutoLock auto_lock(deserializers_lock_);
326 result = deserializers_.back().send_result;
327 deserializers_.pop_back();
328 }
329
330 // We got a reply to a synchronous Send() call that's blocking the listener
331 // thread. However, further down the call stack there could be another
332 // blocking Send() call, whose reply we received after we made this last
333 // Send() call. So check if we have any queued replies available that
334 // can now unblock the listener thread.
335 ipc_task_runner()->PostTask(
336 FROM_HERE, base::BindOnce(&ReceivedSyncMsgQueue::DispatchReplies,
337 received_sync_msgs_));
338
339 return result;
340 }
341
GetSendDoneEvent()342 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
343 base::AutoLock auto_lock(deserializers_lock_);
344 return deserializers_.back().done_event.get();
345 }
346
GetDispatchEvent()347 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
348 return received_sync_msgs_->dispatch_event();
349 }
350
// Dispatches queued incoming sync messages via the shared received-message
// queue, passing this context as the dispatching context (which determines
// which restricted-dispatch messages are eligible).
void SyncChannel::SyncContext::DispatchMessages() {
  received_sync_msgs_->DispatchMessages(this);
}
354
TryToUnblockListener(const Message * msg)355 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
356 base::AutoLock auto_lock(deserializers_lock_);
357 if (deserializers_.empty() ||
358 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
359 return false;
360 }
361
362 if (!msg->is_reply_error()) {
363 bool send_result = deserializers_.back().deserializer->
364 SerializeOutputParameters(*msg);
365 deserializers_.back().send_result = send_result;
366 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
367 } else {
368 DVLOG(1) << "Received error reply";
369 }
370
371 base::WaitableEvent* done_event = deserializers_.back().done_event.get();
372 TRACE_EVENT_WITH_FLOW0("toplevel.flow",
373 "SyncChannel::SyncContext::TryToUnblockListener",
374 done_event, TRACE_EVENT_FLAG_FLOW_OUT);
375
376 done_event->Signal();
377
378 return true;
379 }
380
// Tears the context down: signals every pending send's done event and rejects
// new ones, unregisters this context from the shared received-message queue,
// then runs the base Context::Clear().
void SyncChannel::SyncContext::Clear() {
  CancelPendingSends();
  received_sync_msgs_->RemoveContext(this);
  Context::Clear();
}
386
OnMessageReceived(const Message & msg)387 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
388 // Give the filters a chance at processing this message.
389 if (TryFilters(msg))
390 return true;
391
392 if (TryToUnblockListener(&msg))
393 return true;
394
395 if (msg.is_reply()) {
396 received_sync_msgs_->QueueReply(msg, this);
397 return true;
398 }
399
400 if (msg.should_unblock()) {
401 received_sync_msgs_->QueueMessage(msg, this);
402 return true;
403 }
404
405 return Context::OnMessageReceivedNoFilter(msg);
406 }
407
// On a channel error: signals every pending send's done event (and rejects
// new sends), stops watching the shutdown event, then forwards to the base
// class handler.
void SyncChannel::SyncContext::OnChannelError() {
  CancelPendingSends();
  shutdown_watcher_.StopWatching();
  Context::OnChannelError();
}
413
OnChannelOpened()414 void SyncChannel::SyncContext::OnChannelOpened() {
415 if (shutdown_event_) {
416 shutdown_watcher_.StartWatching(
417 shutdown_event_,
418 base::BindOnce(&SyncChannel::SyncContext::OnShutdownEventSignaled,
419 base::Unretained(this)),
420 base::SequencedTaskRunner::GetCurrentDefault());
421 }
422 Context::OnChannelOpened();
423 }
424
// On channel close: signals every pending send's done event (and rejects new
// sends), stops watching the shutdown event, then forwards to the base class
// handler.
void SyncChannel::SyncContext::OnChannelClosed() {
  CancelPendingSends();
  shutdown_watcher_.StopWatching();
  Context::OnChannelClosed();
}
430
CancelPendingSends()431 void SyncChannel::SyncContext::CancelPendingSends() {
432 base::AutoLock auto_lock(deserializers_lock_);
433 reject_new_deserializers_ = true;
434 PendingSyncMessageQueue::iterator iter;
435 DVLOG(1) << "Canceling pending sends";
436 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
437 TRACE_EVENT_WITH_FLOW0("toplevel.flow",
438 "SyncChannel::SyncContext::CancelPendingSends",
439 iter->done_event.get(), TRACE_EVENT_FLAG_FLOW_OUT);
440 iter->done_event->Signal();
441 }
442 }
443
// Callback for |shutdown_watcher_|. |event| is expected (DCHECK) to be the
// shutdown event this context was constructed with.
void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
  DCHECK_EQ(event, shutdown_event_);

  // Process shut down before we can get a reply to a synchronous message.
  // Cancel pending Send calls, which will end up setting the send done event.
  CancelPendingSends();
}
451
452 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)453 std::unique_ptr<SyncChannel> SyncChannel::Create(
454 const IPC::ChannelHandle& channel_handle,
455 Channel::Mode mode,
456 Listener* listener,
457 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
458 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
459 bool create_pipe_now,
460 base::WaitableEvent* shutdown_event) {
461 // TODO(tobiasjs): The shutdown_event object is passed to a refcounted
462 // Context object, and as a result it is not easy to ensure that it
463 // outlives the Context. There should be some way to either reset
464 // the shutdown_event when it is destroyed, or allow the Context to
465 // control the lifetime of shutdown_event.
466 std::unique_ptr<SyncChannel> channel =
467 Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
468 channel->Init(channel_handle, mode, create_pipe_now);
469 return channel;
470 }
471
472 // static
Create(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)473 std::unique_ptr<SyncChannel> SyncChannel::Create(
474 Listener* listener,
475 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
476 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
477 WaitableEvent* shutdown_event) {
478 return base::WrapUnique(new SyncChannel(
479 listener, ipc_task_runner, listener_task_runner, shutdown_event));
480 }
481
// Builds the underlying ChannelProxy around a freshly allocated SyncContext,
// captures the current thread's mojo::SyncHandleRegistry, and starts watching
// the dispatch event asynchronously.
SyncChannel::SyncChannel(
    Listener* listener,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy(new SyncContext(listener,
                                   ipc_task_runner,
                                   listener_task_runner,
                                   shutdown_event)),
      sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
  // The current (listener) thread must be distinct from the IPC thread, or else
  // sending synchronous messages will deadlock.
  DCHECK_NE(ipc_task_runner.get(),
            base::SingleThreadTaskRunner::GetCurrentDefault().get());
  StartWatching();
}
498
// Forwards to the context's AddListenerTaskRunner(), handing over
// |task_runner| for |routing_id|.
void SyncChannel::AddListenerTaskRunner(
    int32_t routing_id,
    scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
  context()->AddListenerTaskRunner(routing_id, std::move(task_runner));
}
504
// Forwards to the context's RemoveListenerTaskRunner() for |routing_id|.
void SyncChannel::RemoveListenerTaskRunner(int32_t routing_id) {
  context()->RemoveListenerTaskRunner(routing_id);
}
508
// Defaulted: no teardown logic of its own here; members and the base class
// clean up through their own destructors.
SyncChannel::~SyncChannel() = default;
510
// Sets the restricted dispatch group of this channel's SyncContext to |group|.
// ReceivedSyncMsgQueue::DispatchMessages() consults that group when deciding
// which queued messages may be dispatched.
void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
  sync_context()->set_restrict_dispatch_group(group);
}
514
CreateSyncMessageFilter()515 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() {
516 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter(
517 sync_context()->shutdown_event());
518 AddFilter(filter.get());
519 if (!did_init())
520 pre_init_sync_message_filters_.push_back(filter);
521 return filter;
522 }
523
Send(Message * message)524 bool SyncChannel::Send(Message* message) {
525 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
526 std::string name;
527 Logging::GetInstance()->GetMessageText(
528 message->type(), &name, message, nullptr);
529 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
530 #else
531 TRACE_IPC_MESSAGE_SEND("ipc", "SyncChannel::Send", message);
532 #endif
533 if (!message->is_sync()) {
534 ChannelProxy::SendInternal(message);
535 return true;
536 }
537
538 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
539
540 // *this* might get deleted in WaitForReply.
541 scoped_refptr<SyncContext> context(sync_context());
542 if (!context->Push(sync_msg)) {
543 DVLOG(1) << "Channel is shutting down. Dropping sync message.";
544 delete message;
545 return false;
546 }
547
548 ChannelProxy::SendInternal(message);
549
550 // Wait for reply, or for any other incoming synchronous messages.
551 // |this| might get deleted, so only call static functions at this point.
552 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
553 WaitForReply(registry.get(), context.get());
554
555 TRACE_EVENT_WITH_FLOW0("toplevel.flow", "SyncChannel::Send",
556 context->GetSendDoneEvent(), TRACE_EVENT_FLAG_FLOW_IN);
557
558 return context->Pop();
559 }
560
WaitForReply(mojo::SyncHandleRegistry * registry,SyncContext * context)561 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
562 SyncContext* context) {
563 context->DispatchMessages();
564
565 while (true) {
566 bool dispatch = false;
567 {
568 bool send_done = false;
569 mojo::SyncHandleRegistry::EventCallbackSubscription
570 send_done_subscription = registry->RegisterEvent(
571 context->GetSendDoneEvent(),
572 base::BindRepeating(&OnEventReady, &send_done));
573
574 const bool* stop_flags[] = {&dispatch, &send_done};
575 context->received_sync_msgs()->BlockDispatch(&dispatch);
576 registry->Wait(stop_flags, 2);
577 context->received_sync_msgs()->UnblockDispatch();
578 }
579
580 if (dispatch) {
581 // We're waiting for a reply, but we received a blocking synchronous call.
582 // We must process it to avoid potential deadlocks.
583 context->GetDispatchEvent()->Reset();
584 context->DispatchMessages();
585 continue;
586 }
587 break;
588 }
589 }
590
// Callback for |dispatch_watcher_|. |event| is expected (DCHECK) to be the
// context's dispatch event. Resets the event, re-arms the watcher, and then
// dispatches the queued messages.
void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
  DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
  sync_context()->GetDispatchEvent()->Reset();

  // Re-arm before dispatching: the watcher callback is a OnceCallback, and
  // nothing on |this| may be touched after the dispatch below.
  StartWatching();

  // NOTE: May delete |this|.
  sync_context()->DispatchMessages();
}
600
// Arms |dispatch_watcher_| on the context's dispatch event, so that
// OnDispatchEventSignaled() runs on the listener task runner when it fires.
void SyncChannel::StartWatching() {
  // |dispatch_watcher_| watches the event asynchronously, only dispatching
  // messages once the listener thread is unblocked and pumping its task queue.
  // The ReceivedSyncMsgQueue also watches this event and may dispatch
  // immediately if woken up by a message which it's allowed to dispatch.
  dispatch_watcher_.StartWatching(
      sync_context()->GetDispatchEvent(),
      base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
                     base::Unretained(this)),
      sync_context()->listener_task_runner());
}
612
// Drops the references that CreateSyncMessageFilter() stored in
// |pre_init_sync_message_filters_| for filters created before initialization.
void SyncChannel::OnChannelInit() {
  pre_init_sync_message_filters_.clear();
}
616
617 } // namespace IPC
618