1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_sync_channel.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/lazy_instance.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h"
19 #include "base/sequenced_task_runner.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/threading/thread_local.h"
22 #include "base/threading/thread_task_runner_handle.h"
23 #include "base/trace_event/trace_event.h"
24 #include "ipc/ipc_channel_factory.h"
25 #include "ipc/ipc_logging.h"
26 #include "ipc/ipc_message_macros.h"
27 #include "ipc/ipc_sync_message.h"
28 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
29
30 using base::WaitableEvent;
31
32 namespace IPC {
33
34 namespace {
35
// Generic "event fired" callback for synchronous handle watching: records the
// fact by writing true through |signal|. |signal| must point at a live bool.
void OnEventReady(bool* signal) {
  bool& fired = *signal;
  fired = true;
}
41
42 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
43 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
44
45 } // namespace
46
// When we're blocked in a Send(), we need to process incoming synchronous
// messages right away because one of them could be blocking our reply (either
// directly from the same object we're calling, or indirectly through one or
// more other channels). That means that in SyncContext's OnMessageReceived,
// we need to process sync messages right away if we're blocked. However, a
// simple check isn't sufficient, because the listener thread can be in the
// process of calling Send.
// To work around this, when SyncChannel filters a sync message, it sets
// an event that the listener thread waits on during its Send() call. This
// allows us to dispatch incoming sync messages when blocked. The race
// condition is handled because if Send is in the process of being called, it
// will check the event. In case the listener thread isn't sending a message,
// we queue a task on the listener thread to dispatch the received messages.
// The messages are stored in this queue object that's shared among all
// SyncChannel objects on the same thread (since one object can receive a
// sync message while another one is blocked).
63
64 class SyncChannel::ReceivedSyncMsgQueue :
65 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
66 public:
67 // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
68 // may nest waiting message loops arbitrarily deep on the SyncChannel's
69 // thread. Every such operation has a corresponding WaitableEvent to be
70 // watched which, when signalled for IPC completion, breaks out of the loop.
71 // A reference to the innermost (i.e. topmost) watcher is held in
72 // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
73 //
74 // NestedSendDoneWatcher provides a simple scoper which is used by
75 // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
76 // event, preserving the previous topmost state on the local stack until the
77 // new inner loop is broken. If yet another subsequent nested loop is started
78 // therein the process is repeated again in the new inner stack frame, and so
79 // on.
80 //
81 // When this object is destroyed on stack unwind, the previous topmost state
82 // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
83 // and its watch is resumed immediately.
84 class NestedSendDoneWatcher {
85 public:
NestedSendDoneWatcher(SyncChannel::SyncContext * context,base::RunLoop * run_loop,scoped_refptr<base::SequencedTaskRunner> task_runner)86 NestedSendDoneWatcher(SyncChannel::SyncContext* context,
87 base::RunLoop* run_loop,
88 scoped_refptr<base::SequencedTaskRunner> task_runner)
89 : sync_msg_queue_(context->received_sync_msgs()),
90 outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
91 event_(context->GetSendDoneEvent()),
92 callback_(
93 base::BindOnce(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
94 context,
95 run_loop)),
96 task_runner_(std::move(task_runner)) {
97 sync_msg_queue_->top_send_done_event_watcher_ = this;
98 if (outer_state_)
99 outer_state_->StopWatching();
100 StartWatching();
101 }
102
~NestedSendDoneWatcher()103 ~NestedSendDoneWatcher() {
104 sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
105 if (outer_state_)
106 outer_state_->StartWatching();
107 }
108
109 private:
Run(WaitableEvent * event)110 void Run(WaitableEvent* event) {
111 DCHECK(callback_);
112 std::move(callback_).Run(event);
113 }
114
StartWatching()115 void StartWatching() {
116 watcher_.StartWatching(
117 event_,
118 base::BindOnce(&NestedSendDoneWatcher::Run, base::Unretained(this)),
119 task_runner_);
120 }
121
StopWatching()122 void StopWatching() { watcher_.StopWatching(); }
123
124 ReceivedSyncMsgQueue* const sync_msg_queue_;
125 NestedSendDoneWatcher* const outer_state_;
126
127 base::WaitableEvent* const event_;
128 base::WaitableEventWatcher::EventCallback callback_;
129 base::WaitableEventWatcher watcher_;
130 scoped_refptr<base::SequencedTaskRunner> task_runner_;
131
132 DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
133 };
134
135 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
136 // if necessary. Call RemoveContext on the same thread when done.
AddContext()137 static ReceivedSyncMsgQueue* AddContext() {
138 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
139 // SyncChannel objects can block the same thread).
140 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
141 if (!rv) {
142 rv = new ReceivedSyncMsgQueue();
143 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
144 }
145 rv->listener_count_++;
146 return rv;
147 }
148
149 // Prevents messages from being dispatched immediately when the dispatch event
150 // is signaled. Instead, |*dispatch_flag| will be set.
BlockDispatch(bool * dispatch_flag)151 void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; }
152
153 // Allows messages to be dispatched immediately when the dispatch event is
154 // signaled.
UnblockDispatch()155 void UnblockDispatch() { dispatch_flag_ = nullptr; }
156
157 // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)158 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
159 bool was_task_pending;
160 {
161 base::AutoLock auto_lock(message_lock_);
162
163 was_task_pending = task_pending_;
164 task_pending_ = true;
165
166 // We set the event in case the listener thread is blocked (or is about
167 // to). In case it's not, the PostTask dispatches the messages.
168 message_queue_.push_back(QueuedMessage(new Message(msg), context));
169 message_queue_version_++;
170 }
171
172 dispatch_event_.Signal();
173 if (!was_task_pending) {
174 listener_task_runner_->PostTask(
175 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
176 this, base::RetainedRef(context)));
177 }
178 }
179
QueueReply(const Message & msg,SyncChannel::SyncContext * context)180 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
181 received_replies_.push_back(QueuedMessage(new Message(msg), context));
182 }
183
184 // Called on the listener's thread to process any queues synchronous
185 // messages.
DispatchMessagesTask(SyncContext * context)186 void DispatchMessagesTask(SyncContext* context) {
187 {
188 base::AutoLock auto_lock(message_lock_);
189 task_pending_ = false;
190 }
191 context->DispatchMessages();
192 }
193
194 // Dispatches any queued incoming sync messages. If |dispatching_context| is
195 // not null, messages which target a restricted dispatch channel will only be
196 // dispatched if |dispatching_context| belongs to the same restricted dispatch
197 // group as that channel. If |dispatching_context| is null, all queued
198 // messages are dispatched.
DispatchMessages(SyncContext * dispatching_context)199 void DispatchMessages(SyncContext* dispatching_context) {
200 bool first_time = true;
201 uint32_t expected_version = 0;
202 SyncMessageQueue::iterator it;
203 while (true) {
204 Message* message = nullptr;
205 scoped_refptr<SyncChannel::SyncContext> context;
206 {
207 base::AutoLock auto_lock(message_lock_);
208 if (first_time || message_queue_version_ != expected_version) {
209 it = message_queue_.begin();
210 first_time = false;
211 }
212 for (; it != message_queue_.end(); it++) {
213 int message_group = it->context->restrict_dispatch_group();
214 if (message_group == kRestrictDispatchGroup_None ||
215 (dispatching_context &&
216 message_group ==
217 dispatching_context->restrict_dispatch_group())) {
218 message = it->message;
219 context = it->context;
220 it = message_queue_.erase(it);
221 message_queue_version_++;
222 expected_version = message_queue_version_;
223 break;
224 }
225 }
226 }
227
228 if (message == nullptr)
229 break;
230 context->OnDispatchMessage(*message);
231 delete message;
232 }
233 }
234
235 // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)236 void RemoveContext(SyncContext* context) {
237 base::AutoLock auto_lock(message_lock_);
238
239 SyncMessageQueue::iterator iter = message_queue_.begin();
240 while (iter != message_queue_.end()) {
241 if (iter->context.get() == context) {
242 delete iter->message;
243 iter = message_queue_.erase(iter);
244 message_queue_version_++;
245 } else {
246 iter++;
247 }
248 }
249
250 if (--listener_count_ == 0) {
251 DCHECK(lazy_tls_ptr_.Pointer()->Get());
252 lazy_tls_ptr_.Pointer()->Set(nullptr);
253 sync_dispatch_watcher_.reset();
254 }
255 }
256
dispatch_event()257 base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()258 base::SingleThreadTaskRunner* listener_task_runner() {
259 return listener_task_runner_.get();
260 }
261
262 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
263 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>::
264 DestructorAtExit lazy_tls_ptr_;
265
266 // Called on the ipc thread to check if we can unblock any current Send()
267 // calls based on a queued reply.
DispatchReplies()268 void DispatchReplies() {
269 for (size_t i = 0; i < received_replies_.size(); ++i) {
270 Message* message = received_replies_[i].message;
271 if (received_replies_[i].context->TryToUnblockListener(message)) {
272 delete message;
273 received_replies_.erase(received_replies_.begin() + i);
274 return;
275 }
276 }
277 }
278
279 private:
280 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
281
282 // See the comment in SyncChannel::SyncChannel for why this event is created
283 // as manual reset.
ReceivedSyncMsgQueue()284 ReceivedSyncMsgQueue()
285 : message_queue_version_(0),
286 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
287 base::WaitableEvent::InitialState::NOT_SIGNALED),
288 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
289 sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
290 &dispatch_event_,
291 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
292 base::Unretained(this)))) {
293 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
294 }
295
296 ~ReceivedSyncMsgQueue() = default;
297
OnDispatchEventReady()298 void OnDispatchEventReady() {
299 if (dispatch_flag_) {
300 *dispatch_flag_ = true;
301 return;
302 }
303
304 // We were woken up during a sync wait, but no specific SyncChannel is
305 // currently waiting. i.e., some other Mojo interface on this thread is
306 // waiting for a response. Since we don't support anything analogous to
307 // restricted dispatch on Mojo interfaces, in this case it's safe to
308 // dispatch sync messages for any context.
309 DispatchMessages(nullptr);
310 }
311
312 // Holds information about a queued synchronous message or reply.
313 struct QueuedMessage {
QueuedMessageIPC::SyncChannel::ReceivedSyncMsgQueue::QueuedMessage314 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
315 Message* message;
316 scoped_refptr<SyncChannel::SyncContext> context;
317 };
318
319 typedef std::list<QueuedMessage> SyncMessageQueue;
320 SyncMessageQueue message_queue_;
321
322 // Used to signal DispatchMessages to rescan
323 uint32_t message_queue_version_ = 0;
324
325 std::vector<QueuedMessage> received_replies_;
326
327 // Signaled when we get a synchronous message that we must respond to, as the
328 // sender needs its reply before it can reply to our original synchronous
329 // message.
330 base::WaitableEvent dispatch_event_;
331 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
332 base::Lock message_lock_;
333 bool task_pending_ = false;
334 int listener_count_ = 0;
335
336 // The current NestedSendDoneWatcher for this thread, if we're currently
337 // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
338 // NestedSendDoneWatcher comments for more details.
339 NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
340
341 // If not null, the address of a flag to set when the dispatch event signals,
342 // in lieu of actually dispatching messages. This is used by
343 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
344 // allowed to process while it's waiting.
345 bool* dispatch_flag_ = nullptr;
346
347 // Watches |dispatch_event_| during all sync handle watches on this thread.
348 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
349 };
350
351 base::LazyInstance<base::ThreadLocalPointer<
352 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit
353 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
354 LAZY_INSTANCE_INITIALIZER;
355
SyncContext(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)356 SyncChannel::SyncContext::SyncContext(
357 Listener* listener,
358 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
359 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
360 WaitableEvent* shutdown_event)
361 : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
362 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
363 shutdown_event_(shutdown_event),
364 restrict_dispatch_group_(kRestrictDispatchGroup_None) {}
365
OnSendDoneEventSignaled(base::RunLoop * nested_loop,base::WaitableEvent * event)366 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
367 base::RunLoop* nested_loop,
368 base::WaitableEvent* event) {
369 DCHECK_EQ(GetSendDoneEvent(), event);
370 nested_loop->Quit();
371 }
372
~SyncContext()373 SyncChannel::SyncContext::~SyncContext() {
374 while (!deserializers_.empty())
375 Pop();
376 }
377
378 // Adds information about an outgoing sync message to the context so that
379 // we know how to deserialize the reply. Returns |true| if the message was added
380 // to the context or |false| if it was rejected (e.g. due to shutdown.)
Push(SyncMessage * sync_msg)381 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
382 // Create the tracking information for this message. This object is stored
383 // by value since all members are pointers that are cheap to copy. These
384 // pointers are cleaned up in the Pop() function.
385 //
386 // The event is created as manual reset because in between Signal and
387 // OnObjectSignalled, another Send can happen which would stop the watcher
388 // from being called. The event would get watched later, when the nested
389 // Send completes, so the event will need to remain set.
390 base::AutoLock auto_lock(deserializers_lock_);
391 if (reject_new_deserializers_)
392 return false;
393 PendingSyncMsg pending(
394 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
395 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
396 base::WaitableEvent::InitialState::NOT_SIGNALED));
397 deserializers_.push_back(pending);
398 return true;
399 }
400
Pop()401 bool SyncChannel::SyncContext::Pop() {
402 bool result;
403 {
404 base::AutoLock auto_lock(deserializers_lock_);
405 PendingSyncMsg msg = deserializers_.back();
406 delete msg.deserializer;
407 delete msg.done_event;
408 msg.done_event = nullptr;
409 deserializers_.pop_back();
410 result = msg.send_result;
411 }
412
413 // We got a reply to a synchronous Send() call that's blocking the listener
414 // thread. However, further down the call stack there could be another
415 // blocking Send() call, whose reply we received after we made this last
416 // Send() call. So check if we have any queued replies available that
417 // can now unblock the listener thread.
418 ipc_task_runner()->PostTask(
419 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
420 received_sync_msgs_));
421
422 return result;
423 }
424
GetSendDoneEvent()425 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
426 base::AutoLock auto_lock(deserializers_lock_);
427 return deserializers_.back().done_event;
428 }
429
GetDispatchEvent()430 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
431 return received_sync_msgs_->dispatch_event();
432 }
433
DispatchMessages()434 void SyncChannel::SyncContext::DispatchMessages() {
435 received_sync_msgs_->DispatchMessages(this);
436 }
437
TryToUnblockListener(const Message * msg)438 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
439 base::AutoLock auto_lock(deserializers_lock_);
440 if (deserializers_.empty() ||
441 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
442 return false;
443 }
444
445 if (!msg->is_reply_error()) {
446 bool send_result = deserializers_.back().deserializer->
447 SerializeOutputParameters(*msg);
448 deserializers_.back().send_result = send_result;
449 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
450 } else {
451 DVLOG(1) << "Received error reply";
452 }
453
454 base::WaitableEvent* done_event = deserializers_.back().done_event;
455 TRACE_EVENT_FLOW_BEGIN0(
456 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
457 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
458
459 done_event->Signal();
460
461 return true;
462 }
463
Clear()464 void SyncChannel::SyncContext::Clear() {
465 CancelPendingSends();
466 received_sync_msgs_->RemoveContext(this);
467 Context::Clear();
468 }
469
OnMessageReceived(const Message & msg)470 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
471 // Give the filters a chance at processing this message.
472 if (TryFilters(msg))
473 return true;
474
475 if (TryToUnblockListener(&msg))
476 return true;
477
478 if (msg.is_reply()) {
479 received_sync_msgs_->QueueReply(msg, this);
480 return true;
481 }
482
483 if (msg.should_unblock()) {
484 received_sync_msgs_->QueueMessage(msg, this);
485 return true;
486 }
487
488 return Context::OnMessageReceivedNoFilter(msg);
489 }
490
OnChannelError()491 void SyncChannel::SyncContext::OnChannelError() {
492 CancelPendingSends();
493 shutdown_watcher_.StopWatching();
494 Context::OnChannelError();
495 }
496
OnChannelOpened()497 void SyncChannel::SyncContext::OnChannelOpened() {
498 shutdown_watcher_.StartWatching(
499 shutdown_event_,
500 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
501 base::Unretained(this)),
502 base::SequencedTaskRunnerHandle::Get());
503 Context::OnChannelOpened();
504 }
505
OnChannelClosed()506 void SyncChannel::SyncContext::OnChannelClosed() {
507 CancelPendingSends();
508 shutdown_watcher_.StopWatching();
509 Context::OnChannelClosed();
510 }
511
CancelPendingSends()512 void SyncChannel::SyncContext::CancelPendingSends() {
513 base::AutoLock auto_lock(deserializers_lock_);
514 reject_new_deserializers_ = true;
515 PendingSyncMessageQueue::iterator iter;
516 DVLOG(1) << "Canceling pending sends";
517 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
518 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
519 "SyncChannel::SyncContext::CancelPendingSends",
520 iter->done_event);
521 iter->done_event->Signal();
522 }
523 }
524
OnShutdownEventSignaled(WaitableEvent * event)525 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
526 DCHECK_EQ(event, shutdown_event_);
527
528 // Process shut down before we can get a reply to a synchronous message.
529 // Cancel pending Send calls, which will end up setting the send done event.
530 CancelPendingSends();
531 }
532
533 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)534 std::unique_ptr<SyncChannel> SyncChannel::Create(
535 const IPC::ChannelHandle& channel_handle,
536 Channel::Mode mode,
537 Listener* listener,
538 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
539 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
540 bool create_pipe_now,
541 base::WaitableEvent* shutdown_event) {
542 std::unique_ptr<SyncChannel> channel =
543 Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
544 channel->Init(channel_handle, mode, create_pipe_now);
545 return channel;
546 }
547
548 // static
Create(std::unique_ptr<ChannelFactory> factory,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)549 std::unique_ptr<SyncChannel> SyncChannel::Create(
550 std::unique_ptr<ChannelFactory> factory,
551 Listener* listener,
552 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
553 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
554 bool create_pipe_now,
555 base::WaitableEvent* shutdown_event) {
556 std::unique_ptr<SyncChannel> channel =
557 Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
558 channel->Init(std::move(factory), create_pipe_now);
559 return channel;
560 }
561
562 // static
Create(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)563 std::unique_ptr<SyncChannel> SyncChannel::Create(
564 Listener* listener,
565 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
566 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
567 WaitableEvent* shutdown_event) {
568 return base::WrapUnique(new SyncChannel(
569 listener, ipc_task_runner, listener_task_runner, shutdown_event));
570 }
571
SyncChannel(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner,WaitableEvent * shutdown_event)572 SyncChannel::SyncChannel(
573 Listener* listener,
574 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
575 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
576 WaitableEvent* shutdown_event)
577 : ChannelProxy(new SyncContext(listener,
578 ipc_task_runner,
579 listener_task_runner,
580 shutdown_event)),
581 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
582 // The current (listener) thread must be distinct from the IPC thread, or else
583 // sending synchronous messages will deadlock.
584 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
585 StartWatching();
586 }
587
588 SyncChannel::~SyncChannel() = default;
589
SetRestrictDispatchChannelGroup(int group)590 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
591 sync_context()->set_restrict_dispatch_group(group);
592 }
593
CreateSyncMessageFilter()594 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() {
595 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter(
596 sync_context()->shutdown_event());
597 AddFilter(filter.get());
598 if (!did_init())
599 pre_init_sync_message_filters_.push_back(filter);
600 return filter;
601 }
602
Send(Message * message)603 bool SyncChannel::Send(Message* message) {
604 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
605 std::string name;
606 Logging::GetInstance()->GetMessageText(
607 message->type(), &name, message, nullptr);
608 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
609 #else
610 TRACE_EVENT2("ipc", "SyncChannel::Send",
611 "class", IPC_MESSAGE_ID_CLASS(message->type()),
612 "line", IPC_MESSAGE_ID_LINE(message->type()));
613 #endif
614 if (!message->is_sync()) {
615 ChannelProxy::SendInternal(message);
616 return true;
617 }
618
619 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
620 bool pump_messages = sync_msg->ShouldPumpMessages();
621
622 // *this* might get deleted in WaitForReply.
623 scoped_refptr<SyncContext> context(sync_context());
624 if (!context->Push(sync_msg)) {
625 DVLOG(1) << "Channel is shutting down. Dropping sync message.";
626 delete message;
627 return false;
628 }
629
630 ChannelProxy::SendInternal(message);
631
632 // Wait for reply, or for any other incoming synchronous messages.
633 // |this| might get deleted, so only call static functions at this point.
634 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
635 WaitForReply(registry.get(), context.get(), pump_messages);
636
637 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
638 "SyncChannel::Send", context->GetSendDoneEvent());
639
640 return context->Pop();
641 }
642
WaitForReply(mojo::SyncHandleRegistry * registry,SyncContext * context,bool pump_messages)643 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
644 SyncContext* context,
645 bool pump_messages) {
646 context->DispatchMessages();
647
648 base::WaitableEvent* pump_messages_event = nullptr;
649 if (pump_messages) {
650 if (!g_pump_messages_event.Get()) {
651 g_pump_messages_event.Get() = std::make_unique<base::WaitableEvent>(
652 base::WaitableEvent::ResetPolicy::MANUAL,
653 base::WaitableEvent::InitialState::SIGNALED);
654 }
655 pump_messages_event = g_pump_messages_event.Get().get();
656 }
657
658 while (true) {
659 bool dispatch = false;
660 bool send_done = false;
661 bool should_pump_messages = false;
662 base::Closure on_send_done_callback = base::Bind(&OnEventReady, &send_done);
663 registry->RegisterEvent(context->GetSendDoneEvent(), on_send_done_callback);
664
665 base::Closure on_pump_messages_callback;
666 if (pump_messages_event) {
667 on_pump_messages_callback =
668 base::Bind(&OnEventReady, &should_pump_messages);
669 registry->RegisterEvent(pump_messages_event, on_pump_messages_callback);
670 }
671
672 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
673 context->received_sync_msgs()->BlockDispatch(&dispatch);
674 registry->Wait(stop_flags, 3);
675 context->received_sync_msgs()->UnblockDispatch();
676
677 registry->UnregisterEvent(context->GetSendDoneEvent(),
678 on_send_done_callback);
679 if (pump_messages_event)
680 registry->UnregisterEvent(pump_messages_event, on_pump_messages_callback);
681
682 if (dispatch) {
683 // We're waiting for a reply, but we received a blocking synchronous call.
684 // We must process it to avoid potential deadlocks.
685 context->GetDispatchEvent()->Reset();
686 context->DispatchMessages();
687 continue;
688 }
689
690 if (should_pump_messages)
691 WaitForReplyWithNestedMessageLoop(context); // Run a nested run loop.
692
693 break;
694 }
695 }
696
WaitForReplyWithNestedMessageLoop(SyncContext * context)697 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
698 base::RunLoop nested_loop(base::RunLoop::Type::kNestableTasksAllowed);
699 ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(
700 context, &nested_loop, context->listener_task_runner());
701 nested_loop.Run();
702 }
703
OnDispatchEventSignaled(base::WaitableEvent * event)704 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
705 DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
706 sync_context()->GetDispatchEvent()->Reset();
707
708 StartWatching();
709
710 // NOTE: May delete |this|.
711 sync_context()->DispatchMessages();
712 }
713
StartWatching()714 void SyncChannel::StartWatching() {
715 // |dispatch_watcher_| watches the event asynchronously, only dispatching
716 // messages once the listener thread is unblocked and pumping its task queue.
717 // The ReceivedSyncMsgQueue also watches this event and may dispatch
718 // immediately if woken up by a message which it's allowed to dispatch.
719 dispatch_watcher_.StartWatching(
720 sync_context()->GetDispatchEvent(),
721 base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
722 base::Unretained(this)),
723 sync_context()->listener_task_runner());
724 }
725
OnChannelInit()726 void SyncChannel::OnChannelInit() {
727 pre_init_sync_message_filters_.clear();
728 }
729
730 } // namespace IPC
731