• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_mojo_bootstrap.h"
6 
7 #include <inttypes.h>
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <set>
13 #include <utility>
14 #include <vector>
15 
16 #include "base/callback.h"
17 #include "base/containers/queue.h"
18 #include "base/logging.h"
19 #include "base/macros.h"
20 #include "base/memory/ptr_util.h"
21 #include "base/no_destructor.h"
22 #include "base/sequenced_task_runner.h"
23 #include "base/single_thread_task_runner.h"
24 #include "base/strings/stringprintf.h"
25 #include "base/synchronization/lock.h"
26 #include "base/threading/thread_checker.h"
27 #include "base/threading/thread_task_runner_handle.h"
28 #include "base/trace_event/memory_allocator_dump.h"
29 #include "base/trace_event/memory_dump_manager.h"
30 #include "base/trace_event/memory_dump_provider.h"
31 #include "ipc/ipc_channel.h"
32 #include "mojo/public/cpp/bindings/associated_group.h"
33 #include "mojo/public/cpp/bindings/associated_group_controller.h"
34 #include "mojo/public/cpp/bindings/connector.h"
35 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
36 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
37 #include "mojo/public/cpp/bindings/interface_id.h"
38 #include "mojo/public/cpp/bindings/message.h"
39 #include "mojo/public/cpp/bindings/message_header_validator.h"
40 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
41 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
42 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
43 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
44 
45 namespace IPC {
46 
47 namespace {
48 
49 class ChannelAssociatedGroupController;
50 
51 // Used to track some internal Channel state in pursuit of message leaks.
52 //
53 // TODO(https://crbug.com/813045): Remove this.
54 class ControllerMemoryDumpProvider
55     : public base::trace_event::MemoryDumpProvider {
56  public:
ControllerMemoryDumpProvider()57   ControllerMemoryDumpProvider() {
58     base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
59         this, "IPCChannel", nullptr);
60   }
61 
~ControllerMemoryDumpProvider()62   ~ControllerMemoryDumpProvider() override {
63     base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
64         this);
65   }
66 
AddController(ChannelAssociatedGroupController * controller)67   void AddController(ChannelAssociatedGroupController* controller) {
68     base::AutoLock lock(lock_);
69     controllers_.insert(controller);
70   }
71 
RemoveController(ChannelAssociatedGroupController * controller)72   void RemoveController(ChannelAssociatedGroupController* controller) {
73     base::AutoLock lock(lock_);
74     controllers_.erase(controller);
75   }
76 
77   // base::trace_event::MemoryDumpProvider:
78   bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
79                     base::trace_event::ProcessMemoryDump* pmd) override;
80 
81  private:
82   base::Lock lock_;
83   std::set<ChannelAssociatedGroupController*> controllers_;
84 
85   DISALLOW_COPY_AND_ASSIGN(ControllerMemoryDumpProvider);
86 };
87 
GetMemoryDumpProvider()88 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
89   static base::NoDestructor<ControllerMemoryDumpProvider> provider;
90   return *provider;
91 }
92 
93 class ChannelAssociatedGroupController
94     : public mojo::AssociatedGroupController,
95       public mojo::MessageReceiver,
96       public mojo::PipeControlMessageHandlerDelegate {
97  public:
ChannelAssociatedGroupController(bool set_interface_id_namespace_bit,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)98   ChannelAssociatedGroupController(
99       bool set_interface_id_namespace_bit,
100       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
101       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
102       : task_runner_(ipc_task_runner),
103         proxy_task_runner_(proxy_task_runner),
104         set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
105         filters_(this),
106         control_message_handler_(this),
107         control_message_proxy_thunk_(this),
108         control_message_proxy_(&control_message_proxy_thunk_) {
109     thread_checker_.DetachFromThread();
110     control_message_handler_.SetDescription(
111         "IPC::mojom::Bootstrap [master] PipeControlMessageHandler");
112     filters_.Append<mojo::MessageHeaderValidator>(
113         "IPC::mojom::Bootstrap [master] MessageHeaderValidator");
114 
115     GetMemoryDumpProvider().AddController(this);
116   }
117 
GetQueuedMessageCount()118   size_t GetQueuedMessageCount() {
119     base::AutoLock lock(outgoing_messages_lock_);
120     return outgoing_messages_.size();
121   }
122 
Bind(mojo::ScopedMessagePipeHandle handle)123   void Bind(mojo::ScopedMessagePipeHandle handle) {
124     DCHECK(thread_checker_.CalledOnValidThread());
125     DCHECK(task_runner_->BelongsToCurrentThread());
126 
127     connector_.reset(new mojo::Connector(
128         std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
129         task_runner_));
130     connector_->set_incoming_receiver(&filters_);
131     connector_->set_connection_error_handler(
132         base::Bind(&ChannelAssociatedGroupController::OnPipeError,
133                    base::Unretained(this)));
134     connector_->set_enforce_errors_from_incoming_receiver(false);
135     connector_->SetWatcherHeapProfilerTag("IPC Channel");
136   }
137 
Pause()138   void Pause() {
139     DCHECK(!paused_);
140     paused_ = true;
141   }
142 
Unpause()143   void Unpause() {
144     DCHECK(paused_);
145     paused_ = false;
146   }
147 
FlushOutgoingMessages()148   void FlushOutgoingMessages() {
149     std::vector<mojo::Message> outgoing_messages;
150     {
151       base::AutoLock lock(outgoing_messages_lock_);
152       std::swap(outgoing_messages, outgoing_messages_);
153     }
154     for (auto& message : outgoing_messages)
155       SendMessage(&message);
156   }
157 
CreateChannelEndpoints(mojom::ChannelAssociatedPtr * sender,mojom::ChannelAssociatedRequest * receiver)158   void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
159                               mojom::ChannelAssociatedRequest* receiver) {
160     mojo::InterfaceId sender_id, receiver_id;
161     if (set_interface_id_namespace_bit_) {
162       sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
163       receiver_id = 1;
164     } else {
165       sender_id = 1;
166       receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
167     }
168 
169     {
170       base::AutoLock locker(lock_);
171       Endpoint* sender_endpoint = new Endpoint(this, sender_id);
172       Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
173       endpoints_.insert({ sender_id, sender_endpoint });
174       endpoints_.insert({ receiver_id, receiver_endpoint });
175       sender_endpoint->set_handle_created();
176       receiver_endpoint->set_handle_created();
177     }
178 
179     mojo::ScopedInterfaceEndpointHandle sender_handle =
180         CreateScopedInterfaceEndpointHandle(sender_id);
181     mojo::ScopedInterfaceEndpointHandle receiver_handle =
182         CreateScopedInterfaceEndpointHandle(receiver_id);
183 
184     sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
185     *receiver = mojom::ChannelAssociatedRequest(std::move(receiver_handle));
186   }
187 
ShutDown()188   void ShutDown() {
189     DCHECK(thread_checker_.CalledOnValidThread());
190     shut_down_ = true;
191     connector_->CloseMessagePipe();
192     OnPipeError();
193     connector_.reset();
194 
195     base::AutoLock lock(outgoing_messages_lock_);
196     outgoing_messages_.clear();
197   }
198 
199   // mojo::AssociatedGroupController:
AssociateInterface(mojo::ScopedInterfaceEndpointHandle handle_to_send)200   mojo::InterfaceId AssociateInterface(
201       mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
202     if (!handle_to_send.pending_association())
203       return mojo::kInvalidInterfaceId;
204 
205     uint32_t id = 0;
206     {
207       base::AutoLock locker(lock_);
208       do {
209         if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
210           next_interface_id_ = 2;
211         id = next_interface_id_++;
212         if (set_interface_id_namespace_bit_)
213           id |= mojo::kInterfaceIdNamespaceMask;
214       } while (ContainsKey(endpoints_, id));
215 
216       Endpoint* endpoint = new Endpoint(this, id);
217       if (encountered_error_)
218         endpoint->set_peer_closed();
219       endpoint->set_handle_created();
220       endpoints_.insert({id, endpoint});
221     }
222 
223     if (!NotifyAssociation(&handle_to_send, id)) {
224       // The peer handle of |handle_to_send|, which is supposed to join this
225       // associated group, has been closed.
226       {
227         base::AutoLock locker(lock_);
228         Endpoint* endpoint = FindEndpoint(id);
229         if (endpoint)
230           MarkClosedAndMaybeRemove(endpoint);
231       }
232 
233       control_message_proxy_.NotifyPeerEndpointClosed(
234           id, handle_to_send.disconnect_reason());
235     }
236     return id;
237   }
238 
CreateLocalEndpointHandle(mojo::InterfaceId id)239   mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
240       mojo::InterfaceId id) override {
241     if (!mojo::IsValidInterfaceId(id))
242       return mojo::ScopedInterfaceEndpointHandle();
243 
244     // Unless it is the master ID, |id| is from the remote side and therefore
245     // its namespace bit is supposed to be different than the value that this
246     // router would use.
247     if (!mojo::IsMasterInterfaceId(id) &&
248         set_interface_id_namespace_bit_ ==
249             mojo::HasInterfaceIdNamespaceBitSet(id)) {
250       return mojo::ScopedInterfaceEndpointHandle();
251     }
252 
253     base::AutoLock locker(lock_);
254     bool inserted = false;
255     Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
256     if (inserted) {
257       DCHECK(!endpoint->handle_created());
258       if (encountered_error_)
259         endpoint->set_peer_closed();
260     } else {
261       if (endpoint->handle_created())
262         return mojo::ScopedInterfaceEndpointHandle();
263     }
264 
265     endpoint->set_handle_created();
266     return CreateScopedInterfaceEndpointHandle(id);
267   }
268 
CloseEndpointHandle(mojo::InterfaceId id,const base::Optional<mojo::DisconnectReason> & reason)269   void CloseEndpointHandle(
270       mojo::InterfaceId id,
271       const base::Optional<mojo::DisconnectReason>& reason) override {
272     if (!mojo::IsValidInterfaceId(id))
273       return;
274     {
275       base::AutoLock locker(lock_);
276       DCHECK(ContainsKey(endpoints_, id));
277       Endpoint* endpoint = endpoints_[id].get();
278       DCHECK(!endpoint->client());
279       DCHECK(!endpoint->closed());
280       MarkClosedAndMaybeRemove(endpoint);
281     }
282 
283     if (!mojo::IsMasterInterfaceId(id) || reason)
284       control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
285   }
286 
// Attaches |client| (serviced on |runner|) to the endpoint identified by
// |handle| and returns that endpoint as its controller. If the endpoint's
// peer is already closed, an error notification is scheduled asynchronously.
mojo::InterfaceEndpointController* AttachEndpointClient(
    const mojo::ScopedInterfaceEndpointHandle& handle,
    mojo::InterfaceEndpointClient* client,
    scoped_refptr<base::SequencedTaskRunner> runner) override {
  const mojo::InterfaceId endpoint_id = handle.id();

  DCHECK(mojo::IsValidInterfaceId(endpoint_id));
  DCHECK(client);

  base::AutoLock auto_lock(lock_);
  DCHECK(ContainsKey(endpoints_, endpoint_id));

  Endpoint* target = endpoints_[endpoint_id].get();
  target->AttachClient(client, std::move(runner));

  if (target->peer_closed())
    NotifyEndpointOfError(target, true /* force_async */);

  return target;
}
307 
DetachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle)308   void DetachEndpointClient(
309       const mojo::ScopedInterfaceEndpointHandle& handle) override {
310     const mojo::InterfaceId id = handle.id();
311 
312     DCHECK(mojo::IsValidInterfaceId(id));
313 
314     base::AutoLock locker(lock_);
315     DCHECK(ContainsKey(endpoints_, id));
316 
317     Endpoint* endpoint = endpoints_[id].get();
318     endpoint->DetachClient();
319   }
320 
RaiseError()321   void RaiseError() override {
322     // We ignore errors on channel endpoints, leaving the pipe open. There are
323     // good reasons for this:
324     //
325     //   * We should never close a channel endpoint in either process as long as
326     //     the child process is still alive. The child's endpoint should only be
327     //     closed implicitly by process death, and the browser's endpoint should
328     //     only be closed after the child process is confirmed to be dead. Crash
329     //     reporting logic in Chrome relies on this behavior in order to do the
330     //     right thing.
331     //
332     //   * There are two interesting conditions under which RaiseError() can be
333     //     implicitly reached: an incoming message fails validation, or the
334     //     local endpoint drops a response callback without calling it.
335     //
336     //   * In the validation case, we also report the message as bad, and this
337     //     will imminently trigger the common bad-IPC path in the browser,
338     //     causing the browser to kill the offending renderer.
339     //
340     //   * In the dropped response callback case, the net result of ignoring the
341     //     issue is generally innocuous. While indicative of programmer error,
342     //     it's not a severe failure and is already covered by separate DCHECKs.
343     //
344     // See https://crbug.com/861607 for additional discussion.
345   }
346 
PrefersSerializedMessages()347   bool PrefersSerializedMessages() override { return true; }
348 
349  private:
350   class Endpoint;
351   class ControlMessageProxyThunk;
352   friend class Endpoint;
353   friend class ControlMessageProxyThunk;
354 
355   // MessageWrapper objects are always destroyed under the controller's lock. On
356   // destruction, if the message it wraps contains
357   // ScopedInterfaceEndpointHandles (which cannot be destructed under the
358   // controller's lock), the wrapper unlocks to clean them up.
359   class MessageWrapper {
360    public:
361     MessageWrapper() = default;
362 
MessageWrapper(ChannelAssociatedGroupController * controller,mojo::Message message)363     MessageWrapper(ChannelAssociatedGroupController* controller,
364                    mojo::Message message)
365         : controller_(controller), value_(std::move(message)) {}
366 
MessageWrapper(MessageWrapper && other)367     MessageWrapper(MessageWrapper&& other)
368         : controller_(other.controller_), value_(std::move(other.value_)) {}
369 
~MessageWrapper()370     ~MessageWrapper() {
371       if (value_.associated_endpoint_handles()->empty())
372         return;
373 
374       controller_->lock_.AssertAcquired();
375       {
376         base::AutoUnlock unlocker(controller_->lock_);
377         value_.mutable_associated_endpoint_handles()->clear();
378       }
379     }
380 
operator =(MessageWrapper && other)381     MessageWrapper& operator=(MessageWrapper&& other) {
382       controller_ = other.controller_;
383       value_ = std::move(other.value_);
384       return *this;
385     }
386 
value()387     mojo::Message& value() { return value_; }
388 
389    private:
390     ChannelAssociatedGroupController* controller_ = nullptr;
391     mojo::Message value_;
392 
393     DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
394   };
395 
396   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
397                    public mojo::InterfaceEndpointController {
398    public:
Endpoint(ChannelAssociatedGroupController * controller,mojo::InterfaceId id)399     Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
400         : controller_(controller), id_(id) {}
401 
id() const402     mojo::InterfaceId id() const { return id_; }
403 
closed() const404     bool closed() const {
405       controller_->lock_.AssertAcquired();
406       return closed_;
407     }
408 
set_closed()409     void set_closed() {
410       controller_->lock_.AssertAcquired();
411       closed_ = true;
412     }
413 
peer_closed() const414     bool peer_closed() const {
415       controller_->lock_.AssertAcquired();
416       return peer_closed_;
417     }
418 
set_peer_closed()419     void set_peer_closed() {
420       controller_->lock_.AssertAcquired();
421       peer_closed_ = true;
422     }
423 
handle_created() const424     bool handle_created() const {
425       controller_->lock_.AssertAcquired();
426       return handle_created_;
427     }
428 
set_handle_created()429     void set_handle_created() {
430       controller_->lock_.AssertAcquired();
431       handle_created_ = true;
432     }
433 
disconnect_reason() const434     const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
435       return disconnect_reason_;
436     }
437 
set_disconnect_reason(const base::Optional<mojo::DisconnectReason> & disconnect_reason)438     void set_disconnect_reason(
439         const base::Optional<mojo::DisconnectReason>& disconnect_reason) {
440       disconnect_reason_ = disconnect_reason;
441     }
442 
task_runner() const443     base::SequencedTaskRunner* task_runner() const {
444       return task_runner_.get();
445     }
446 
client() const447     mojo::InterfaceEndpointClient* client() const {
448       controller_->lock_.AssertAcquired();
449       return client_;
450     }
451 
AttachClient(mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)452     void AttachClient(mojo::InterfaceEndpointClient* client,
453                       scoped_refptr<base::SequencedTaskRunner> runner) {
454       controller_->lock_.AssertAcquired();
455       DCHECK(!client_);
456       DCHECK(!closed_);
457       DCHECK(runner->RunsTasksInCurrentSequence());
458 
459       task_runner_ = std::move(runner);
460       client_ = client;
461     }
462 
DetachClient()463     void DetachClient() {
464       controller_->lock_.AssertAcquired();
465       DCHECK(client_);
466       DCHECK(task_runner_->RunsTasksInCurrentSequence());
467       DCHECK(!closed_);
468 
469       task_runner_ = nullptr;
470       client_ = nullptr;
471       sync_watcher_.reset();
472     }
473 
EnqueueSyncMessage(MessageWrapper message)474     uint32_t EnqueueSyncMessage(MessageWrapper message) {
475       controller_->lock_.AssertAcquired();
476       uint32_t id = GenerateSyncMessageId();
477       sync_messages_.emplace(id, std::move(message));
478       SignalSyncMessageEvent();
479       return id;
480     }
481 
SignalSyncMessageEvent()482     void SignalSyncMessageEvent() {
483       controller_->lock_.AssertAcquired();
484 
485       if (sync_watcher_)
486         sync_watcher_->SignalEvent();
487     }
488 
PopSyncMessage(uint32_t id)489     MessageWrapper PopSyncMessage(uint32_t id) {
490       controller_->lock_.AssertAcquired();
491       if (sync_messages_.empty() || sync_messages_.front().first != id)
492         return MessageWrapper();
493       MessageWrapper message = std::move(sync_messages_.front().second);
494       sync_messages_.pop();
495       return message;
496     }
497 
498     // mojo::InterfaceEndpointController:
SendMessage(mojo::Message * message)499     bool SendMessage(mojo::Message* message) override {
500       DCHECK(task_runner_->RunsTasksInCurrentSequence());
501       message->set_interface_id(id_);
502       return controller_->SendMessage(message);
503     }
504 
AllowWokenUpBySyncWatchOnSameThread()505     void AllowWokenUpBySyncWatchOnSameThread() override {
506       DCHECK(task_runner_->RunsTasksInCurrentSequence());
507 
508       EnsureSyncWatcherExists();
509       sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
510     }
511 
SyncWatch(const bool * should_stop)512     bool SyncWatch(const bool* should_stop) override {
513       DCHECK(task_runner_->RunsTasksInCurrentSequence());
514 
515       // It's not legal to make sync calls from the master endpoint's thread,
516       // and in fact they must only happen from the proxy task runner.
517       DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
518       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
519 
520       EnsureSyncWatcherExists();
521       return sync_watcher_->SyncWatch(should_stop);
522     }
523 
524    private:
525     friend class base::RefCountedThreadSafe<Endpoint>;
526 
~Endpoint()527     ~Endpoint() override {
528       controller_->lock_.AssertAcquired();
529       DCHECK(!client_);
530       DCHECK(closed_);
531       DCHECK(peer_closed_);
532       DCHECK(!sync_watcher_);
533     }
534 
OnSyncMessageEventReady()535     void OnSyncMessageEventReady() {
536       DCHECK(task_runner_->RunsTasksInCurrentSequence());
537 
538       scoped_refptr<Endpoint> keepalive(this);
539       scoped_refptr<AssociatedGroupController> controller_keepalive(
540           controller_);
541       base::AutoLock locker(controller_->lock_);
542       bool more_to_process = false;
543       if (!sync_messages_.empty()) {
544         MessageWrapper message_wrapper =
545             std::move(sync_messages_.front().second);
546         sync_messages_.pop();
547 
548         bool dispatch_succeeded;
549         mojo::InterfaceEndpointClient* client = client_;
550         {
551           base::AutoUnlock unlocker(controller_->lock_);
552           dispatch_succeeded =
553               client->HandleIncomingMessage(&message_wrapper.value());
554         }
555 
556         if (!sync_messages_.empty())
557           more_to_process = true;
558 
559         if (!dispatch_succeeded)
560           controller_->RaiseError();
561       }
562 
563       if (!more_to_process)
564         sync_watcher_->ResetEvent();
565 
566       // If there are no queued sync messages and the peer has closed, there
567       // there won't be incoming sync messages in the future. If any
568       // SyncWatch() calls are on the stack for this endpoint, resetting the
569       // watcher will allow them to exit as the stack undwinds.
570       if (!more_to_process && peer_closed_)
571         sync_watcher_.reset();
572     }
573 
EnsureSyncWatcherExists()574     void EnsureSyncWatcherExists() {
575       DCHECK(task_runner_->RunsTasksInCurrentSequence());
576       if (sync_watcher_)
577         return;
578 
579       base::AutoLock locker(controller_->lock_);
580       sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
581           base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
582                               base::Unretained(this)));
583       if (peer_closed_ || !sync_messages_.empty())
584         SignalSyncMessageEvent();
585     }
586 
GenerateSyncMessageId()587     uint32_t GenerateSyncMessageId() {
588       // Overflow is fine.
589       uint32_t id = next_sync_message_id_++;
590       DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
591       return id;
592     }
593 
594     ChannelAssociatedGroupController* const controller_;
595     const mojo::InterfaceId id_;
596 
597     bool closed_ = false;
598     bool peer_closed_ = false;
599     bool handle_created_ = false;
600     base::Optional<mojo::DisconnectReason> disconnect_reason_;
601     mojo::InterfaceEndpointClient* client_ = nullptr;
602     scoped_refptr<base::SequencedTaskRunner> task_runner_;
603     std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
604     base::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
605     uint32_t next_sync_message_id_ = 0;
606 
607     DISALLOW_COPY_AND_ASSIGN(Endpoint);
608   };
609 
610   class ControlMessageProxyThunk : public MessageReceiver {
611    public:
ControlMessageProxyThunk(ChannelAssociatedGroupController * controller)612     explicit ControlMessageProxyThunk(
613         ChannelAssociatedGroupController* controller)
614         : controller_(controller) {}
615 
616    private:
617     // MessageReceiver:
Accept(mojo::Message * message)618     bool Accept(mojo::Message* message) override {
619       return controller_->SendMessage(message);
620     }
621 
622     ChannelAssociatedGroupController* controller_;
623 
624     DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk);
625   };
626 
// Expects the connector to have been released already (see ShutDown()).
// Finishes closing every remaining endpoint so that |endpoints_| ends up
// empty, then unregisters from the memory dump provider.
~ChannelAssociatedGroupController() override {
  DCHECK(!connector_);

  base::AutoLock auto_lock(lock_);
  auto it = endpoints_.begin();
  while (it != endpoints_.end()) {
    Endpoint* endpoint = it->second.get();
    // Advance before touching the endpoint: the calls below may erase its
    // map entry.
    ++it;

    if (endpoint->closed()) {
      MarkPeerClosedAndMaybeRemove(endpoint);
      continue;
    }

    // This happens when a NotifyPeerEndpointClosed message has been
    // received, but the interface ID hasn't been used to create a local
    // endpoint handle.
    DCHECK(!endpoint->client());
    DCHECK(endpoint->peer_closed());
    MarkClosedAndMaybeRemove(endpoint);
  }

  DCHECK(endpoints_.empty());

  GetMemoryDumpProvider().RemoveController(this);
}
651 
SendMessage(mojo::Message * message)652   bool SendMessage(mojo::Message* message) {
653     if (task_runner_->BelongsToCurrentThread()) {
654       DCHECK(thread_checker_.CalledOnValidThread());
655       if (!connector_ || paused_) {
656         if (!shut_down_) {
657           base::AutoLock lock(outgoing_messages_lock_);
658           outgoing_messages_.emplace_back(std::move(*message));
659         }
660         return true;
661       }
662       return connector_->Accept(message);
663     } else {
664       // Do a message size check here so we don't lose valuable stack
665       // information to the task scheduler.
666       CHECK_LE(message->data_num_bytes(), Channel::kMaximumMessageSize);
667 
668       // We always post tasks to the master endpoint thread when called from
669       // other threads in order to simulate IPC::ChannelProxy::Send behavior.
670       task_runner_->PostTask(
671           FROM_HERE,
672           base::Bind(
673               &ChannelAssociatedGroupController::SendMessageOnMasterThread,
674               this, base::Passed(message)));
675       return true;
676     }
677   }
678 
SendMessageOnMasterThread(mojo::Message message)679   void SendMessageOnMasterThread(mojo::Message message) {
680     DCHECK(thread_checker_.CalledOnValidThread());
681     if (!SendMessage(&message))
682       RaiseError();
683   }
684 
OnPipeError()685   void OnPipeError() {
686     DCHECK(thread_checker_.CalledOnValidThread());
687 
688     // We keep |this| alive here because it's possible for the notifications
689     // below to release all other references.
690     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
691 
692     base::AutoLock locker(lock_);
693     encountered_error_ = true;
694 
695     std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
696     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
697       Endpoint* endpoint = iter->second.get();
698       ++iter;
699 
700       if (endpoint->client())
701         endpoints_to_notify.push_back(endpoint);
702 
703       MarkPeerClosedAndMaybeRemove(endpoint);
704     }
705 
706     for (auto& endpoint : endpoints_to_notify) {
707       // Because a notification may in turn detach any endpoint, we have to
708       // check each client again here.
709       if (endpoint->client())
710         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
711     }
712   }
713 
NotifyEndpointOfError(Endpoint * endpoint,bool force_async)714   void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
715     lock_.AssertAcquired();
716     DCHECK(endpoint->task_runner() && endpoint->client());
717     if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
718       mojo::InterfaceEndpointClient* client = endpoint->client();
719       base::Optional<mojo::DisconnectReason> reason(
720           endpoint->disconnect_reason());
721 
722       base::AutoUnlock unlocker(lock_);
723       client->NotifyError(reason);
724     } else {
725       endpoint->task_runner()->PostTask(
726           FROM_HERE,
727           base::Bind(&ChannelAssociatedGroupController::
728                          NotifyEndpointOfErrorOnEndpointThread,
729                      this, endpoint->id(), base::Unretained(endpoint)));
730     }
731   }
732 
NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,Endpoint * endpoint)733   void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
734                                              Endpoint* endpoint) {
735     base::AutoLock locker(lock_);
736     auto iter = endpoints_.find(id);
737     if (iter == endpoints_.end() || iter->second.get() != endpoint)
738       return;
739     if (!endpoint->client())
740       return;
741 
742     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
743     NotifyEndpointOfError(endpoint, false /* force_async */);
744   }
745 
MarkClosedAndMaybeRemove(Endpoint * endpoint)746   void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
747     lock_.AssertAcquired();
748     endpoint->set_closed();
749     if (endpoint->closed() && endpoint->peer_closed())
750       endpoints_.erase(endpoint->id());
751   }
752 
MarkPeerClosedAndMaybeRemove(Endpoint * endpoint)753   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
754     lock_.AssertAcquired();
755     endpoint->set_peer_closed();
756     endpoint->SignalSyncMessageEvent();
757     if (endpoint->closed() && endpoint->peer_closed())
758       endpoints_.erase(endpoint->id());
759   }
760 
FindOrInsertEndpoint(mojo::InterfaceId id,bool * inserted)761   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
762     lock_.AssertAcquired();
763     DCHECK(!inserted || !*inserted);
764 
765     Endpoint* endpoint = FindEndpoint(id);
766     if (!endpoint) {
767       endpoint = new Endpoint(this, id);
768       endpoints_.insert({id, endpoint});
769       if (inserted)
770         *inserted = true;
771     }
772     return endpoint;
773   }
774 
FindEndpoint(mojo::InterfaceId id)775   Endpoint* FindEndpoint(mojo::InterfaceId id) {
776     lock_.AssertAcquired();
777     auto iter = endpoints_.find(id);
778     return iter != endpoints_.end() ? iter->second.get() : nullptr;
779   }
780 
781   // mojo::MessageReceiver:
Accept(mojo::Message * message)782   bool Accept(mojo::Message* message) override {
783     DCHECK(thread_checker_.CalledOnValidThread());
784 
785     if (!message->DeserializeAssociatedEndpointHandles(this))
786       return false;
787 
788     if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
789       return control_message_handler_.Accept(message);
790 
791     mojo::InterfaceId id = message->interface_id();
792     DCHECK(mojo::IsValidInterfaceId(id));
793 
794     base::AutoLock locker(lock_);
795     Endpoint* endpoint = FindEndpoint(id);
796     if (!endpoint)
797       return true;
798 
799     mojo::InterfaceEndpointClient* client = endpoint->client();
800     if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
801       // No client has been bound yet or the client runs tasks on another
802       // thread. We assume the other thread must always be the one on which
803       // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
804       //
805       // If the client is not yet bound, it must be bound by the time this task
806       // runs or else it's programmer error.
807       DCHECK(proxy_task_runner_);
808 
809       if (message->has_flag(mojo::Message::kFlagIsSync)) {
810         MessageWrapper message_wrapper(this, std::move(*message));
811         // Sync messages may need to be handled by the endpoint if it's blocking
812         // on a sync reply. We pass ownership of the message to the endpoint's
813         // sync message queue. If the endpoint was blocking, it will dequeue the
814         // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
815         // call will dequeue the message and dispatch it.
816         uint32_t message_id =
817             endpoint->EnqueueSyncMessage(std::move(message_wrapper));
818         proxy_task_runner_->PostTask(
819             FROM_HERE,
820             base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
821                        this, id, message_id));
822         return true;
823       }
824 
825       proxy_task_runner_->PostTask(
826           FROM_HERE,
827           base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
828                      this, base::Passed(message)));
829       return true;
830     }
831 
832     // We do not expect to receive sync responses on the master endpoint thread.
833     // If it's happening, it's a bug.
834     DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
835            !message->has_flag(mojo::Message::kFlagIsResponse));
836 
837     base::AutoUnlock unlocker(lock_);
838     return client->HandleIncomingMessage(message);
839   }
840 
AcceptOnProxyThread(mojo::Message message)841   void AcceptOnProxyThread(mojo::Message message) {
842     DCHECK(proxy_task_runner_->BelongsToCurrentThread());
843 
844     mojo::InterfaceId id = message.interface_id();
845     DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
846 
847     base::AutoLock locker(lock_);
848     Endpoint* endpoint = FindEndpoint(id);
849     if (!endpoint)
850       return;
851 
852     mojo::InterfaceEndpointClient* client = endpoint->client();
853     if (!client)
854       return;
855 
856     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
857 
858     // Sync messages should never make their way to this method.
859     DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
860 
861     bool result = false;
862     {
863       base::AutoUnlock unlocker(lock_);
864       result = client->HandleIncomingMessage(&message);
865     }
866 
867     if (!result)
868       RaiseError();
869   }
870 
AcceptSyncMessage(mojo::InterfaceId interface_id,uint32_t message_id)871   void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
872     DCHECK(proxy_task_runner_->BelongsToCurrentThread());
873 
874     base::AutoLock locker(lock_);
875     Endpoint* endpoint = FindEndpoint(interface_id);
876     if (!endpoint)
877       return;
878 
879     // Careful, if the endpoint is detached its members are cleared. Check for
880     // that before dereferencing.
881     mojo::InterfaceEndpointClient* client = endpoint->client();
882     if (!client)
883       return;
884 
885     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
886     MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
887 
888     // The message must have already been dequeued by the endpoint waking up
889     // from a sync wait. Nothing to do.
890     if (message_wrapper.value().IsNull())
891       return;
892 
893     bool result = false;
894     {
895       base::AutoUnlock unlocker(lock_);
896       result = client->HandleIncomingMessage(&message_wrapper.value());
897     }
898 
899     if (!result)
900       RaiseError();
901   }
902 
903   // mojo::PipeControlMessageHandlerDelegate:
OnPeerAssociatedEndpointClosed(mojo::InterfaceId id,const base::Optional<mojo::DisconnectReason> & reason)904   bool OnPeerAssociatedEndpointClosed(
905       mojo::InterfaceId id,
906       const base::Optional<mojo::DisconnectReason>& reason) override {
907     DCHECK(thread_checker_.CalledOnValidThread());
908 
909     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
910     base::AutoLock locker(lock_);
911     scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
912     if (reason)
913       endpoint->set_disconnect_reason(reason);
914     if (!endpoint->peer_closed()) {
915       if (endpoint->client())
916         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
917       MarkPeerClosedAndMaybeRemove(endpoint.get());
918     }
919 
920     return true;
921   }
922 
923   // Checked in places which must be run on the master endpoint's thread.
924   base::ThreadChecker thread_checker_;
925 
926   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
927 
928   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
929   const bool set_interface_id_namespace_bit_;
930   bool paused_ = false;
931   std::unique_ptr<mojo::Connector> connector_;
932   mojo::FilterChain filters_;
933   mojo::PipeControlMessageHandler control_message_handler_;
934   ControlMessageProxyThunk control_message_proxy_thunk_;
935 
936   // NOTE: It is unsafe to call into this object while holding |lock_|.
937   mojo::PipeControlMessageProxy control_message_proxy_;
938 
939   // Guards access to |outgoing_messages_| only. Used to support memory dumps
940   // which may be triggered from any thread.
941   base::Lock outgoing_messages_lock_;
942 
943   // Outgoing messages that were sent before this controller was bound to a
944   // real message pipe.
945   std::vector<mojo::Message> outgoing_messages_;
946 
947   // Guards the fields below for thread-safe access.
948   base::Lock lock_;
949 
950   bool encountered_error_ = false;
951   bool shut_down_ = false;
952 
953   // ID #1 is reserved for the mojom::Channel interface.
954   uint32_t next_interface_id_ = 2;
955 
956   std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;
957 
958   DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController);
959 };
960 
OnMemoryDump(const base::trace_event::MemoryDumpArgs & args,base::trace_event::ProcessMemoryDump * pmd)961 bool ControllerMemoryDumpProvider::OnMemoryDump(
962     const base::trace_event::MemoryDumpArgs& args,
963     base::trace_event::ProcessMemoryDump* pmd) {
964   base::AutoLock lock(lock_);
965   for (auto* controller : controllers_) {
966     base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
967         base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
968                            reinterpret_cast<uintptr_t>(controller)));
969     dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
970                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
971                     controller->GetQueuedMessageCount());
972   }
973 
974   return true;
975 }
976 
977 class MojoBootstrapImpl : public MojoBootstrap {
978  public:
MojoBootstrapImpl(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<ChannelAssociatedGroupController> controller)979   MojoBootstrapImpl(
980       mojo::ScopedMessagePipeHandle handle,
981       const scoped_refptr<ChannelAssociatedGroupController> controller)
982       : controller_(controller),
983         associated_group_(controller),
984         handle_(std::move(handle)) {}
985 
~MojoBootstrapImpl()986   ~MojoBootstrapImpl() override {
987     controller_->ShutDown();
988   }
989 
990  private:
Connect(mojom::ChannelAssociatedPtr * sender,mojom::ChannelAssociatedRequest * receiver)991   void Connect(mojom::ChannelAssociatedPtr* sender,
992                mojom::ChannelAssociatedRequest* receiver) override {
993     controller_->Bind(std::move(handle_));
994     controller_->CreateChannelEndpoints(sender, receiver);
995   }
996 
Pause()997   void Pause() override {
998     controller_->Pause();
999   }
1000 
Unpause()1001   void Unpause() override {
1002     controller_->Unpause();
1003   }
1004 
Flush()1005   void Flush() override {
1006     controller_->FlushOutgoingMessages();
1007   }
1008 
GetAssociatedGroup()1009   mojo::AssociatedGroup* GetAssociatedGroup() override {
1010     return &associated_group_;
1011   }
1012 
1013   scoped_refptr<ChannelAssociatedGroupController> controller_;
1014   mojo::AssociatedGroup associated_group_;
1015 
1016   mojo::ScopedMessagePipeHandle handle_;
1017 
1018   DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
1019 };
1020 
1021 }  // namespace
1022 
1023 // static
Create(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)1024 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1025     mojo::ScopedMessagePipeHandle handle,
1026     Channel::Mode mode,
1027     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1028     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1029   return std::make_unique<MojoBootstrapImpl>(
1030       std::move(handle),
1031       new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
1032                                            ipc_task_runner, proxy_task_runner));
1033 }
1034 
1035 }  // namespace IPC
1036