• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/lazy_instance.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/synchronization/waitable_event.h"
13 #include "base/synchronization/waitable_event_watcher.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread_local.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/ipc_sync_message.h"
19 
20 using base::TimeDelta;
21 using base::TimeTicks;
22 using base::WaitableEvent;
23 
24 namespace IPC {
// When we're blocked in a Send(), we need to process incoming synchronous
// messages right away because they could be blocking our reply (either
// directly from the same object we're calling, or indirectly through one or
// more other channels).  That means that in SyncContext's OnMessageReceived,
// we need to process sync messages right away if we're blocked.  However, a
// simple check isn't sufficient, because the listener thread can be in the
// process of calling Send.
// To work around this, when SyncChannel filters a sync message, it sets
// an event that the listener thread waits on during its Send() call.  This
// allows us to dispatch incoming sync messages when blocked.  The race
// condition is handled because if Send is in the process of being called, it
// will check the event.  In case the listener thread isn't sending a message,
// we queue a task on the listener thread to dispatch the received messages.
// The messages are stored in this queue object that's shared among all
// SyncChannel objects on the same thread (since one object can receive a
// sync message while another one is blocked).
41 
42 class SyncChannel::ReceivedSyncMsgQueue :
43     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
44  public:
45   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
46   // if necessary.  Call RemoveContext on the same thread when done.
AddContext()47   static ReceivedSyncMsgQueue* AddContext() {
48     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
49     // SyncChannel objects can block the same thread).
50     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
51     if (!rv) {
52       rv = new ReceivedSyncMsgQueue();
53       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
54     }
55     rv->listener_count_++;
56     return rv;
57   }
58 
59   // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)60   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
61     bool was_task_pending;
62     {
63       base::AutoLock auto_lock(message_lock_);
64 
65       was_task_pending = task_pending_;
66       task_pending_ = true;
67 
68       // We set the event in case the listener thread is blocked (or is about
69       // to). In case it's not, the PostTask dispatches the messages.
70       message_queue_.push_back(QueuedMessage(new Message(msg), context));
71       message_queue_version_++;
72     }
73 
74     dispatch_event_.Signal();
75     if (!was_task_pending) {
76       listener_task_runner_->PostTask(
77           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
78                                 this, scoped_refptr<SyncContext>(context)));
79     }
80   }
81 
QueueReply(const Message & msg,SyncChannel::SyncContext * context)82   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
83     received_replies_.push_back(QueuedMessage(new Message(msg), context));
84   }
85 
86   // Called on the listener's thread to process any queues synchronous
87   // messages.
DispatchMessagesTask(SyncContext * context)88   void DispatchMessagesTask(SyncContext* context) {
89     {
90       base::AutoLock auto_lock(message_lock_);
91       task_pending_ = false;
92     }
93     context->DispatchMessages();
94   }
95 
DispatchMessages(SyncContext * dispatching_context)96   void DispatchMessages(SyncContext* dispatching_context) {
97     bool first_time = true;
98     uint32 expected_version = 0;
99     SyncMessageQueue::iterator it;
100     while (true) {
101       Message* message = NULL;
102       scoped_refptr<SyncChannel::SyncContext> context;
103       {
104         base::AutoLock auto_lock(message_lock_);
105         if (first_time || message_queue_version_ != expected_version) {
106           it = message_queue_.begin();
107           first_time = false;
108         }
109         for (; it != message_queue_.end(); it++) {
110           int message_group = it->context->restrict_dispatch_group();
111           if (message_group == kRestrictDispatchGroup_None ||
112               message_group == dispatching_context->restrict_dispatch_group()) {
113             message = it->message;
114             context = it->context;
115             it = message_queue_.erase(it);
116             message_queue_version_++;
117             expected_version = message_queue_version_;
118             break;
119           }
120         }
121       }
122 
123       if (message == NULL)
124         break;
125       context->OnDispatchMessage(*message);
126       delete message;
127     }
128   }
129 
130   // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)131   void RemoveContext(SyncContext* context) {
132     base::AutoLock auto_lock(message_lock_);
133 
134     SyncMessageQueue::iterator iter = message_queue_.begin();
135     while (iter != message_queue_.end()) {
136       if (iter->context.get() == context) {
137         delete iter->message;
138         iter = message_queue_.erase(iter);
139         message_queue_version_++;
140       } else {
141         iter++;
142       }
143     }
144 
145     if (--listener_count_ == 0) {
146       DCHECK(lazy_tls_ptr_.Pointer()->Get());
147       lazy_tls_ptr_.Pointer()->Set(NULL);
148     }
149   }
150 
dispatch_event()151   WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()152   base::SingleThreadTaskRunner* listener_task_runner() {
153     return listener_task_runner_.get();
154   }
155 
156   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
157   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
158       lazy_tls_ptr_;
159 
160   // Called on the ipc thread to check if we can unblock any current Send()
161   // calls based on a queued reply.
DispatchReplies()162   void DispatchReplies() {
163     for (size_t i = 0; i < received_replies_.size(); ++i) {
164       Message* message = received_replies_[i].message;
165       if (received_replies_[i].context->TryToUnblockListener(message)) {
166         delete message;
167         received_replies_.erase(received_replies_.begin() + i);
168         return;
169       }
170     }
171   }
172 
top_send_done_watcher()173   base::WaitableEventWatcher* top_send_done_watcher() {
174     return top_send_done_watcher_;
175   }
176 
set_top_send_done_watcher(base::WaitableEventWatcher * watcher)177   void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
178     top_send_done_watcher_ = watcher;
179   }
180 
181  private:
182   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
183 
184   // See the comment in SyncChannel::SyncChannel for why this event is created
185   // as manual reset.
ReceivedSyncMsgQueue()186   ReceivedSyncMsgQueue() :
187       message_queue_version_(0),
188       dispatch_event_(true, false),
189       listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
190       task_pending_(false),
191       listener_count_(0),
192       top_send_done_watcher_(NULL) {
193   }
194 
~ReceivedSyncMsgQueue()195   ~ReceivedSyncMsgQueue() {}
196 
197   // Holds information about a queued synchronous message or reply.
198   struct QueuedMessage {
QueuedMessageIPC::SyncChannel::ReceivedSyncMsgQueue::QueuedMessage199     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
200     Message* message;
201     scoped_refptr<SyncChannel::SyncContext> context;
202   };
203 
204   typedef std::list<QueuedMessage> SyncMessageQueue;
205   SyncMessageQueue message_queue_;
206   uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan
207 
208   std::vector<QueuedMessage> received_replies_;
209 
210   // Set when we got a synchronous message that we must respond to as the
211   // sender needs its reply before it can reply to our original synchronous
212   // message.
213   WaitableEvent dispatch_event_;
214   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
215   base::Lock message_lock_;
216   bool task_pending_;
217   int listener_count_;
218 
219   // The current send done event watcher for this thread. Used to maintain
220   // a local global stack of send done watchers to ensure that nested sync
221   // message loops complete correctly.
222   base::WaitableEventWatcher* top_send_done_watcher_;
223 };
224 
225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
226     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
227         LAZY_INSTANCE_INITIALIZER;
228 
SyncContext(Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,WaitableEvent * shutdown_event)229 SyncChannel::SyncContext::SyncContext(
230     Listener* listener,
231     base::SingleThreadTaskRunner* ipc_task_runner,
232     WaitableEvent* shutdown_event)
233     : ChannelProxy::Context(listener, ipc_task_runner),
234       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
235       shutdown_event_(shutdown_event),
236       restrict_dispatch_group_(kRestrictDispatchGroup_None) {
237 }
238 
~SyncContext()239 SyncChannel::SyncContext::~SyncContext() {
240   while (!deserializers_.empty())
241     Pop();
242 }
243 
244 // Adds information about an outgoing sync message to the context so that
245 // we know how to deserialize the reply.  Returns a handle that's set when
246 // the reply has arrived.
Push(SyncMessage * sync_msg)247 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
248   // Create the tracking information for this message. This object is stored
249   // by value since all members are pointers that are cheap to copy. These
250   // pointers are cleaned up in the Pop() function.
251   //
252   // The event is created as manual reset because in between Signal and
253   // OnObjectSignalled, another Send can happen which would stop the watcher
254   // from being called.  The event would get watched later, when the nested
255   // Send completes, so the event will need to remain set.
256   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
257                          sync_msg->GetReplyDeserializer(),
258                          new WaitableEvent(true, false));
259   base::AutoLock auto_lock(deserializers_lock_);
260   deserializers_.push_back(pending);
261 }
262 
Pop()263 bool SyncChannel::SyncContext::Pop() {
264   bool result;
265   {
266     base::AutoLock auto_lock(deserializers_lock_);
267     PendingSyncMsg msg = deserializers_.back();
268     delete msg.deserializer;
269     delete msg.done_event;
270     msg.done_event = NULL;
271     deserializers_.pop_back();
272     result = msg.send_result;
273   }
274 
275   // We got a reply to a synchronous Send() call that's blocking the listener
276   // thread.  However, further down the call stack there could be another
277   // blocking Send() call, whose reply we received after we made this last
278   // Send() call.  So check if we have any queued replies available that
279   // can now unblock the listener thread.
280   ipc_task_runner()->PostTask(
281       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
282                             received_sync_msgs_.get()));
283 
284   return result;
285 }
286 
GetSendDoneEvent()287 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
288   base::AutoLock auto_lock(deserializers_lock_);
289   return deserializers_.back().done_event;
290 }
291 
GetDispatchEvent()292 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
293   return received_sync_msgs_->dispatch_event();
294 }
295 
DispatchMessages()296 void SyncChannel::SyncContext::DispatchMessages() {
297   received_sync_msgs_->DispatchMessages(this);
298 }
299 
TryToUnblockListener(const Message * msg)300 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
301   base::AutoLock auto_lock(deserializers_lock_);
302   if (deserializers_.empty() ||
303       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
304     return false;
305   }
306 
307   // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
308   // has finished.
309   if (!msg->is_reply_error()) {
310     bool send_result = deserializers_.back().deserializer->
311         SerializeOutputParameters(*msg);
312     deserializers_.back().send_result = send_result;
313     VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
314   } else {
315     VLOG(1) << "Received error reply";
316   }
317   deserializers_.back().done_event->Signal();
318 
319   return true;
320 }
321 
Clear()322 void SyncChannel::SyncContext::Clear() {
323   CancelPendingSends();
324   received_sync_msgs_->RemoveContext(this);
325   Context::Clear();
326 }
327 
OnMessageReceived(const Message & msg)328 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
329   // Give the filters a chance at processing this message.
330   if (TryFilters(msg))
331     return true;
332 
333   if (TryToUnblockListener(&msg))
334     return true;
335 
336   if (msg.is_reply()) {
337     received_sync_msgs_->QueueReply(msg, this);
338     return true;
339   }
340 
341   if (msg.should_unblock()) {
342     received_sync_msgs_->QueueMessage(msg, this);
343     return true;
344   }
345 
346   return Context::OnMessageReceivedNoFilter(msg);
347 }
348 
OnChannelError()349 void SyncChannel::SyncContext::OnChannelError() {
350   CancelPendingSends();
351   shutdown_watcher_.StopWatching();
352   Context::OnChannelError();
353 }
354 
OnChannelOpened()355 void SyncChannel::SyncContext::OnChannelOpened() {
356   shutdown_watcher_.StartWatching(
357       shutdown_event_,
358       base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
359                  base::Unretained(this)));
360   Context::OnChannelOpened();
361 }
362 
OnChannelClosed()363 void SyncChannel::SyncContext::OnChannelClosed() {
364   CancelPendingSends();
365   shutdown_watcher_.StopWatching();
366   Context::OnChannelClosed();
367 }
368 
OnSendTimeout(int message_id)369 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
370   base::AutoLock auto_lock(deserializers_lock_);
371   PendingSyncMessageQueue::iterator iter;
372   VLOG(1) << "Send timeout";
373   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
374     if (iter->id == message_id) {
375       iter->done_event->Signal();
376       break;
377     }
378   }
379 }
380 
CancelPendingSends()381 void SyncChannel::SyncContext::CancelPendingSends() {
382   base::AutoLock auto_lock(deserializers_lock_);
383   PendingSyncMessageQueue::iterator iter;
384   // TODO(bauerb): Remove once http://crbug/141055 is fixed.
385   VLOG(1) << "Canceling pending sends";
386   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
387     iter->done_event->Signal();
388 }
389 
OnWaitableEventSignaled(WaitableEvent * event)390 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
391   if (event == shutdown_event_) {
392     // Process shut down before we can get a reply to a synchronous message.
393     // Cancel pending Send calls, which will end up setting the send done event.
394     CancelPendingSends();
395   } else {
396     // We got the reply, timed out or the process shutdown.
397     DCHECK_EQ(GetSendDoneEvent(), event);
398     base::MessageLoop::current()->QuitNow();
399   }
400 }
401 
402 base::WaitableEventWatcher::EventCallback
MakeWaitableEventCallback()403     SyncChannel::SyncContext::MakeWaitableEventCallback() {
404   return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
405 }
406 
SyncChannel(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,bool create_pipe_now,WaitableEvent * shutdown_event)407 SyncChannel::SyncChannel(
408     const IPC::ChannelHandle& channel_handle,
409     Channel::Mode mode,
410     Listener* listener,
411     base::SingleThreadTaskRunner* ipc_task_runner,
412     bool create_pipe_now,
413     WaitableEvent* shutdown_event)
414     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
415       sync_messages_with_no_timeout_allowed_(true) {
416   ChannelProxy::Init(channel_handle, mode, create_pipe_now);
417   StartWatching();
418 }
419 
SyncChannel(Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,WaitableEvent * shutdown_event)420 SyncChannel::SyncChannel(
421     Listener* listener,
422     base::SingleThreadTaskRunner* ipc_task_runner,
423     WaitableEvent* shutdown_event)
424     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
425       sync_messages_with_no_timeout_allowed_(true) {
426   StartWatching();
427 }
428 
~SyncChannel()429 SyncChannel::~SyncChannel() {
430 }
431 
SetRestrictDispatchChannelGroup(int group)432 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
433   sync_context()->set_restrict_dispatch_group(group);
434 }
435 
Send(Message * message)436 bool SyncChannel::Send(Message* message) {
437   return SendWithTimeout(message, base::kNoTimeout);
438 }
439 
SendWithTimeout(Message * message,int timeout_ms)440 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
441 #ifdef IPC_MESSAGE_LOG_ENABLED
442   Logging* logger = Logging::GetInstance();
443   std::string name;
444   logger->GetMessageText(message->type(), &name, message, NULL);
445   TRACE_EVENT1("task", "SyncChannel::SendWithTimeout",
446                "name", name);
447 #else
448   TRACE_EVENT2("task", "SyncChannel::SendWithTimeout",
449                "class", IPC_MESSAGE_ID_CLASS(message->type()),
450                "line", IPC_MESSAGE_ID_LINE(message->type()));
451 #endif
452   if (!message->is_sync()) {
453     ChannelProxy::Send(message);
454     return true;
455   }
456 
457   // *this* might get deleted in WaitForReply.
458   scoped_refptr<SyncContext> context(sync_context());
459   if (context->shutdown_event()->IsSignaled()) {
460     VLOG(1) << "shutdown event is signaled";
461     delete message;
462     return false;
463   }
464 
465   DCHECK(sync_messages_with_no_timeout_allowed_ ||
466          timeout_ms != base::kNoTimeout);
467   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
468   context->Push(sync_msg);
469   int message_id = SyncMessage::GetMessageId(*sync_msg);
470   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
471 
472   ChannelProxy::Send(message);
473 
474   if (timeout_ms != base::kNoTimeout) {
475     // We use the sync message id so that when a message times out, we don't
476     // confuse it with another send that is either above/below this Send in
477     // the call stack.
478     context->ipc_task_runner()->PostDelayedTask(
479         FROM_HERE,
480         base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id),
481         base::TimeDelta::FromMilliseconds(timeout_ms));
482   }
483 
484   // Wait for reply, or for any other incoming synchronous messages.
485   // *this* might get deleted, so only call static functions at this point.
486   WaitForReply(context.get(), pump_messages_event);
487 
488   return context->Pop();
489 }
490 
WaitForReply(SyncContext * context,WaitableEvent * pump_messages_event)491 void SyncChannel::WaitForReply(
492     SyncContext* context, WaitableEvent* pump_messages_event) {
493   context->DispatchMessages();
494   while (true) {
495     WaitableEvent* objects[] = {
496       context->GetDispatchEvent(),
497       context->GetSendDoneEvent(),
498       pump_messages_event
499     };
500 
501     unsigned count = pump_messages_event ? 3: 2;
502     size_t result = WaitableEvent::WaitMany(objects, count);
503     if (result == 0 /* dispatch event */) {
504       // We're waiting for a reply, but we received a blocking synchronous
505       // call.  We must process it or otherwise a deadlock might occur.
506       context->GetDispatchEvent()->Reset();
507       context->DispatchMessages();
508       continue;
509     }
510 
511     if (result == 2 /* pump_messages_event */)
512       WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
513 
514     break;
515   }
516 }
517 
WaitForReplyWithNestedMessageLoop(SyncContext * context)518 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
519   base::WaitableEventWatcher send_done_watcher;
520 
521   ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
522   DCHECK(sync_msg_queue != NULL);
523 
524   base::WaitableEventWatcher* old_send_done_event_watcher =
525       sync_msg_queue->top_send_done_watcher();
526 
527   base::WaitableEventWatcher::EventCallback old_callback;
528   base::WaitableEvent* old_event = NULL;
529 
530   // Maintain a local global stack of send done delegates to ensure that
531   // nested sync calls complete in the correct sequence, i.e. the
532   // outermost call completes first, etc.
533   if (old_send_done_event_watcher) {
534     old_callback = old_send_done_event_watcher->callback();
535     old_event = old_send_done_event_watcher->GetWatchedEvent();
536     old_send_done_event_watcher->StopWatching();
537   }
538 
539   sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
540 
541   send_done_watcher.StartWatching(context->GetSendDoneEvent(),
542                                   context->MakeWaitableEventCallback());
543 
544   {
545     base::MessageLoop::ScopedNestableTaskAllower allow(
546         base::MessageLoop::current());
547     base::MessageLoop::current()->Run();
548   }
549 
550   sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
551   if (old_send_done_event_watcher && old_event) {
552     old_send_done_event_watcher->StartWatching(old_event, old_callback);
553   }
554 }
555 
OnWaitableEventSignaled(WaitableEvent * event)556 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
557   DCHECK(event == sync_context()->GetDispatchEvent());
558   // The call to DispatchMessages might delete this object, so reregister
559   // the object watcher first.
560   event->Reset();
561   dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
562   sync_context()->DispatchMessages();
563 }
564 
StartWatching()565 void SyncChannel::StartWatching() {
566   // Ideally we only want to watch this object when running a nested message
567   // loop.  However, we don't know when it exits if there's another nested
568   // message loop running under it or not, so we wouldn't know whether to
569   // stop or keep watching.  So we always watch it, and create the event as
570   // manual reset since the object watcher might otherwise reset the event
571   // when we're doing a WaitMany.
572   dispatch_watcher_callback_ =
573       base::Bind(&SyncChannel::OnWaitableEventSignaled,
574                   base::Unretained(this));
575   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
576                                   dispatch_watcher_callback_);
577 }
578 
579 }  // namespace IPC
580