• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_mojo_bootstrap.h"
6 
7 #include <inttypes.h>
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <set>
13 #include <utility>
14 #include <vector>
15 
16 #include <optional>
17 #include "base/check_op.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/containers/contains.h"
20 #include "base/functional/bind.h"
21 #include "base/functional/callback.h"
22 #include "base/memory/ptr_util.h"
23 #include "base/memory/raw_ptr.h"
24 #include "base/no_destructor.h"
25 #include "base/ranges/algorithm.h"
26 #include "base/strings/stringprintf.h"
27 #include "base/synchronization/lock.h"
28 #include "base/synchronization/waitable_event.h"
29 #include "base/task/common/task_annotator.h"
30 #include "base/task/sequenced_task_runner.h"
31 #include "base/task/single_thread_task_runner.h"
32 #include "base/threading/thread_checker.h"
33 #include "base/trace_event/memory_allocator_dump.h"
34 #include "base/trace_event/memory_dump_manager.h"
35 #include "base/trace_event/memory_dump_provider.h"
36 #include "base/trace_event/typed_macros.h"
37 #include "ipc/ipc_channel.h"
38 #include "ipc/urgent_message_observer.h"
39 #include "mojo/public/cpp/bindings/associated_group.h"
40 #include "mojo/public/cpp/bindings/associated_group_controller.h"
41 #include "mojo/public/cpp/bindings/connector.h"
42 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
43 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
44 #include "mojo/public/cpp/bindings/interface_id.h"
45 #include "mojo/public/cpp/bindings/message.h"
46 #include "mojo/public/cpp/bindings/message_header_validator.h"
47 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
48 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
49 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
50 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
51 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
52 #include "mojo/public/cpp/bindings/tracing_helpers.h"
53 #include "third_party/abseil-cpp/absl/base/attributes.h"
54 
55 namespace IPC {
56 
57 namespace {
58 
59 class ChannelAssociatedGroupController;
60 
// Per-thread switch, constant-initialized to false. Endpoint::AttachClient()
// reads it and, when it is true on the attaching thread, flags the endpoint as
// having been bound off sequence.
61 ABSL_CONST_INIT thread_local bool off_sequence_binding_allowed = false;
62 
63 // Used to track some internal Channel state in pursuit of message leaks.
64 //
65 // TODO(https://crbug.com/813045): Remove this.
66 class ControllerMemoryDumpProvider
67     : public base::trace_event::MemoryDumpProvider {
68  public:
ControllerMemoryDumpProvider()69   ControllerMemoryDumpProvider() {
70     base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
71         this, "IPCChannel", nullptr);
72   }
73 
74   ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
75   ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
76       delete;
77 
~ControllerMemoryDumpProvider()78   ~ControllerMemoryDumpProvider() override {
79     base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
80         this);
81   }
82 
AddController(ChannelAssociatedGroupController * controller)83   void AddController(ChannelAssociatedGroupController* controller) {
84     base::AutoLock lock(lock_);
85     controllers_.insert(controller);
86   }
87 
RemoveController(ChannelAssociatedGroupController * controller)88   void RemoveController(ChannelAssociatedGroupController* controller) {
89     base::AutoLock lock(lock_);
90     controllers_.erase(controller);
91   }
92 
93   // base::trace_event::MemoryDumpProvider:
94   bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
95                     base::trace_event::ProcessMemoryDump* pmd) override;
96 
97  private:
98   base::Lock lock_;
99   std::set<ChannelAssociatedGroupController*> controllers_;
100 };
101 
GetMemoryDumpProvider()102 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
103   static base::NoDestructor<ControllerMemoryDumpProvider> provider;
104   return *provider;
105 }
106 
107 // Messages are grouped by this info when recording memory metrics.
108 struct MessageMemoryDumpInfo {
MessageMemoryDumpInfoIPC::__anone6f62fab0111::MessageMemoryDumpInfo109   MessageMemoryDumpInfo(const mojo::Message& message)
110       : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
111   MessageMemoryDumpInfo() = default;
112 
operator ==IPC::__anone6f62fab0111::MessageMemoryDumpInfo113   bool operator==(const MessageMemoryDumpInfo& other) const {
114     return other.id == id && other.profiler_tag == profiler_tag;
115   }
116 
117   uint32_t id = 0;
118   const char* profiler_tag = nullptr;
119 };
120 
121 struct MessageMemoryDumpInfoHash {
operator ()IPC::__anone6f62fab0111::MessageMemoryDumpInfoHash122   size_t operator()(const MessageMemoryDumpInfo& info) const {
123     return base::HashInts(
124         info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
125   }
126 };
127 
128 class ScopedUrgentMessageNotification {
129  public:
ScopedUrgentMessageNotification(UrgentMessageObserver * observer=nullptr)130   explicit ScopedUrgentMessageNotification(
131       UrgentMessageObserver* observer = nullptr)
132       : observer_(observer) {
133     if (observer_) {
134       observer_->OnUrgentMessageReceived();
135     }
136   }
137 
~ScopedUrgentMessageNotification()138   ~ScopedUrgentMessageNotification() {
139     if (observer_) {
140       observer_->OnUrgentMessageProcessed();
141     }
142   }
143 
ScopedUrgentMessageNotification(ScopedUrgentMessageNotification && other)144   ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
145       : observer_(std::exchange(other.observer_, nullptr)) {}
146 
operator =(ScopedUrgentMessageNotification && other)147   ScopedUrgentMessageNotification& operator=(
148       ScopedUrgentMessageNotification&& other) {
149     observer_ = std::exchange(other.observer_, nullptr);
150     return *this;
151   }
152 
153  private:
154   raw_ptr<UrgentMessageObserver> observer_;
155 };
156 
157 class ChannelAssociatedGroupController
158     : public mojo::AssociatedGroupController,
159       public mojo::MessageReceiver,
160       public mojo::PipeControlMessageHandlerDelegate {
161  public:
ChannelAssociatedGroupController(bool set_interface_id_namespace_bit,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)162   ChannelAssociatedGroupController(
163       bool set_interface_id_namespace_bit,
164       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
165       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
166       : task_runner_(ipc_task_runner),
167         proxy_task_runner_(proxy_task_runner),
168         set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
169         dispatcher_(this),
170         control_message_handler_(this),
171         control_message_proxy_thunk_(this),
172         control_message_proxy_(&control_message_proxy_thunk_) {
173     thread_checker_.DetachFromThread();
174     control_message_handler_.SetDescription(
175         "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
176     dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
177         "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
178 
179     GetMemoryDumpProvider().AddController(this);
180   }
181 
182   ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
183       delete;
184   ChannelAssociatedGroupController& operator=(
185       const ChannelAssociatedGroupController&) = delete;
186 
GetQueuedMessageCount()187   size_t GetQueuedMessageCount() {
188     base::AutoLock lock(outgoing_messages_lock_);
189     return outgoing_messages_.size();
190   }
191 
GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo * info,size_t * count)192   void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
193                                          size_t* count) {
194     std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
195         counts;
196     std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
197         MessageMemoryDumpInfo(), 0};
198     base::AutoLock lock(outgoing_messages_lock_);
199     for (const auto& message : outgoing_messages_) {
200       auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
201       it_and_inserted.first->second++;
202       if (it_and_inserted.first->second > top_message_info_and_count.second)
203         top_message_info_and_count = *it_and_inserted.first;
204     }
205     *info = top_message_info_and_count.first;
206     *count = top_message_info_and_count.second;
207   }
208 
Pause()209   void Pause() {
210     DCHECK(!paused_);
211     paused_ = true;
212   }
213 
Unpause()214   void Unpause() {
215     DCHECK(paused_);
216     paused_ = false;
217   }
218 
FlushOutgoingMessages()219   void FlushOutgoingMessages() {
220     std::vector<mojo::Message> outgoing_messages;
221     {
222       base::AutoLock lock(outgoing_messages_lock_);
223       std::swap(outgoing_messages, outgoing_messages_);
224     }
225 
226     for (auto& message : outgoing_messages)
227       SendMessage(&message);
228   }
229 
Bind(mojo::ScopedMessagePipeHandle handle,mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)230   void Bind(mojo::ScopedMessagePipeHandle handle,
231             mojo::PendingAssociatedRemote<mojom::Channel>* sender,
232             mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
233     connector_ = std::make_unique<mojo::Connector>(
234         std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
235         "IPC Channel");
236     connector_->set_incoming_receiver(&dispatcher_);
237     connector_->set_connection_error_handler(
238         base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
239                        base::Unretained(this)));
240     connector_->set_enforce_errors_from_incoming_receiver(false);
241 
242     // Don't let the Connector do any sort of queuing on our behalf. Individual
243     // messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
244     // of messages received by this Connector) are already individually
245     // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
246     // operation would only introduce a redundant scheduling step for most
247     // messages.
248     connector_->set_force_immediate_dispatch(true);
249 
250     mojo::InterfaceId sender_id, receiver_id;
251     if (set_interface_id_namespace_bit_) {
252       sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
253       receiver_id = 1;
254     } else {
255       sender_id = 1;
256       receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
257     }
258 
259     {
260       base::AutoLock locker(lock_);
261       Endpoint* sender_endpoint = new Endpoint(this, sender_id);
262       Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
263       endpoints_.insert({ sender_id, sender_endpoint });
264       endpoints_.insert({ receiver_id, receiver_endpoint });
265       sender_endpoint->set_handle_created();
266       receiver_endpoint->set_handle_created();
267     }
268 
269     mojo::ScopedInterfaceEndpointHandle sender_handle =
270         CreateScopedInterfaceEndpointHandle(sender_id);
271     mojo::ScopedInterfaceEndpointHandle receiver_handle =
272         CreateScopedInterfaceEndpointHandle(receiver_id);
273 
274     *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
275         std::move(sender_handle), 0);
276     *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
277         std::move(receiver_handle));
278   }
279 
StartReceiving()280   void StartReceiving() { connector_->StartReceiving(task_runner_); }
281 
ShutDown()282   void ShutDown() {
283     DCHECK(thread_checker_.CalledOnValidThread());
284     shut_down_ = true;
285     if (connector_)
286       connector_->CloseMessagePipe();
287     OnPipeError();
288     connector_.reset();
289 
290     base::AutoLock lock(outgoing_messages_lock_);
291     outgoing_messages_.clear();
292   }
293 
294   // mojo::AssociatedGroupController:
AssociateInterface(mojo::ScopedInterfaceEndpointHandle handle_to_send)295   mojo::InterfaceId AssociateInterface(
296       mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
297     if (!handle_to_send.pending_association())
298       return mojo::kInvalidInterfaceId;
299 
300     uint32_t id = 0;
301     {
302       base::AutoLock locker(lock_);
303       do {
304         if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
305           next_interface_id_ = 2;
306         id = next_interface_id_++;
307         if (set_interface_id_namespace_bit_)
308           id |= mojo::kInterfaceIdNamespaceMask;
309       } while (base::Contains(endpoints_, id));
310 
311       Endpoint* endpoint = new Endpoint(this, id);
312       if (encountered_error_)
313         endpoint->set_peer_closed();
314       endpoint->set_handle_created();
315       endpoints_.insert({id, endpoint});
316     }
317 
318     if (!NotifyAssociation(&handle_to_send, id)) {
319       // The peer handle of |handle_to_send|, which is supposed to join this
320       // associated group, has been closed.
321       {
322         base::AutoLock locker(lock_);
323         Endpoint* endpoint = FindEndpoint(id);
324         if (endpoint)
325           MarkClosedAndMaybeRemove(endpoint);
326       }
327 
328       control_message_proxy_.NotifyPeerEndpointClosed(
329           id, handle_to_send.disconnect_reason());
330     }
331     return id;
332   }
333 
CreateLocalEndpointHandle(mojo::InterfaceId id)334   mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
335       mojo::InterfaceId id) override {
336     if (!mojo::IsValidInterfaceId(id))
337       return mojo::ScopedInterfaceEndpointHandle();
338 
339     // Unless it is the primary ID, |id| is from the remote side and therefore
340     // its namespace bit is supposed to be different than the value that this
341     // router would use.
342     if (!mojo::IsPrimaryInterfaceId(id) &&
343         set_interface_id_namespace_bit_ ==
344             mojo::HasInterfaceIdNamespaceBitSet(id)) {
345       return mojo::ScopedInterfaceEndpointHandle();
346     }
347 
348     base::AutoLock locker(lock_);
349     bool inserted = false;
350     Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
351     if (inserted) {
352       DCHECK(!endpoint->handle_created());
353       if (encountered_error_)
354         endpoint->set_peer_closed();
355     } else {
356       if (endpoint->handle_created())
357         return mojo::ScopedInterfaceEndpointHandle();
358     }
359 
360     endpoint->set_handle_created();
361     return CreateScopedInterfaceEndpointHandle(id);
362   }
363 
CloseEndpointHandle(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)364   void CloseEndpointHandle(
365       mojo::InterfaceId id,
366       const std::optional<mojo::DisconnectReason>& reason) override {
367     if (!mojo::IsValidInterfaceId(id))
368       return;
369     {
370       base::AutoLock locker(lock_);
371       DCHECK(base::Contains(endpoints_, id));
372       Endpoint* endpoint = endpoints_[id].get();
373       DCHECK(!endpoint->client());
374       DCHECK(!endpoint->closed());
375       MarkClosedAndMaybeRemove(endpoint);
376     }
377 
378     if (!mojo::IsPrimaryInterfaceId(id) || reason)
379       control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
380   }
381 
// Attaches |client|, running on |runner|, to the endpoint identified by
// |handle| and returns that endpoint as its controller. If the peer is already
// closed, an error notification is requested with force_async set.
mojo::InterfaceEndpointController* AttachEndpointClient(
    const mojo::ScopedInterfaceEndpointHandle& handle,
    mojo::InterfaceEndpointClient* client,
    scoped_refptr<base::SequencedTaskRunner> runner) override {
  const mojo::InterfaceId id = handle.id();
  DCHECK(mojo::IsValidInterfaceId(id));
  DCHECK(client);

  base::AutoLock locker(lock_);
  DCHECK(base::Contains(endpoints_, id));

  Endpoint* target = endpoints_[id].get();
  target->AttachClient(client, std::move(runner));
  if (target->peer_closed()) {
    NotifyEndpointOfError(target, true /* force_async */);
  }
  return target;
}
402 
DetachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle)403   void DetachEndpointClient(
404       const mojo::ScopedInterfaceEndpointHandle& handle) override {
405     const mojo::InterfaceId id = handle.id();
406 
407     DCHECK(mojo::IsValidInterfaceId(id));
408 
409     base::AutoLock locker(lock_);
410     DCHECK(base::Contains(endpoints_, id));
411 
412     Endpoint* endpoint = endpoints_[id].get();
413     endpoint->DetachClient();
414   }
415 
RaiseError()416   void RaiseError() override {
417     // We ignore errors on channel endpoints, leaving the pipe open. There are
418     // good reasons for this:
419     //
420     //   * We should never close a channel endpoint in either process as long as
421     //     the child process is still alive. The child's endpoint should only be
422     //     closed implicitly by process death, and the browser's endpoint should
423     //     only be closed after the child process is confirmed to be dead. Crash
424     //     reporting logic in Chrome relies on this behavior in order to do the
425     //     right thing.
426     //
427     //   * There are two interesting conditions under which RaiseError() can be
428     //     implicitly reached: an incoming message fails validation, or the
429     //     local endpoint drops a response callback without calling it.
430     //
431     //   * In the validation case, we also report the message as bad, and this
432     //     will imminently trigger the common bad-IPC path in the browser,
433     //     causing the browser to kill the offending renderer.
434     //
435     //   * In the dropped response callback case, the net result of ignoring the
436     //     issue is generally innocuous. While indicative of programmer error,
437     //     it's not a severe failure and is already covered by separate DCHECKs.
438     //
439     // See https://crbug.com/861607 for additional discussion.
440   }
441 
PrefersSerializedMessages()442   bool PrefersSerializedMessages() override { return true; }
443 
SetUrgentMessageObserver(UrgentMessageObserver * observer)444   void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
445     urgent_message_observer_ = observer;
446   }
447 
448  private:
449   class Endpoint;
450   class ControlMessageProxyThunk;
451   friend class Endpoint;
452   friend class ControlMessageProxyThunk;
453 
454   // MessageWrapper objects are always destroyed under the controller's lock. On
455   // destruction, if the message it wrappers contains
456   // ScopedInterfaceEndpointHandles (which cannot be destructed under the
457   // controller's lock), the wrapper unlocks to clean them up.
458   class MessageWrapper {
459    public:
460     MessageWrapper() = default;
461 
MessageWrapper(ChannelAssociatedGroupController * controller,mojo::Message message)462     MessageWrapper(ChannelAssociatedGroupController* controller,
463                    mojo::Message message)
464         : controller_(controller), value_(std::move(message)) {}
465 
MessageWrapper(MessageWrapper && other)466     MessageWrapper(MessageWrapper&& other)
467         : controller_(other.controller_), value_(std::move(other.value_)) {}
468 
469     MessageWrapper(const MessageWrapper&) = delete;
470     MessageWrapper& operator=(const MessageWrapper&) = delete;
471 
~MessageWrapper()472     ~MessageWrapper() {
473       if (value_.associated_endpoint_handles()->empty())
474         return;
475 
476       controller_->lock_.AssertAcquired();
477       {
478         base::AutoUnlock unlocker(controller_->lock_);
479         value_.mutable_associated_endpoint_handles()->clear();
480       }
481     }
482 
operator =(MessageWrapper && other)483     MessageWrapper& operator=(MessageWrapper&& other) {
484       controller_ = other.controller_;
485       value_ = std::move(other.value_);
486       return *this;
487     }
488 
HasRequestId(uint64_t request_id)489     bool HasRequestId(uint64_t request_id) {
490       return !value_.IsNull() && value_.version() >= 1 &&
491              value_.header_v1()->request_id == request_id;
492     }
493 
value()494     mojo::Message& value() { return value_; }
495 
496    private:
497     raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
498     mojo::Message value_;
499   };
500 
501   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
502                    public mojo::InterfaceEndpointController {
503    public:
Endpoint(ChannelAssociatedGroupController * controller,mojo::InterfaceId id)504     Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
505         : controller_(controller), id_(id) {}
506 
507     Endpoint(const Endpoint&) = delete;
508     Endpoint& operator=(const Endpoint&) = delete;
509 
id() const510     mojo::InterfaceId id() const { return id_; }
511 
closed() const512     bool closed() const {
513       controller_->lock_.AssertAcquired();
514       return closed_;
515     }
516 
set_closed()517     void set_closed() {
518       controller_->lock_.AssertAcquired();
519       closed_ = true;
520     }
521 
peer_closed() const522     bool peer_closed() const {
523       controller_->lock_.AssertAcquired();
524       return peer_closed_;
525     }
526 
set_peer_closed()527     void set_peer_closed() {
528       controller_->lock_.AssertAcquired();
529       peer_closed_ = true;
530     }
531 
handle_created() const532     bool handle_created() const {
533       controller_->lock_.AssertAcquired();
534       return handle_created_;
535     }
536 
set_handle_created()537     void set_handle_created() {
538       controller_->lock_.AssertAcquired();
539       handle_created_ = true;
540     }
541 
disconnect_reason() const542     const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
543       return disconnect_reason_;
544     }
545 
set_disconnect_reason(const std::optional<mojo::DisconnectReason> & disconnect_reason)546     void set_disconnect_reason(
547         const std::optional<mojo::DisconnectReason>& disconnect_reason) {
548       disconnect_reason_ = disconnect_reason;
549     }
550 
task_runner() const551     base::SequencedTaskRunner* task_runner() const {
552       return task_runner_.get();
553     }
554 
was_bound_off_sequence() const555     bool was_bound_off_sequence() const { return was_bound_off_sequence_; }
556 
client() const557     mojo::InterfaceEndpointClient* client() const {
558       controller_->lock_.AssertAcquired();
559       return client_;
560     }
561 
AttachClient(mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)562     void AttachClient(mojo::InterfaceEndpointClient* client,
563                       scoped_refptr<base::SequencedTaskRunner> runner) {
564       controller_->lock_.AssertAcquired();
565       DCHECK(!client_);
566       DCHECK(!closed_);
567 
568       task_runner_ = std::move(runner);
569       client_ = client;
570 
571       if (off_sequence_binding_allowed) {
572         was_bound_off_sequence_ = true;
573       }
574     }
575 
DetachClient()576     void DetachClient() {
577       controller_->lock_.AssertAcquired();
578       DCHECK(client_);
579       DCHECK(!closed_);
580 
581       task_runner_ = nullptr;
582       client_ = nullptr;
583       sync_watcher_.reset();
584     }
585 
EnqueueSyncMessage(MessageWrapper message)586     std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
587       controller_->lock_.AssertAcquired();
588       if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
589         exclusive_wait_ = nullptr;
590         return std::nullopt;
591       }
592 
593       uint32_t id = GenerateSyncMessageId();
594       sync_messages_.emplace_back(id, std::move(message));
595       SignalSyncMessageEvent();
596       return id;
597     }
598 
SignalSyncMessageEvent()599     void SignalSyncMessageEvent() {
600       controller_->lock_.AssertAcquired();
601 
602       if (sync_watcher_)
603         sync_watcher_->SignalEvent();
604     }
605 
PopSyncMessage(uint32_t id)606     MessageWrapper PopSyncMessage(uint32_t id) {
607       controller_->lock_.AssertAcquired();
608       if (sync_messages_.empty() || sync_messages_.front().first != id)
609         return MessageWrapper();
610       MessageWrapper message = std::move(sync_messages_.front().second);
611       sync_messages_.pop_front();
612       return message;
613     }
614 
615     // mojo::InterfaceEndpointController:
SendMessage(mojo::Message * message)616     bool SendMessage(mojo::Message* message) override {
617       DCHECK(task_runner_->RunsTasksInCurrentSequence());
618       message->set_interface_id(id_);
619       return controller_->SendMessage(message);
620     }
621 
AllowWokenUpBySyncWatchOnSameThread()622     void AllowWokenUpBySyncWatchOnSameThread() override {
623       DCHECK(task_runner_->RunsTasksInCurrentSequence());
624 
625       EnsureSyncWatcherExists();
626       sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
627     }
628 
SyncWatch(const bool & should_stop)629     bool SyncWatch(const bool& should_stop) override {
630       DCHECK(task_runner_->RunsTasksInCurrentSequence());
631 
632       // It's not legal to make sync calls from the primary endpoint's thread,
633       // and in fact they must only happen from the proxy task runner.
634       DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
635       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
636 
637       EnsureSyncWatcherExists();
638       {
639         base::AutoLock locker(controller_->lock_);
640         if (peer_closed_) {
641           SignalSyncMessageEvent();
642         }
643       }
644       return sync_watcher_->SyncWatch(&should_stop);
645     }
646 
WaitForIncomingSyncReply(uint64_t request_id)647     MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
648       std::optional<ExclusiveSyncWait> wait;
649       {
650         base::AutoLock lock(controller_->lock_);
651         for (auto& [id, message] : sync_messages_) {
652           if (message.HasRequestId(request_id)) {
653             return std::move(message);
654           }
655         }
656 
657         DCHECK(!exclusive_wait_);
658         wait.emplace(request_id);
659         exclusive_wait_ = &wait.value();
660       }
661 
662       wait->event.Wait();
663       return std::move(wait->message);
664     }
665 
SyncWatchExclusive(uint64_t request_id)666     bool SyncWatchExclusive(uint64_t request_id) override {
667       MessageWrapper message = WaitForIncomingSyncReply(request_id);
668       if (message.value().IsNull() || !client_) {
669         return false;
670       }
671 
672       if (!client_->HandleIncomingMessage(&message.value())) {
673         base::AutoLock locker(controller_->lock_);
674         controller_->RaiseError();
675         return false;
676       }
677 
678       return true;
679     }
680 
RegisterExternalSyncWaiter(uint64_t request_id)681     void RegisterExternalSyncWaiter(uint64_t request_id) override {}
682 
683    private:
684     friend class base::RefCountedThreadSafe<Endpoint>;
685 
~Endpoint()686     ~Endpoint() override {
687       controller_->lock_.AssertAcquired();
688       DCHECK(!client_);
689       DCHECK(closed_);
690       DCHECK(peer_closed_);
691       DCHECK(!sync_watcher_);
692       if (exclusive_wait_) {
693         exclusive_wait_->event.Signal();
694       }
695     }
696 
OnSyncMessageEventReady()697     void OnSyncMessageEventReady() {
698       DCHECK(task_runner_->RunsTasksInCurrentSequence());
699 
700       // SUBTLE: The order of these scoped_refptrs matters.
701       // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
702       // holds raw pointer to the AssociatedGroupController.
703       scoped_refptr<AssociatedGroupController> controller_keepalive(
704           controller_.get());
705       scoped_refptr<Endpoint> keepalive(this);
706       base::AutoLock locker(controller_->lock_);
707       bool more_to_process = false;
708       if (!sync_messages_.empty()) {
709         MessageWrapper message_wrapper =
710             std::move(sync_messages_.front().second);
711         sync_messages_.pop_front();
712 
713         bool dispatch_succeeded;
714         mojo::InterfaceEndpointClient* client = client_;
715         {
716           base::AutoUnlock unlocker(controller_->lock_);
717           dispatch_succeeded =
718               client->HandleIncomingMessage(&message_wrapper.value());
719         }
720 
721         if (!sync_messages_.empty())
722           more_to_process = true;
723 
724         if (!dispatch_succeeded)
725           controller_->RaiseError();
726       }
727 
728       if (!more_to_process)
729         sync_watcher_->ResetEvent();
730 
731       // If there are no queued sync messages and the peer has closed, there
732       // there won't be incoming sync messages in the future. If any
733       // SyncWatch() calls are on the stack for this endpoint, resetting the
734       // watcher will allow them to exit as the stack undwinds.
735       if (!more_to_process && peer_closed_)
736         sync_watcher_.reset();
737     }
738 
EnsureSyncWatcherExists()739     void EnsureSyncWatcherExists() {
740       DCHECK(task_runner_->RunsTasksInCurrentSequence());
741       if (sync_watcher_)
742         return;
743 
744       base::AutoLock locker(controller_->lock_);
745       sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
746           base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
747                               base::Unretained(this)));
748       if (peer_closed_ || !sync_messages_.empty())
749         SignalSyncMessageEvent();
750     }
751 
GenerateSyncMessageId()752     uint32_t GenerateSyncMessageId() {
753       // Overflow is fine.
754       uint32_t id = next_sync_message_id_++;
755       DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
756       return id;
757     }
758 
759     // Tracks the state of a pending sync wait which excludes all other incoming
760     // IPC on the waiting thread.
761     struct ExclusiveSyncWait {
ExclusiveSyncWaitIPC::__anone6f62fab0111::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait762       explicit ExclusiveSyncWait(uint64_t request_id)
763           : request_id(request_id) {}
764       ~ExclusiveSyncWait() = default;
765 
TryFulfillingWithIPC::__anone6f62fab0111::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait766       bool TryFulfillingWith(MessageWrapper& wrapper) {
767         if (!wrapper.HasRequestId(request_id)) {
768           return false;
769         }
770 
771         message = std::move(wrapper);
772         event.Signal();
773         return true;
774       }
775 
776       uint64_t request_id;
777       base::WaitableEvent event;
778       MessageWrapper message;
779     };
780 
781     const raw_ptr<ChannelAssociatedGroupController> controller_;
782     const mojo::InterfaceId id_;
783 
784     bool closed_ = false;
785     bool peer_closed_ = false;
786     bool handle_created_ = false;
787     bool was_bound_off_sequence_ = false;
788     std::optional<mojo::DisconnectReason> disconnect_reason_;
789     raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
790     scoped_refptr<base::SequencedTaskRunner> task_runner_;
791     std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
792     base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
793     raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
794     uint32_t next_sync_message_id_ = 0;
795   };
796 
797   class ControlMessageProxyThunk : public MessageReceiver {
798    public:
ControlMessageProxyThunk(ChannelAssociatedGroupController * controller)799     explicit ControlMessageProxyThunk(
800         ChannelAssociatedGroupController* controller)
801         : controller_(controller) {}
802 
803     ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
804     ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
805         delete;
806 
807    private:
808     // MessageReceiver:
Accept(mojo::Message * message)809     bool Accept(mojo::Message* message) override {
810       return controller_->SendMessage(message);
811     }
812 
813     raw_ptr<ChannelAssociatedGroupController> controller_;
814   };
815 
~ChannelAssociatedGroupController()816   ~ChannelAssociatedGroupController() override {
817     DCHECK(!connector_);
818 
819     base::AutoLock locker(lock_);
820     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
821       Endpoint* endpoint = iter->second.get();
822       ++iter;
823 
824       if (!endpoint->closed()) {
825         // This happens when a NotifyPeerEndpointClosed message been received,
826         // but the interface ID hasn't been used to create local endpoint
827         // handle.
828         DCHECK(!endpoint->client());
829         DCHECK(endpoint->peer_closed());
830         MarkClosed(endpoint);
831       } else {
832         MarkPeerClosed(endpoint);
833       }
834     }
835     endpoints_.clear();
836 
837     GetMemoryDumpProvider().RemoveController(this);
838   }
839 
SendMessage(mojo::Message * message)840   bool SendMessage(mojo::Message* message) {
841     DCHECK(message->heap_profiler_tag());
842     if (task_runner_->BelongsToCurrentThread()) {
843       DCHECK(thread_checker_.CalledOnValidThread());
844       if (!connector_ || paused_) {
845         if (!shut_down_) {
846           base::AutoLock lock(outgoing_messages_lock_);
847           outgoing_messages_.emplace_back(std::move(*message));
848         }
849         return true;
850       }
851       return connector_->Accept(message);
852     } else {
853       // We always post tasks to the primary endpoint thread when called from
854       // other threads in order to simulate IPC::ChannelProxy::Send behavior.
855       task_runner_->PostTask(
856           FROM_HERE,
857           base::BindOnce(
858               &ChannelAssociatedGroupController::SendMessageOnPrimaryThread,
859               this, std::move(*message)));
860       return true;
861     }
862   }
863 
SendMessageOnPrimaryThread(mojo::Message message)864   void SendMessageOnPrimaryThread(mojo::Message message) {
865     DCHECK(thread_checker_.CalledOnValidThread());
866     if (!SendMessage(&message))
867       RaiseError();
868   }
869 
OnPipeError()870   void OnPipeError() {
871     DCHECK(thread_checker_.CalledOnValidThread());
872 
873     // We keep |this| alive here because it's possible for the notifications
874     // below to release all other references.
875     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
876 
877     base::AutoLock locker(lock_);
878     encountered_error_ = true;
879 
880     std::vector<uint32_t> endpoints_to_remove;
881     std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
882     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
883       Endpoint* endpoint = iter->second.get();
884       ++iter;
885 
886       if (endpoint->client()) {
887         endpoints_to_notify.push_back(endpoint);
888       }
889 
890       if (MarkPeerClosed(endpoint)) {
891         endpoints_to_remove.push_back(endpoint->id());
892       }
893     }
894 
895     for (auto& endpoint : endpoints_to_notify) {
896       // Because a notification may in turn detach any endpoint, we have to
897       // check each client again here.
898       if (endpoint->client())
899         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
900     }
901 
902     for (uint32_t id : endpoints_to_remove) {
903       endpoints_.erase(id);
904     }
905   }
906 
NotifyEndpointOfError(Endpoint * endpoint,bool force_async)907   void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
908     lock_.AssertAcquired();
909     DCHECK(endpoint->task_runner() && endpoint->client());
910     if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
911       mojo::InterfaceEndpointClient* client = endpoint->client();
912       std::optional<mojo::DisconnectReason> reason(
913           endpoint->disconnect_reason());
914 
915       base::AutoUnlock unlocker(lock_);
916       client->NotifyError(reason);
917     } else {
918       endpoint->task_runner()->PostTask(
919           FROM_HERE,
920           base::BindOnce(&ChannelAssociatedGroupController::
921                              NotifyEndpointOfErrorOnEndpointThread,
922                          this, endpoint->id(),
923                          // This is safe as `endpoint` is verified to be in
924                          // `endpoints_` (a map with ownership) before use.
925                          base::UnsafeDangling(endpoint)));
926     }
927   }
928 
929   // `endpoint` might be a dangling ptr and must be checked before dereference.
NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,MayBeDangling<Endpoint> endpoint)930   void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
931                                              MayBeDangling<Endpoint> endpoint) {
932     base::AutoLock locker(lock_);
933     auto iter = endpoints_.find(id);
934     if (iter == endpoints_.end() || iter->second.get() != endpoint)
935       return;
936     if (!endpoint->client())
937       return;
938 
939     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
940     NotifyEndpointOfError(endpoint, false /* force_async */);
941   }
942 
943   // Marks `endpoint` as closed and returns true if and only if its peer was
944   // also already closed.
MarkClosed(Endpoint * endpoint)945   bool MarkClosed(Endpoint* endpoint) {
946     lock_.AssertAcquired();
947     endpoint->set_closed();
948     return endpoint->peer_closed();
949   }
950 
951   // Marks `endpoint` as having a closed peer and returns true if and only if
952   // `endpoint` itself was also already closed.
MarkPeerClosed(Endpoint * endpoint)953   bool MarkPeerClosed(Endpoint* endpoint) {
954     lock_.AssertAcquired();
955     endpoint->set_peer_closed();
956     endpoint->SignalSyncMessageEvent();
957     return endpoint->closed();
958   }
959 
MarkClosedAndMaybeRemove(Endpoint * endpoint)960   void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
961     if (MarkClosed(endpoint)) {
962       endpoints_.erase(endpoint->id());
963     }
964   }
965 
MarkPeerClosedAndMaybeRemove(Endpoint * endpoint)966   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
967     if (MarkPeerClosed(endpoint)) {
968       endpoints_.erase(endpoint->id());
969     }
970   }
971 
FindOrInsertEndpoint(mojo::InterfaceId id,bool * inserted)972   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
973     lock_.AssertAcquired();
974     DCHECK(!inserted || !*inserted);
975 
976     Endpoint* endpoint = FindEndpoint(id);
977     if (!endpoint) {
978       endpoint = new Endpoint(this, id);
979       endpoints_.insert({id, endpoint});
980       if (inserted)
981         *inserted = true;
982     }
983     return endpoint;
984   }
985 
FindEndpoint(mojo::InterfaceId id)986   Endpoint* FindEndpoint(mojo::InterfaceId id) {
987     lock_.AssertAcquired();
988     auto iter = endpoints_.find(id);
989     return iter != endpoints_.end() ? iter->second.get() : nullptr;
990   }
991 
992   // mojo::MessageReceiver:
Accept(mojo::Message * message)993   bool Accept(mojo::Message* message) override {
994     DCHECK(thread_checker_.CalledOnValidThread());
995 
996     if (!message->DeserializeAssociatedEndpointHandles(this))
997       return false;
998 
999     if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
1000       return control_message_handler_.Accept(message);
1001 
1002     mojo::InterfaceId id = message->interface_id();
1003     if (!mojo::IsValidInterfaceId(id))
1004       return false;
1005 
1006     base::ReleasableAutoLock locker(&lock_);
1007     Endpoint* endpoint = FindEndpoint(id);
1008     if (!endpoint)
1009       return true;
1010 
1011     mojo::InterfaceEndpointClient* client = endpoint->client();
1012     if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
1013       // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
1014       // by default legacy IPCs must dispatch to either the IO thread or the
1015       // proxy task runner. We generally impose the same constraint on
1016       // associated interface endpoints so that FIFO can be guaranteed across
1017       // all interfaces without stalling any of them to wait for a pending
1018       // endpoint to be bound.
1019       //
1020       // This allows us to assume that if an endpoint is not yet bound when we
1021       // receive a message targeting it, it *will* be bound on the proxy task
1022       // runner by the time a newly posted task runs there. Hence we simply post
1023       // a hopeful dispatch task to that task runner.
1024       //
1025       // As it turns out, there are even some instances of endpoints binding to
1026       // alternative (non-IO-thread, non-proxy) task runners, but still
1027       // ultimately relying on the fact that we schedule their messages on the
1028       // proxy task runner. So even if the endpoint is already bound, we
1029       // default to scheduling it on the proxy task runner as long as it's not
1030       // bound specifically to the IO task runner.
1031       // TODO(rockot): Try to sort out these cases and maybe eliminate them.
1032       //
1033       // Finally, it's also possible that an endpoint was bound to an
1034       // alternative task runner and it really does want its messages to
1035       // dispatch there. In that case `was_bound_off_sequence()` will be true to
1036       // signal that we should really use that task runner.
1037       const scoped_refptr<base::SequencedTaskRunner> task_runner =
1038           client && endpoint->was_bound_off_sequence()
1039               ? endpoint->task_runner()
1040               : proxy_task_runner_.get();
1041 
1042       ScopedUrgentMessageNotification scoped_urgent_message_notification(
1043           message->has_flag(mojo::Message::kFlagIsUrgent)
1044               ? urgent_message_observer_
1045               : nullptr);
1046 
1047       if (message->has_flag(mojo::Message::kFlagIsSync)) {
1048         MessageWrapper message_wrapper(this, std::move(*message));
1049         // Sync messages may need to be handled by the endpoint if it's blocking
1050         // on a sync reply. We pass ownership of the message to the endpoint's
1051         // sync message queue. If the endpoint was blocking, it will dequeue the
1052         // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
1053         // call will dequeue the message and dispatch it.
1054         std::optional<uint32_t> message_id =
1055             endpoint->EnqueueSyncMessage(std::move(message_wrapper));
1056         if (message_id) {
1057           task_runner->PostTask(
1058               FROM_HERE,
1059               base::BindOnce(
1060                   &ChannelAssociatedGroupController::AcceptSyncMessage, this,
1061                   id, *message_id,
1062                   std::move(scoped_urgent_message_notification)));
1063         }
1064         return true;
1065       }
1066 
1067       // If |task_runner| has been torn down already, this PostTask will fail
1068       // and destroy |message|. That operation may need to in turn destroy
1069       // in-transit associated endpoints and thus acquire |lock_|. We no longer
1070       // need the lock to be held now, so we can release it before the PostTask.
1071       {
1072         // Grab interface name from |client| before releasing the lock to ensure
1073         // that |client| is safe to access.
1074         base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
1075             client ? client->interface_name() : "unknown interface");
1076         locker.Release();
1077         task_runner->PostTask(
1078             FROM_HERE,
1079             base::BindOnce(
1080                 &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
1081                 std::move(*message),
1082                 std::move(scoped_urgent_message_notification)));
1083       }
1084       return true;
1085     }
1086 
1087     locker.Release();
1088     // It's safe to access |client| here without holding a lock, because this
1089     // code runs on a proxy thread and |client| can't be destroyed from any
1090     // thread.
1091     return client->HandleIncomingMessage(message);
1092   }
1093 
AcceptOnEndpointThread(mojo::Message message,ScopedUrgentMessageNotification scoped_urgent_message_notification)1094   void AcceptOnEndpointThread(
1095       mojo::Message message,
1096       ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1097     TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1098                  "ChannelAssociatedGroupController::AcceptOnEndpointThread");
1099 
1100     mojo::InterfaceId id = message.interface_id();
1101     DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));
1102 
1103     base::AutoLock locker(lock_);
1104     Endpoint* endpoint = FindEndpoint(id);
1105     if (!endpoint)
1106       return;
1107 
1108     mojo::InterfaceEndpointClient* client = endpoint->client();
1109     if (!client)
1110       return;
1111 
1112     if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1113         !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1114       return;
1115     }
1116 
1117     // TODO(altimin): This event is temporarily kept as a debug fallback. Remove
1118     // it once the new implementation proves to be stable.
1119     TRACE_EVENT(
1120         TRACE_DISABLED_BY_DEFAULT("mojom"),
1121         // Using client->interface_name() is safe here because this is a static
1122         // string defined for each mojo interface.
1123         perfetto::StaticString(client->interface_name()),
1124         [&](perfetto::EventContext& ctx) {
1125           static const uint8_t* toplevel_flow_enabled =
1126               TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
1127           if (!*toplevel_flow_enabled)
1128             return;
1129 
1130           perfetto::Flow::Global(message.GetTraceId())(ctx);
1131         });
1132 
1133     // Sync messages should never make their way to this method.
1134     DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
1135 
1136     bool result = false;
1137     {
1138       base::AutoUnlock unlocker(lock_);
1139       result = client->HandleIncomingMessage(&message);
1140     }
1141 
1142     if (!result)
1143       RaiseError();
1144   }
1145 
AcceptSyncMessage(mojo::InterfaceId interface_id,uint32_t message_id,ScopedUrgentMessageNotification scoped_urgent_message_notification)1146   void AcceptSyncMessage(
1147       mojo::InterfaceId interface_id,
1148       uint32_t message_id,
1149       ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1150     TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1151                  "ChannelAssociatedGroupController::AcceptSyncMessage");
1152 
1153     base::AutoLock locker(lock_);
1154     Endpoint* endpoint = FindEndpoint(interface_id);
1155     if (!endpoint)
1156       return;
1157 
1158     // Careful, if the endpoint is detached its members are cleared. Check for
1159     // that before dereferencing.
1160     mojo::InterfaceEndpointClient* client = endpoint->client();
1161     if (!client)
1162       return;
1163 
1164     if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1165         !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1166       return;
1167     }
1168 
1169     // Using client->interface_name() is safe here because this is a static
1170     // string defined for each mojo interface.
1171     TRACE_EVENT0("mojom", client->interface_name());
1172     MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
1173 
1174     // The message must have already been dequeued by the endpoint waking up
1175     // from a sync wait. Nothing to do.
1176     if (message_wrapper.value().IsNull())
1177       return;
1178 
1179     bool result = false;
1180     {
1181       base::AutoUnlock unlocker(lock_);
1182       result = client->HandleIncomingMessage(&message_wrapper.value());
1183     }
1184 
1185     if (!result)
1186       RaiseError();
1187   }
1188 
1189   // mojo::PipeControlMessageHandlerDelegate:
OnPeerAssociatedEndpointClosed(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)1190   bool OnPeerAssociatedEndpointClosed(
1191       mojo::InterfaceId id,
1192       const std::optional<mojo::DisconnectReason>& reason) override {
1193     DCHECK(thread_checker_.CalledOnValidThread());
1194 
1195     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
1196     base::AutoLock locker(lock_);
1197     scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
1198     if (reason)
1199       endpoint->set_disconnect_reason(reason);
1200     if (!endpoint->peer_closed()) {
1201       if (endpoint->client())
1202         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
1203       MarkPeerClosedAndMaybeRemove(endpoint.get());
1204     }
1205 
1206     return true;
1207   }
1208 
WaitForFlushToComplete(mojo::ScopedMessagePipeHandle flush_pipe)1209   bool WaitForFlushToComplete(
1210       mojo::ScopedMessagePipeHandle flush_pipe) override {
1211     // We don't support async flushing on the IPC Channel pipe.
1212     return false;
1213   }
1214 
1215   // Checked in places which must be run on the primary endpoint's thread.
1216   base::ThreadChecker thread_checker_;
1217 
1218   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
1219 
1220   const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
1221   const bool set_interface_id_namespace_bit_;
1222   bool paused_ = false;
1223   std::unique_ptr<mojo::Connector> connector_;
1224   mojo::MessageDispatcher dispatcher_;
1225   mojo::PipeControlMessageHandler control_message_handler_;
1226   ControlMessageProxyThunk control_message_proxy_thunk_;
1227 
1228   // NOTE: It is unsafe to call into this object while holding |lock_|.
1229   mojo::PipeControlMessageProxy control_message_proxy_;
1230 
1231   // Guards access to |outgoing_messages_| only. Used to support memory dumps
1232   // which may be triggered from any thread.
1233   base::Lock outgoing_messages_lock_;
1234 
1235   // Outgoing messages that were sent before this controller was bound to a
1236   // real message pipe.
1237   std::vector<mojo::Message> outgoing_messages_;
1238 
1239   // Guards the fields below for thread-safe access.
1240   base::Lock lock_;
1241 
1242   bool encountered_error_ = false;
1243   bool shut_down_ = false;
1244 
1245   // ID #1 is reserved for the mojom::Channel interface.
1246   uint32_t next_interface_id_ = 2;
1247 
1248   std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;
1249 
1250   raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
1251 };
1252 
OnMemoryDump(const base::trace_event::MemoryDumpArgs & args,base::trace_event::ProcessMemoryDump * pmd)1253 bool ControllerMemoryDumpProvider::OnMemoryDump(
1254     const base::trace_event::MemoryDumpArgs& args,
1255     base::trace_event::ProcessMemoryDump* pmd) {
1256   base::AutoLock lock(lock_);
1257   for (auto* controller : controllers_) {
1258     base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
1259         base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
1260                            reinterpret_cast<uintptr_t>(controller)));
1261     dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
1262                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1263                     controller->GetQueuedMessageCount());
1264     MessageMemoryDumpInfo info;
1265     size_t count = 0;
1266     controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
1267     dump->AddScalar("top_message_name", "id", info.id);
1268     dump->AddScalar("top_message_count",
1269                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1270                     count);
1271 
1272     if (info.profiler_tag) {
1273       // TODO(ssid): Memory dumps currently do not support adding string
1274       // arguments in background dumps. So, add this value as a trace event for
1275       // now.
1276       TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
1277                    "ControllerMemoryDumpProvider::OnMemoryDump",
1278                    "top_queued_message_tag", info.profiler_tag,
1279                    "count", count);
1280     }
1281   }
1282 
1283   return true;
1284 }
1285 
1286 class MojoBootstrapImpl : public MojoBootstrap {
1287  public:
MojoBootstrapImpl(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<ChannelAssociatedGroupController> controller)1288   MojoBootstrapImpl(
1289       mojo::ScopedMessagePipeHandle handle,
1290       const scoped_refptr<ChannelAssociatedGroupController> controller)
1291       : controller_(controller),
1292         associated_group_(controller),
1293         handle_(std::move(handle)) {}
1294 
1295   MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
1296   MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;
1297 
~MojoBootstrapImpl()1298   ~MojoBootstrapImpl() override {
1299     controller_->ShutDown();
1300   }
1301 
1302  private:
Connect(mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)1303   void Connect(
1304       mojo::PendingAssociatedRemote<mojom::Channel>* sender,
1305       mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
1306     controller_->Bind(std::move(handle_), sender, receiver);
1307   }
1308 
StartReceiving()1309   void StartReceiving() override { controller_->StartReceiving(); }
1310 
Pause()1311   void Pause() override {
1312     controller_->Pause();
1313   }
1314 
Unpause()1315   void Unpause() override {
1316     controller_->Unpause();
1317   }
1318 
Flush()1319   void Flush() override {
1320     controller_->FlushOutgoingMessages();
1321   }
1322 
GetAssociatedGroup()1323   mojo::AssociatedGroup* GetAssociatedGroup() override {
1324     return &associated_group_;
1325   }
1326 
SetUrgentMessageObserver(UrgentMessageObserver * observer)1327   void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
1328     controller_->SetUrgentMessageObserver(observer);
1329   }
1330 
1331   scoped_refptr<ChannelAssociatedGroupController> controller_;
1332   mojo::AssociatedGroup associated_group_;
1333 
1334   mojo::ScopedMessagePipeHandle handle_;
1335 };
1336 
1337 }  // namespace
1338 
1339 ScopedAllowOffSequenceChannelAssociatedBindings::
ScopedAllowOffSequenceChannelAssociatedBindings()1340     ScopedAllowOffSequenceChannelAssociatedBindings()
1341     : resetter_(&off_sequence_binding_allowed, true) {}
1342 
1343 ScopedAllowOffSequenceChannelAssociatedBindings::
1344     ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;
1345 
1346 // static
Create(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)1347 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1348     mojo::ScopedMessagePipeHandle handle,
1349     Channel::Mode mode,
1350     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1351     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1352   return std::make_unique<MojoBootstrapImpl>(
1353       std::move(handle),
1354       base::MakeRefCounted<ChannelAssociatedGroupController>(
1355           mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
1356 }
1357 
1358 }  // namespace IPC
1359