• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/lazy_instance.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/synchronization/waitable_event.h"
13 #include "base/synchronization/waitable_event_watcher.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread_local.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/ipc_sync_message.h"
19 
20 using base::TimeDelta;
21 using base::TimeTicks;
22 using base::WaitableEvent;
23 
24 namespace IPC {
// When we're blocked in a Send(), we need to process incoming synchronous
// messages right away because they could be blocking our reply (either
// directly from the same object we're calling, or indirectly through one or
// more other channels).  That means that in SyncContext's OnMessageReceived,
// we need to process sync messages right away if we're blocked.  However, a
// simple check isn't sufficient, because the listener thread can be in the
// process of calling Send.
// To work around this, when SyncChannel filters a sync message, it sets
// an event that the listener thread waits on during its Send() call.  This
// allows us to dispatch incoming sync messages when blocked.  The race
// condition is handled because if Send is in the process of being called, it
// will check the event.  In case the listener thread isn't sending a message,
// we queue a task on the listener thread to dispatch the received messages.
// The messages are stored in this queue object, which is shared among all
// SyncChannel objects on the same thread (since one object can receive a
// sync message while another one is blocked).
41 
42 class SyncChannel::ReceivedSyncMsgQueue :
43     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
44  public:
45   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
46   // if necessary.  Call RemoveContext on the same thread when done.
AddContext()47   static ReceivedSyncMsgQueue* AddContext() {
48     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
49     // SyncChannel objects can block the same thread).
50     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
51     if (!rv) {
52       rv = new ReceivedSyncMsgQueue();
53       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
54     }
55     rv->listener_count_++;
56     return rv;
57   }
58 
59   // Called on IPC thread when a synchronous message or reply arrives.
QueueMessage(const Message & msg,SyncChannel::SyncContext * context)60   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
61     bool was_task_pending;
62     {
63       base::AutoLock auto_lock(message_lock_);
64 
65       was_task_pending = task_pending_;
66       task_pending_ = true;
67 
68       // We set the event in case the listener thread is blocked (or is about
69       // to). In case it's not, the PostTask dispatches the messages.
70       message_queue_.push_back(QueuedMessage(new Message(msg), context));
71       message_queue_version_++;
72     }
73 
74     dispatch_event_.Signal();
75     if (!was_task_pending) {
76       listener_task_runner_->PostTask(
77           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
78                                 this, scoped_refptr<SyncContext>(context)));
79     }
80   }
81 
QueueReply(const Message & msg,SyncChannel::SyncContext * context)82   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
83     received_replies_.push_back(QueuedMessage(new Message(msg), context));
84   }
85 
86   // Called on the listener's thread to process any queues synchronous
87   // messages.
DispatchMessagesTask(SyncContext * context)88   void DispatchMessagesTask(SyncContext* context) {
89     {
90       base::AutoLock auto_lock(message_lock_);
91       task_pending_ = false;
92     }
93     context->DispatchMessages();
94   }
95 
DispatchMessages(SyncContext * dispatching_context)96   void DispatchMessages(SyncContext* dispatching_context) {
97     bool first_time = true;
98     uint32 expected_version = 0;
99     SyncMessageQueue::iterator it;
100     while (true) {
101       Message* message = NULL;
102       scoped_refptr<SyncChannel::SyncContext> context;
103       {
104         base::AutoLock auto_lock(message_lock_);
105         if (first_time || message_queue_version_ != expected_version) {
106           it = message_queue_.begin();
107           first_time = false;
108         }
109         for (; it != message_queue_.end(); it++) {
110           int message_group = it->context->restrict_dispatch_group();
111           if (message_group == kRestrictDispatchGroup_None ||
112               message_group == dispatching_context->restrict_dispatch_group()) {
113             message = it->message;
114             context = it->context;
115             it = message_queue_.erase(it);
116             message_queue_version_++;
117             expected_version = message_queue_version_;
118             break;
119           }
120         }
121       }
122 
123       if (message == NULL)
124         break;
125       context->OnDispatchMessage(*message);
126       delete message;
127     }
128   }
129 
130   // SyncChannel calls this in its destructor.
RemoveContext(SyncContext * context)131   void RemoveContext(SyncContext* context) {
132     base::AutoLock auto_lock(message_lock_);
133 
134     SyncMessageQueue::iterator iter = message_queue_.begin();
135     while (iter != message_queue_.end()) {
136       if (iter->context.get() == context) {
137         delete iter->message;
138         iter = message_queue_.erase(iter);
139         message_queue_version_++;
140       } else {
141         iter++;
142       }
143     }
144 
145     if (--listener_count_ == 0) {
146       DCHECK(lazy_tls_ptr_.Pointer()->Get());
147       lazy_tls_ptr_.Pointer()->Set(NULL);
148     }
149   }
150 
dispatch_event()151   WaitableEvent* dispatch_event() { return &dispatch_event_; }
listener_task_runner()152   base::SingleThreadTaskRunner* listener_task_runner() {
153     return listener_task_runner_.get();
154   }
155 
156   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
157   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
158       lazy_tls_ptr_;
159 
160   // Called on the ipc thread to check if we can unblock any current Send()
161   // calls based on a queued reply.
DispatchReplies()162   void DispatchReplies() {
163     for (size_t i = 0; i < received_replies_.size(); ++i) {
164       Message* message = received_replies_[i].message;
165       if (received_replies_[i].context->TryToUnblockListener(message)) {
166         delete message;
167         received_replies_.erase(received_replies_.begin() + i);
168         return;
169       }
170     }
171   }
172 
top_send_done_watcher()173   base::WaitableEventWatcher* top_send_done_watcher() {
174     return top_send_done_watcher_;
175   }
176 
set_top_send_done_watcher(base::WaitableEventWatcher * watcher)177   void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
178     top_send_done_watcher_ = watcher;
179   }
180 
181  private:
182   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
183 
184   // See the comment in SyncChannel::SyncChannel for why this event is created
185   // as manual reset.
ReceivedSyncMsgQueue()186   ReceivedSyncMsgQueue() :
187       message_queue_version_(0),
188       dispatch_event_(true, false),
189       listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
190       task_pending_(false),
191       listener_count_(0),
192       top_send_done_watcher_(NULL) {
193   }
194 
~ReceivedSyncMsgQueue()195   ~ReceivedSyncMsgQueue() {}
196 
197   // Holds information about a queued synchronous message or reply.
198   struct QueuedMessage {
QueuedMessageIPC::SyncChannel::ReceivedSyncMsgQueue::QueuedMessage199     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
200     Message* message;
201     scoped_refptr<SyncChannel::SyncContext> context;
202   };
203 
204   typedef std::list<QueuedMessage> SyncMessageQueue;
205   SyncMessageQueue message_queue_;
206   uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan
207 
208   std::vector<QueuedMessage> received_replies_;
209 
210   // Set when we got a synchronous message that we must respond to as the
211   // sender needs its reply before it can reply to our original synchronous
212   // message.
213   WaitableEvent dispatch_event_;
214   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
215   base::Lock message_lock_;
216   bool task_pending_;
217   int listener_count_;
218 
219   // The current send done event watcher for this thread. Used to maintain
220   // a local global stack of send done watchers to ensure that nested sync
221   // message loops complete correctly.
222   base::WaitableEventWatcher* top_send_done_watcher_;
223 };
224 
225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
226     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
227         LAZY_INSTANCE_INITIALIZER;
228 
SyncContext(Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,WaitableEvent * shutdown_event)229 SyncChannel::SyncContext::SyncContext(
230     Listener* listener,
231     base::SingleThreadTaskRunner* ipc_task_runner,
232     WaitableEvent* shutdown_event)
233     : ChannelProxy::Context(listener, ipc_task_runner),
234       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
235       shutdown_event_(shutdown_event),
236       restrict_dispatch_group_(kRestrictDispatchGroup_None) {
237 }
238 
~SyncContext()239 SyncChannel::SyncContext::~SyncContext() {
240   while (!deserializers_.empty())
241     Pop();
242 }
243 
244 // Adds information about an outgoing sync message to the context so that
245 // we know how to deserialize the reply.  Returns a handle that's set when
246 // the reply has arrived.
Push(SyncMessage * sync_msg)247 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
248   // Create the tracking information for this message. This object is stored
249   // by value since all members are pointers that are cheap to copy. These
250   // pointers are cleaned up in the Pop() function.
251   //
252   // The event is created as manual reset because in between Signal and
253   // OnObjectSignalled, another Send can happen which would stop the watcher
254   // from being called.  The event would get watched later, when the nested
255   // Send completes, so the event will need to remain set.
256   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
257                          sync_msg->GetReplyDeserializer(),
258                          new WaitableEvent(true, false));
259   base::AutoLock auto_lock(deserializers_lock_);
260   deserializers_.push_back(pending);
261 }
262 
Pop()263 bool SyncChannel::SyncContext::Pop() {
264   bool result;
265   {
266     base::AutoLock auto_lock(deserializers_lock_);
267     PendingSyncMsg msg = deserializers_.back();
268     delete msg.deserializer;
269     delete msg.done_event;
270     msg.done_event = NULL;
271     deserializers_.pop_back();
272     result = msg.send_result;
273   }
274 
275   // We got a reply to a synchronous Send() call that's blocking the listener
276   // thread.  However, further down the call stack there could be another
277   // blocking Send() call, whose reply we received after we made this last
278   // Send() call.  So check if we have any queued replies available that
279   // can now unblock the listener thread.
280   ipc_task_runner()->PostTask(
281       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
282                             received_sync_msgs_.get()));
283 
284   return result;
285 }
286 
GetSendDoneEvent()287 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
288   base::AutoLock auto_lock(deserializers_lock_);
289   return deserializers_.back().done_event;
290 }
291 
GetDispatchEvent()292 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
293   return received_sync_msgs_->dispatch_event();
294 }
295 
DispatchMessages()296 void SyncChannel::SyncContext::DispatchMessages() {
297   received_sync_msgs_->DispatchMessages(this);
298 }
299 
TryToUnblockListener(const Message * msg)300 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
301   base::AutoLock auto_lock(deserializers_lock_);
302   if (deserializers_.empty() ||
303       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
304     return false;
305   }
306 
307   // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
308   // has finished.
309   if (!msg->is_reply_error()) {
310     bool send_result = deserializers_.back().deserializer->
311         SerializeOutputParameters(*msg);
312     deserializers_.back().send_result = send_result;
313     VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
314   } else {
315     VLOG(1) << "Received error reply";
316   }
317   deserializers_.back().done_event->Signal();
318 
319   return true;
320 }
321 
Clear()322 void SyncChannel::SyncContext::Clear() {
323   CancelPendingSends();
324   received_sync_msgs_->RemoveContext(this);
325   Context::Clear();
326 }
327 
OnMessageReceived(const Message & msg)328 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
329   // Give the filters a chance at processing this message.
330   if (TryFilters(msg))
331     return true;
332 
333   if (TryToUnblockListener(&msg))
334     return true;
335 
336   if (msg.is_reply()) {
337     received_sync_msgs_->QueueReply(msg, this);
338     return true;
339   }
340 
341   if (msg.should_unblock()) {
342     received_sync_msgs_->QueueMessage(msg, this);
343     return true;
344   }
345 
346   return Context::OnMessageReceivedNoFilter(msg);
347 }
348 
OnChannelError()349 void SyncChannel::SyncContext::OnChannelError() {
350   CancelPendingSends();
351   shutdown_watcher_.StopWatching();
352   Context::OnChannelError();
353 }
354 
OnChannelOpened()355 void SyncChannel::SyncContext::OnChannelOpened() {
356   shutdown_watcher_.StartWatching(
357       shutdown_event_,
358       base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
359                  base::Unretained(this)));
360   Context::OnChannelOpened();
361 }
362 
OnChannelClosed()363 void SyncChannel::SyncContext::OnChannelClosed() {
364   CancelPendingSends();
365   shutdown_watcher_.StopWatching();
366   Context::OnChannelClosed();
367 }
368 
OnSendTimeout(int message_id)369 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
370   base::AutoLock auto_lock(deserializers_lock_);
371   PendingSyncMessageQueue::iterator iter;
372   VLOG(1) << "Send timeout";
373   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
374     if (iter->id == message_id) {
375       iter->done_event->Signal();
376       break;
377     }
378   }
379 }
380 
CancelPendingSends()381 void SyncChannel::SyncContext::CancelPendingSends() {
382   base::AutoLock auto_lock(deserializers_lock_);
383   PendingSyncMessageQueue::iterator iter;
384   // TODO(bauerb): Remove once http://crbug/141055 is fixed.
385   VLOG(1) << "Canceling pending sends";
386   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
387     iter->done_event->Signal();
388 }
389 
OnWaitableEventSignaled(WaitableEvent * event)390 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
391   if (event == shutdown_event_) {
392     // Process shut down before we can get a reply to a synchronous message.
393     // Cancel pending Send calls, which will end up setting the send done event.
394     CancelPendingSends();
395   } else {
396     // We got the reply, timed out or the process shutdown.
397     DCHECK_EQ(GetSendDoneEvent(), event);
398     base::MessageLoop::current()->QuitNow();
399   }
400 }
401 
402 base::WaitableEventWatcher::EventCallback
MakeWaitableEventCallback()403     SyncChannel::SyncContext::MakeWaitableEventCallback() {
404   return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
405 }
406 
407 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,bool create_pipe_now,base::WaitableEvent * shutdown_event)408 scoped_ptr<SyncChannel> SyncChannel::Create(
409     const IPC::ChannelHandle& channel_handle,
410     Channel::Mode mode,
411     Listener* listener,
412     base::SingleThreadTaskRunner* ipc_task_runner,
413     bool create_pipe_now,
414     base::WaitableEvent* shutdown_event) {
415   scoped_ptr<SyncChannel> channel =
416       Create(listener, ipc_task_runner, shutdown_event);
417   channel->Init(channel_handle, mode, create_pipe_now);
418   return channel.Pass();
419 }
420 
421 // static
Create(Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,WaitableEvent * shutdown_event)422 scoped_ptr<SyncChannel> SyncChannel::Create(
423     Listener* listener,
424     base::SingleThreadTaskRunner* ipc_task_runner,
425     WaitableEvent* shutdown_event) {
426   return make_scoped_ptr(
427       new SyncChannel(listener, ipc_task_runner, shutdown_event));
428 }
429 
SyncChannel(Listener * listener,base::SingleThreadTaskRunner * ipc_task_runner,WaitableEvent * shutdown_event)430 SyncChannel::SyncChannel(
431     Listener* listener,
432     base::SingleThreadTaskRunner* ipc_task_runner,
433     WaitableEvent* shutdown_event)
434     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
435   // The current (listener) thread must be distinct from the IPC thread, or else
436   // sending synchronous messages will deadlock.
437   DCHECK_NE(ipc_task_runner, base::ThreadTaskRunnerHandle::Get());
438   StartWatching();
439 }
440 
~SyncChannel()441 SyncChannel::~SyncChannel() {
442 }
443 
SetRestrictDispatchChannelGroup(int group)444 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
445   sync_context()->set_restrict_dispatch_group(group);
446 }
447 
Send(Message * message)448 bool SyncChannel::Send(Message* message) {
449 #ifdef IPC_MESSAGE_LOG_ENABLED
450   Logging* logger = Logging::GetInstance();
451   std::string name;
452   logger->GetMessageText(message->type(), &name, message, NULL);
453   TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
454 #else
455   TRACE_EVENT2("ipc", "SyncChannel::Send",
456                "class", IPC_MESSAGE_ID_CLASS(message->type()),
457                "line", IPC_MESSAGE_ID_LINE(message->type()));
458 #endif
459   if (!message->is_sync()) {
460     ChannelProxy::Send(message);
461     return true;
462   }
463 
464   // *this* might get deleted in WaitForReply.
465   scoped_refptr<SyncContext> context(sync_context());
466   if (context->shutdown_event()->IsSignaled()) {
467     VLOG(1) << "shutdown event is signaled";
468     delete message;
469     return false;
470   }
471 
472   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
473   context->Push(sync_msg);
474   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
475 
476   ChannelProxy::Send(message);
477 
478   // Wait for reply, or for any other incoming synchronous messages.
479   // *this* might get deleted, so only call static functions at this point.
480   WaitForReply(context.get(), pump_messages_event);
481 
482   return context->Pop();
483 }
484 
WaitForReply(SyncContext * context,WaitableEvent * pump_messages_event)485 void SyncChannel::WaitForReply(
486     SyncContext* context, WaitableEvent* pump_messages_event) {
487   context->DispatchMessages();
488   while (true) {
489     WaitableEvent* objects[] = {
490       context->GetDispatchEvent(),
491       context->GetSendDoneEvent(),
492       pump_messages_event
493     };
494 
495     unsigned count = pump_messages_event ? 3: 2;
496     size_t result = WaitableEvent::WaitMany(objects, count);
497     if (result == 0 /* dispatch event */) {
498       // We're waiting for a reply, but we received a blocking synchronous
499       // call.  We must process it or otherwise a deadlock might occur.
500       context->GetDispatchEvent()->Reset();
501       context->DispatchMessages();
502       continue;
503     }
504 
505     if (result == 2 /* pump_messages_event */)
506       WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
507 
508     break;
509   }
510 }
511 
WaitForReplyWithNestedMessageLoop(SyncContext * context)512 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
513   base::WaitableEventWatcher send_done_watcher;
514 
515   ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
516   DCHECK(sync_msg_queue != NULL);
517 
518   base::WaitableEventWatcher* old_send_done_event_watcher =
519       sync_msg_queue->top_send_done_watcher();
520 
521   base::WaitableEventWatcher::EventCallback old_callback;
522   base::WaitableEvent* old_event = NULL;
523 
524   // Maintain a local global stack of send done delegates to ensure that
525   // nested sync calls complete in the correct sequence, i.e. the
526   // outermost call completes first, etc.
527   if (old_send_done_event_watcher) {
528     old_callback = old_send_done_event_watcher->callback();
529     old_event = old_send_done_event_watcher->GetWatchedEvent();
530     old_send_done_event_watcher->StopWatching();
531   }
532 
533   sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
534 
535   send_done_watcher.StartWatching(context->GetSendDoneEvent(),
536                                   context->MakeWaitableEventCallback());
537 
538   {
539     base::MessageLoop::ScopedNestableTaskAllower allow(
540         base::MessageLoop::current());
541     base::MessageLoop::current()->Run();
542   }
543 
544   sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
545   if (old_send_done_event_watcher && old_event) {
546     old_send_done_event_watcher->StartWatching(old_event, old_callback);
547   }
548 }
549 
OnWaitableEventSignaled(WaitableEvent * event)550 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
551   DCHECK(event == sync_context()->GetDispatchEvent());
552   // The call to DispatchMessages might delete this object, so reregister
553   // the object watcher first.
554   event->Reset();
555   dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
556   sync_context()->DispatchMessages();
557 }
558 
StartWatching()559 void SyncChannel::StartWatching() {
560   // Ideally we only want to watch this object when running a nested message
561   // loop.  However, we don't know when it exits if there's another nested
562   // message loop running under it or not, so we wouldn't know whether to
563   // stop or keep watching.  So we always watch it, and create the event as
564   // manual reset since the object watcher might otherwise reset the event
565   // when we're doing a WaitMany.
566   dispatch_watcher_callback_ =
567       base::Bind(&SyncChannel::OnWaitableEventSignaled,
568                   base::Unretained(this));
569   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
570                                   dispatch_watcher_callback_);
571 }
572 
573 }  // namespace IPC
574