• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_mojo_bootstrap.h"
6 
7 #include <inttypes.h>
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <optional>
13 #include <set>
14 #include <utility>
15 #include <vector>
16 
17 #include "base/check_op.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/containers/contains.h"
20 #include "base/feature_list.h"
21 #include "base/functional/bind.h"
22 #include "base/functional/callback.h"
23 #include "base/memory/ptr_util.h"
24 #include "base/memory/raw_ptr.h"
25 #include "base/no_destructor.h"
26 #include "base/not_fatal_until.h"
27 #include "base/ranges/algorithm.h"
28 #include "base/sequence_checker.h"
29 #include "base/strings/stringprintf.h"
30 #include "base/synchronization/lock.h"
31 #include "base/synchronization/waitable_event.h"
32 #include "base/task/common/task_annotator.h"
33 #include "base/task/sequenced_task_runner.h"
34 #include "base/task/single_thread_task_runner.h"
35 #include "base/thread_annotations.h"
36 #include "base/trace_event/memory_allocator_dump.h"
37 #include "base/trace_event/memory_dump_manager.h"
38 #include "base/trace_event/memory_dump_provider.h"
39 #include "base/trace_event/typed_macros.h"
40 #include "ipc/ipc_channel.h"
41 #include "ipc/urgent_message_observer.h"
42 #include "mojo/public/c/system/types.h"
43 #include "mojo/public/cpp/bindings/associated_group.h"
44 #include "mojo/public/cpp/bindings/associated_group_controller.h"
45 #include "mojo/public/cpp/bindings/connector.h"
46 #include "mojo/public/cpp/bindings/features.h"
47 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
48 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
49 #include "mojo/public/cpp/bindings/interface_id.h"
50 #include "mojo/public/cpp/bindings/message.h"
51 #include "mojo/public/cpp/bindings/message_header_validator.h"
52 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
53 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
54 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
55 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
56 #include "mojo/public/cpp/bindings/scoped_message_error_crash_key.h"
57 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
58 #include "mojo/public/cpp/bindings/tracing_helpers.h"
59 
60 namespace IPC {
61 
62 class ChannelAssociatedGroupController;
63 
64 namespace {
65 
66 constinit thread_local bool off_sequence_binding_allowed = false;
67 
68 BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
69              "MojoChannelAssociatedSendUsesRunOrPostTask",
70              base::FEATURE_DISABLED_BY_DEFAULT);
71 
72 // Used to track some internal Channel state in pursuit of message leaks.
73 //
74 // TODO(crbug.com/40563310): Remove this.
75 class ControllerMemoryDumpProvider
76     : public base::trace_event::MemoryDumpProvider {
77  public:
ControllerMemoryDumpProvider()78   ControllerMemoryDumpProvider() {
79     base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
80         this, "IPCChannel", nullptr);
81   }
82 
83   ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
84   ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
85       delete;
86 
~ControllerMemoryDumpProvider()87   ~ControllerMemoryDumpProvider() override {
88     base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
89         this);
90   }
91 
AddController(ChannelAssociatedGroupController * controller)92   void AddController(ChannelAssociatedGroupController* controller) {
93     base::AutoLock lock(lock_);
94     controllers_.insert(controller);
95   }
96 
RemoveController(ChannelAssociatedGroupController * controller)97   void RemoveController(ChannelAssociatedGroupController* controller) {
98     base::AutoLock lock(lock_);
99     controllers_.erase(controller);
100   }
101 
102   // base::trace_event::MemoryDumpProvider:
103   bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
104                     base::trace_event::ProcessMemoryDump* pmd) override;
105 
106  private:
107   base::Lock lock_;
108   std::set<raw_ptr<ChannelAssociatedGroupController, SetExperimental>>
109       controllers_;
110 };
111 
GetMemoryDumpProvider()112 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
113   static base::NoDestructor<ControllerMemoryDumpProvider> provider;
114   return *provider;
115 }
116 
117 // Messages are grouped by this info when recording memory metrics.
118 struct MessageMemoryDumpInfo {
MessageMemoryDumpInfoIPC::__anon907ea5930111::MessageMemoryDumpInfo119   MessageMemoryDumpInfo(const mojo::Message& message)
120       : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
121   MessageMemoryDumpInfo() = default;
122 
operator ==IPC::__anon907ea5930111::MessageMemoryDumpInfo123   bool operator==(const MessageMemoryDumpInfo& other) const {
124     return other.id == id && other.profiler_tag == profiler_tag;
125   }
126 
127   uint32_t id = 0;
128   const char* profiler_tag = nullptr;
129 };
130 
131 struct MessageMemoryDumpInfoHash {
operator ()IPC::__anon907ea5930111::MessageMemoryDumpInfoHash132   size_t operator()(const MessageMemoryDumpInfo& info) const {
133     return base::HashInts(
134         info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
135   }
136 };
137 
138 class ScopedUrgentMessageNotification {
139  public:
ScopedUrgentMessageNotification(UrgentMessageObserver * observer=nullptr)140   explicit ScopedUrgentMessageNotification(
141       UrgentMessageObserver* observer = nullptr)
142       : observer_(observer) {
143     if (observer_) {
144       observer_->OnUrgentMessageReceived();
145     }
146   }
147 
~ScopedUrgentMessageNotification()148   ~ScopedUrgentMessageNotification() {
149     if (observer_) {
150       observer_->OnUrgentMessageProcessed();
151     }
152   }
153 
ScopedUrgentMessageNotification(ScopedUrgentMessageNotification && other)154   ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
155       : observer_(std::exchange(other.observer_, nullptr)) {}
156 
operator =(ScopedUrgentMessageNotification && other)157   ScopedUrgentMessageNotification& operator=(
158       ScopedUrgentMessageNotification&& other) {
159     observer_ = std::exchange(other.observer_, nullptr);
160     return *this;
161   }
162 
163  private:
164   raw_ptr<UrgentMessageObserver> observer_;
165 };
166 
167 }  // namespace
168 
169 class ChannelAssociatedGroupController
170     : public mojo::AssociatedGroupController,
171       public mojo::MessageReceiver,
172       public mojo::PipeControlMessageHandlerDelegate {
173  public:
ChannelAssociatedGroupController(bool set_interface_id_namespace_bit,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)174   ChannelAssociatedGroupController(
175       bool set_interface_id_namespace_bit,
176       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
177       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
178       : task_runner_(ipc_task_runner),
179         proxy_task_runner_(proxy_task_runner),
180         set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
181         dispatcher_(this),
182         control_message_handler_(this),
183         control_message_proxy_thunk_(this),
184         control_message_proxy_(&control_message_proxy_thunk_) {
185     control_message_handler_.SetDescription(
186         "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
187     dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
188         "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
189 
190     GetMemoryDumpProvider().AddController(this);
191 
192     DETACH_FROM_SEQUENCE(sequence_checker_);
193   }
194 
195   ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
196       delete;
197   ChannelAssociatedGroupController& operator=(
198       const ChannelAssociatedGroupController&) = delete;
199 
GetQueuedMessageCount()200   size_t GetQueuedMessageCount() {
201     base::AutoLock lock(outgoing_messages_lock_);
202     return outgoing_messages_.size();
203   }
204 
GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo * info,size_t * count)205   void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
206                                          size_t* count) {
207     std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
208         counts;
209     std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
210         MessageMemoryDumpInfo(), 0};
211     base::AutoLock lock(outgoing_messages_lock_);
212     for (const auto& message : outgoing_messages_) {
213       auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
214       it_and_inserted.first->second++;
215       if (it_and_inserted.first->second > top_message_info_and_count.second)
216         top_message_info_and_count = *it_and_inserted.first;
217     }
218     *info = top_message_info_and_count.first;
219     *count = top_message_info_and_count.second;
220   }
221 
Pause()222   void Pause() {
223     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
224     CHECK(was_bound_or_message_sent_);
225     CHECK(!paused_);
226     paused_ = true;
227   }
228 
Unpause()229   void Unpause() {
230     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
231     CHECK(was_bound_or_message_sent_);
232     CHECK(paused_);
233     paused_ = false;
234   }
235 
FlushOutgoingMessages()236   void FlushOutgoingMessages() {
237     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
238     CHECK(was_bound_or_message_sent_);
239 
240     std::vector<mojo::Message> outgoing_messages;
241     {
242       base::AutoLock lock(outgoing_messages_lock_);
243       std::swap(outgoing_messages, outgoing_messages_);
244     }
245 
246     for (auto& message : outgoing_messages)
247       SendMessage(&message);
248   }
249 
Bind(mojo::ScopedMessagePipeHandle handle,mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)250   void Bind(mojo::ScopedMessagePipeHandle handle,
251             mojo::PendingAssociatedRemote<mojom::Channel>* sender,
252             mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
253     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
254 
255     connector_ = std::make_unique<mojo::Connector>(
256         std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
257         "IPC Channel");
258     connector_->set_incoming_receiver(&dispatcher_);
259     connector_->set_connection_error_handler(
260         base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
261                        base::Unretained(this)));
262     connector_->set_enforce_errors_from_incoming_receiver(false);
263 
264     // Don't let the Connector do any sort of queuing on our behalf. Individual
265     // messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
266     // of messages received by this Connector) are already individually
267     // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
268     // operation would only introduce a redundant scheduling step for most
269     // messages.
270     connector_->set_force_immediate_dispatch(true);
271 
272     mojo::InterfaceId sender_id, receiver_id;
273     if (set_interface_id_namespace_bit_) {
274       sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
275       receiver_id = 1;
276     } else {
277       sender_id = 1;
278       receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
279     }
280 
281     {
282       base::AutoLock locker(lock_);
283       Endpoint* sender_endpoint = new Endpoint(this, sender_id);
284       Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
285       endpoints_.insert({ sender_id, sender_endpoint });
286       endpoints_.insert({ receiver_id, receiver_endpoint });
287       sender_endpoint->set_handle_created();
288       receiver_endpoint->set_handle_created();
289     }
290 
291     mojo::ScopedInterfaceEndpointHandle sender_handle =
292         CreateScopedInterfaceEndpointHandle(sender_id);
293     mojo::ScopedInterfaceEndpointHandle receiver_handle =
294         CreateScopedInterfaceEndpointHandle(receiver_id);
295 
296     *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
297         std::move(sender_handle), 0);
298     *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
299         std::move(receiver_handle));
300 
301     if (!was_bound_or_message_sent_) {
302       was_bound_or_message_sent_ = true;
303       DETACH_FROM_SEQUENCE(sequence_checker_);
304     }
305   }
306 
StartReceiving()307   void StartReceiving() {
308     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
309     CHECK(was_bound_or_message_sent_);
310     connector_->StartReceiving(task_runner_);
311   }
312 
ShutDown()313   void ShutDown() {
314     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
315     shut_down_ = true;
316     if (connector_)
317       connector_->CloseMessagePipe();
318     OnPipeError();
319     connector_.reset();
320 
321     base::AutoLock lock(outgoing_messages_lock_);
322     outgoing_messages_.clear();
323   }
324 
325   // mojo::AssociatedGroupController:
AssociateInterface(mojo::ScopedInterfaceEndpointHandle handle_to_send)326   mojo::InterfaceId AssociateInterface(
327       mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
328     if (!handle_to_send.pending_association())
329       return mojo::kInvalidInterfaceId;
330 
331     uint32_t id = 0;
332     {
333       base::AutoLock locker(lock_);
334       do {
335         if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
336           next_interface_id_ = 2;
337         id = next_interface_id_++;
338         if (set_interface_id_namespace_bit_)
339           id |= mojo::kInterfaceIdNamespaceMask;
340       } while (base::Contains(endpoints_, id));
341 
342       Endpoint* endpoint = new Endpoint(this, id);
343       if (encountered_error_)
344         endpoint->set_peer_closed();
345       endpoint->set_handle_created();
346       endpoints_.insert({id, endpoint});
347     }
348 
349     if (!NotifyAssociation(&handle_to_send, id)) {
350       // The peer handle of |handle_to_send|, which is supposed to join this
351       // associated group, has been closed.
352       {
353         base::AutoLock locker(lock_);
354         Endpoint* endpoint = FindEndpoint(id);
355         if (endpoint)
356           MarkClosedAndMaybeRemove(endpoint);
357       }
358 
359       control_message_proxy_.NotifyPeerEndpointClosed(
360           id, handle_to_send.disconnect_reason());
361     }
362     return id;
363   }
364 
CreateLocalEndpointHandle(mojo::InterfaceId id)365   mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
366       mojo::InterfaceId id) override {
367     if (!mojo::IsValidInterfaceId(id))
368       return mojo::ScopedInterfaceEndpointHandle();
369 
370     // Unless it is the primary ID, |id| is from the remote side and therefore
371     // its namespace bit is supposed to be different than the value that this
372     // router would use.
373     if (!mojo::IsPrimaryInterfaceId(id) &&
374         set_interface_id_namespace_bit_ ==
375             mojo::HasInterfaceIdNamespaceBitSet(id)) {
376       return mojo::ScopedInterfaceEndpointHandle();
377     }
378 
379     base::AutoLock locker(lock_);
380     bool inserted = false;
381     Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
382     if (inserted) {
383       DCHECK(!endpoint->handle_created());
384       if (encountered_error_)
385         endpoint->set_peer_closed();
386     } else {
387       if (endpoint->handle_created())
388         return mojo::ScopedInterfaceEndpointHandle();
389     }
390 
391     endpoint->set_handle_created();
392     return CreateScopedInterfaceEndpointHandle(id);
393   }
394 
CloseEndpointHandle(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)395   void CloseEndpointHandle(
396       mojo::InterfaceId id,
397       const std::optional<mojo::DisconnectReason>& reason) override {
398     if (!mojo::IsValidInterfaceId(id))
399       return;
400     {
401       base::AutoLock locker(lock_);
402       DCHECK(base::Contains(endpoints_, id));
403       Endpoint* endpoint = endpoints_[id].get();
404       DCHECK(!endpoint->client());
405       DCHECK(!endpoint->closed());
406       MarkClosedAndMaybeRemove(endpoint);
407     }
408 
409     if (!mojo::IsPrimaryInterfaceId(id) || reason)
410       control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
411   }
412 
NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id)413   void NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id) override {
414     if (!task_runner_->RunsTasksInCurrentSequence()) {
415       task_runner_->PostTask(
416           FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::
417                                         NotifyLocalEndpointOfPeerClosure,
418                                     base::WrapRefCounted(this), id));
419       return;
420     }
421     OnPeerAssociatedEndpointClosed(id, std::nullopt);
422   }
423 
AttachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle,mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)424   mojo::InterfaceEndpointController* AttachEndpointClient(
425       const mojo::ScopedInterfaceEndpointHandle& handle,
426       mojo::InterfaceEndpointClient* client,
427       scoped_refptr<base::SequencedTaskRunner> runner) override {
428     const mojo::InterfaceId id = handle.id();
429 
430     DCHECK(mojo::IsValidInterfaceId(id));
431     DCHECK(client);
432 
433     base::AutoLock locker(lock_);
434     DCHECK(base::Contains(endpoints_, id));
435 
436     Endpoint* endpoint = endpoints_[id].get();
437     endpoint->AttachClient(client, std::move(runner));
438 
439     if (endpoint->peer_closed())
440       NotifyEndpointOfError(endpoint, true /* force_async */);
441 
442     return endpoint;
443   }
444 
DetachEndpointClient(const mojo::ScopedInterfaceEndpointHandle & handle)445   void DetachEndpointClient(
446       const mojo::ScopedInterfaceEndpointHandle& handle) override {
447     const mojo::InterfaceId id = handle.id();
448 
449     DCHECK(mojo::IsValidInterfaceId(id));
450 
451     base::AutoLock locker(lock_);
452     DCHECK(base::Contains(endpoints_, id));
453 
454     Endpoint* endpoint = endpoints_[id].get();
455     endpoint->DetachClient();
456   }
457 
RaiseError()458   void RaiseError() override {
459     // We ignore errors on channel endpoints, leaving the pipe open. There are
460     // good reasons for this:
461     //
462     //   * We should never close a channel endpoint in either process as long as
463     //     the child process is still alive. The child's endpoint should only be
464     //     closed implicitly by process death, and the browser's endpoint should
465     //     only be closed after the child process is confirmed to be dead. Crash
466     //     reporting logic in Chrome relies on this behavior in order to do the
467     //     right thing.
468     //
469     //   * There are two interesting conditions under which RaiseError() can be
470     //     implicitly reached: an incoming message fails validation, or the
471     //     local endpoint drops a response callback without calling it.
472     //
473     //   * In the validation case, we also report the message as bad, and this
474     //     will imminently trigger the common bad-IPC path in the browser,
475     //     causing the browser to kill the offending renderer.
476     //
477     //   * In the dropped response callback case, the net result of ignoring the
478     //     issue is generally innocuous. While indicative of programmer error,
479     //     it's not a severe failure and is already covered by separate DCHECKs.
480     //
481     // See https://crbug.com/861607 for additional discussion.
482   }
483 
PrefersSerializedMessages()484   bool PrefersSerializedMessages() override { return true; }
485 
SetUrgentMessageObserver(UrgentMessageObserver * observer)486   void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
487     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
488     CHECK(!was_bound_or_message_sent_);
489     urgent_message_observer_ = observer;
490     DETACH_FROM_SEQUENCE(sequence_checker_);
491   }
492 
493  private:
494   class Endpoint;
495   class ControlMessageProxyThunk;
496   friend class Endpoint;
497   friend class ControlMessageProxyThunk;
498 
499   // MessageWrapper objects are always destroyed under the controller's lock. On
500   // destruction, if the message it wrappers contains
501   // ScopedInterfaceEndpointHandles (which cannot be destructed under the
502   // controller's lock), the wrapper unlocks to clean them up.
503   class MessageWrapper {
504    public:
505     MessageWrapper() = default;
506 
MessageWrapper(ChannelAssociatedGroupController * controller,mojo::Message message)507     MessageWrapper(ChannelAssociatedGroupController* controller,
508                    mojo::Message message)
509         : controller_(controller), value_(std::move(message)) {}
510 
MessageWrapper(MessageWrapper && other)511     MessageWrapper(MessageWrapper&& other)
512         : controller_(other.controller_), value_(std::move(other.value_)) {}
513 
514     MessageWrapper(const MessageWrapper&) = delete;
515     MessageWrapper& operator=(const MessageWrapper&) = delete;
516 
~MessageWrapper()517     ~MessageWrapper() {
518       if (value_.associated_endpoint_handles()->empty())
519         return;
520 
521       controller_->lock_.AssertAcquired();
522       {
523         base::AutoUnlock unlocker(controller_->lock_);
524         value_.mutable_associated_endpoint_handles()->clear();
525       }
526     }
527 
operator =(MessageWrapper && other)528     MessageWrapper& operator=(MessageWrapper&& other) {
529       controller_ = other.controller_;
530       value_ = std::move(other.value_);
531       return *this;
532     }
533 
HasRequestId(uint64_t request_id)534     bool HasRequestId(uint64_t request_id) {
535       return !value_.IsNull() && value_.version() >= 1 &&
536              value_.header_v1()->request_id == request_id;
537     }
538 
value()539     mojo::Message& value() { return value_; }
540 
541    private:
542     raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
543     mojo::Message value_;
544   };
545 
546   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
547                    public mojo::InterfaceEndpointController {
548    public:
Endpoint(ChannelAssociatedGroupController * controller,mojo::InterfaceId id)549     Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
550         : controller_(controller), id_(id) {}
551 
552     Endpoint(const Endpoint&) = delete;
553     Endpoint& operator=(const Endpoint&) = delete;
554 
id() const555     mojo::InterfaceId id() const { return id_; }
556 
closed() const557     bool closed() const {
558       controller_->lock_.AssertAcquired();
559       return closed_;
560     }
561 
set_closed()562     void set_closed() {
563       controller_->lock_.AssertAcquired();
564       closed_ = true;
565     }
566 
peer_closed() const567     bool peer_closed() const {
568       controller_->lock_.AssertAcquired();
569       return peer_closed_;
570     }
571 
set_peer_closed()572     void set_peer_closed() {
573       controller_->lock_.AssertAcquired();
574       peer_closed_ = true;
575     }
576 
handle_created() const577     bool handle_created() const {
578       controller_->lock_.AssertAcquired();
579       return handle_created_;
580     }
581 
set_handle_created()582     void set_handle_created() {
583       controller_->lock_.AssertAcquired();
584       handle_created_ = true;
585     }
586 
disconnect_reason() const587     const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
588       return disconnect_reason_;
589     }
590 
set_disconnect_reason(const std::optional<mojo::DisconnectReason> & disconnect_reason)591     void set_disconnect_reason(
592         const std::optional<mojo::DisconnectReason>& disconnect_reason) {
593       disconnect_reason_ = disconnect_reason;
594     }
595 
task_runner() const596     base::SequencedTaskRunner* task_runner() const {
597       return task_runner_.get();
598     }
599 
was_bound_off_sequence() const600     bool was_bound_off_sequence() const { return was_bound_off_sequence_; }
601 
client() const602     mojo::InterfaceEndpointClient* client() const {
603       controller_->lock_.AssertAcquired();
604       return client_;
605     }
606 
AttachClient(mojo::InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)607     void AttachClient(mojo::InterfaceEndpointClient* client,
608                       scoped_refptr<base::SequencedTaskRunner> runner) {
609       controller_->lock_.AssertAcquired();
610       DCHECK(!client_);
611       DCHECK(!closed_);
612 
613       task_runner_ = std::move(runner);
614       client_ = client;
615 
616       if (off_sequence_binding_allowed) {
617         was_bound_off_sequence_ = true;
618       }
619     }
620 
DetachClient()621     void DetachClient() {
622       controller_->lock_.AssertAcquired();
623       DCHECK(client_);
624       DCHECK(!closed_);
625 
626       task_runner_ = nullptr;
627       client_ = nullptr;
628       sync_watcher_.reset();
629     }
630 
EnqueueSyncMessage(MessageWrapper message)631     std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
632       controller_->lock_.AssertAcquired();
633       if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
634         exclusive_wait_ = nullptr;
635         return std::nullopt;
636       }
637 
638       uint32_t id = GenerateSyncMessageId();
639       sync_messages_.emplace_back(id, std::move(message));
640       SignalSyncMessageEvent();
641       return id;
642     }
643 
SignalSyncMessageEvent()644     void SignalSyncMessageEvent() {
645       controller_->lock_.AssertAcquired();
646 
647       if (sync_watcher_)
648         sync_watcher_->SignalEvent();
649     }
650 
PopSyncMessage(uint32_t id)651     MessageWrapper PopSyncMessage(uint32_t id) {
652       controller_->lock_.AssertAcquired();
653       if (sync_messages_.empty() || sync_messages_.front().first != id)
654         return MessageWrapper();
655       MessageWrapper message = std::move(sync_messages_.front().second);
656       sync_messages_.pop_front();
657       return message;
658     }
659 
660     // mojo::InterfaceEndpointController:
SendMessage(mojo::Message * message)661     bool SendMessage(mojo::Message* message) override {
662       DCHECK(task_runner_->RunsTasksInCurrentSequence());
663       message->set_interface_id(id_);
664       return controller_->SendMessage(message);
665     }
666 
AllowWokenUpBySyncWatchOnSameThread()667     void AllowWokenUpBySyncWatchOnSameThread() override {
668       DCHECK(task_runner_->RunsTasksInCurrentSequence());
669 
670       EnsureSyncWatcherExists();
671       sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
672     }
673 
SyncWatch(const bool & should_stop)674     bool SyncWatch(const bool& should_stop) override {
675       DCHECK(task_runner_->RunsTasksInCurrentSequence());
676 
677       // It's not legal to make sync calls from the primary endpoint's thread,
678       // and in fact they must only happen from the proxy task runner.
679       DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
680       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
681 
682       EnsureSyncWatcherExists();
683       {
684         base::AutoLock locker(controller_->lock_);
685         if (peer_closed_) {
686           SignalSyncMessageEvent();
687         }
688       }
689       return sync_watcher_->SyncWatch(&should_stop);
690     }
691 
WaitForIncomingSyncReply(uint64_t request_id)692     MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
693       std::optional<ExclusiveSyncWait> wait;
694       {
695         base::AutoLock lock(controller_->lock_);
696         for (auto& [id, message] : sync_messages_) {
697           if (message.HasRequestId(request_id)) {
698             return std::move(message);
699           }
700         }
701 
702         DCHECK(!exclusive_wait_);
703         wait.emplace(request_id);
704         exclusive_wait_ = &wait.value();
705       }
706 
707       wait->event.Wait();
708       return std::move(wait->message);
709     }
710 
SyncWatchExclusive(uint64_t request_id)711     bool SyncWatchExclusive(uint64_t request_id) override {
712       MessageWrapper message = WaitForIncomingSyncReply(request_id);
713       if (message.value().IsNull() || !client_) {
714         return false;
715       }
716 
717       if (!client_->HandleIncomingMessage(&message.value())) {
718         base::AutoLock locker(controller_->lock_);
719         controller_->RaiseError();
720         return false;
721       }
722 
723       return true;
724     }
725 
RegisterExternalSyncWaiter(uint64_t request_id)726     void RegisterExternalSyncWaiter(uint64_t request_id) override {}
727 
728    private:
729     friend class base::RefCountedThreadSafe<Endpoint>;
730 
~Endpoint()731     ~Endpoint() override {
732       controller_->lock_.AssertAcquired();
733       DCHECK(!client_);
734       DCHECK(closed_);
735       DCHECK(peer_closed_);
736       DCHECK(!sync_watcher_);
737       if (exclusive_wait_) {
738         exclusive_wait_->event.Signal();
739       }
740     }
741 
OnSyncMessageEventReady()742     void OnSyncMessageEventReady() {
743       DCHECK(task_runner_->RunsTasksInCurrentSequence());
744 
745       // SUBTLE: The order of these scoped_refptrs matters.
746       // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
747       // holds raw pointer to the AssociatedGroupController.
748       scoped_refptr<AssociatedGroupController> controller_keepalive(
749           controller_.get());
750       scoped_refptr<Endpoint> keepalive(this);
751       base::AutoLock locker(controller_->lock_);
752       bool more_to_process = false;
753       if (!sync_messages_.empty()) {
754         MessageWrapper message_wrapper =
755             std::move(sync_messages_.front().second);
756         sync_messages_.pop_front();
757 
758         bool dispatch_succeeded;
759         mojo::InterfaceEndpointClient* client = client_;
760         {
761           base::AutoUnlock unlocker(controller_->lock_);
762           dispatch_succeeded =
763               client->HandleIncomingMessage(&message_wrapper.value());
764         }
765 
766         if (!sync_messages_.empty())
767           more_to_process = true;
768 
769         if (!dispatch_succeeded)
770           controller_->RaiseError();
771       }
772 
773       if (!more_to_process)
774         sync_watcher_->ResetEvent();
775 
776       // If there are no queued sync messages and the peer has closed, there
777       // there won't be incoming sync messages in the future. If any
778       // SyncWatch() calls are on the stack for this endpoint, resetting the
779       // watcher will allow them to exit as the stack undwinds.
780       if (!more_to_process && peer_closed_)
781         sync_watcher_.reset();
782     }
783 
EnsureSyncWatcherExists()784     void EnsureSyncWatcherExists() {
785       DCHECK(task_runner_->RunsTasksInCurrentSequence());
786       if (sync_watcher_)
787         return;
788 
789       base::AutoLock locker(controller_->lock_);
790       sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
791           base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
792                               base::Unretained(this)));
793       if (peer_closed_ || !sync_messages_.empty())
794         SignalSyncMessageEvent();
795     }
796 
GenerateSyncMessageId()797     uint32_t GenerateSyncMessageId() {
798       // Overflow is fine.
799       uint32_t id = next_sync_message_id_++;
800       DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
801       return id;
802     }
803 
804     // Tracks the state of a pending sync wait which excludes all other incoming
805     // IPC on the waiting thread.
806     struct ExclusiveSyncWait {
ExclusiveSyncWaitIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait807       explicit ExclusiveSyncWait(uint64_t request_id)
808           : request_id(request_id) {}
809       ~ExclusiveSyncWait() = default;
810 
TryFulfillingWithIPC::ChannelAssociatedGroupController::Endpoint::ExclusiveSyncWait811       bool TryFulfillingWith(MessageWrapper& wrapper) {
812         if (!wrapper.HasRequestId(request_id)) {
813           return false;
814         }
815 
816         message = std::move(wrapper);
817         event.Signal();
818         return true;
819       }
820 
821       uint64_t request_id;
822       base::WaitableEvent event;
823       MessageWrapper message;
824     };
825 
826     const raw_ptr<ChannelAssociatedGroupController> controller_;
827     const mojo::InterfaceId id_;
828 
829     bool closed_ = false;
830     bool peer_closed_ = false;
831     bool handle_created_ = false;
832     bool was_bound_off_sequence_ = false;
833     std::optional<mojo::DisconnectReason> disconnect_reason_;
834     raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
835     scoped_refptr<base::SequencedTaskRunner> task_runner_;
836     std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
837     base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
838     raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
839     uint32_t next_sync_message_id_ = 0;
840   };
841 
842   class ControlMessageProxyThunk : public MessageReceiver {
843    public:
ControlMessageProxyThunk(ChannelAssociatedGroupController * controller)844     explicit ControlMessageProxyThunk(
845         ChannelAssociatedGroupController* controller)
846         : controller_(controller) {}
847 
848     ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
849     ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
850         delete;
851 
852    private:
853     // MessageReceiver:
Accept(mojo::Message * message)854     bool Accept(mojo::Message* message) override {
855       return controller_->SendMessage(message);
856     }
857 
858     raw_ptr<ChannelAssociatedGroupController> controller_;
859   };
860 
~ChannelAssociatedGroupController()861   ~ChannelAssociatedGroupController() override {
862     DCHECK(!connector_);
863 
864     base::AutoLock locker(lock_);
865     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
866       Endpoint* endpoint = iter->second.get();
867       ++iter;
868 
869       if (!endpoint->closed()) {
870         // This happens when a NotifyPeerEndpointClosed message been received,
871         // but the interface ID hasn't been used to create local endpoint
872         // handle.
873         DCHECK(!endpoint->client());
874         DCHECK(endpoint->peer_closed());
875         MarkClosed(endpoint);
876       } else {
877         MarkPeerClosed(endpoint);
878       }
879     }
880     endpoints_.clear();
881 
882     GetMemoryDumpProvider().RemoveController(this);
883   }
884 
SendMessage(mojo::Message * message)885   bool SendMessage(mojo::Message* message) {
886     DCHECK(message->heap_profiler_tag());
887     if (task_runner_->BelongsToCurrentThread()) {
888       return SendMessageOnSequence(message);
889     }
890 
891     // PostTask (or RunOrPostTask) so that `message` is sent after messages from
892     // tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
893     auto callback = base::BindOnce(
894         &ChannelAssociatedGroupController::SendMessageOnSequenceViaTask, this,
895         std::move(*message));
896     if (base::FeatureList::IsEnabled(
897             kMojoChannelAssociatedSendUsesRunOrPostTask)) {
898       task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
899                                   FROM_HERE, std::move(callback));
900     } else {
901       task_runner_->PostTask(FROM_HERE, std::move(callback));
902     }
903 
904     return true;
905   }
906 
SendMessageOnSequence(mojo::Message * message)907   bool SendMessageOnSequence(mojo::Message* message) {
908     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
909     was_bound_or_message_sent_ = true;
910 
911     if (!connector_ || paused_) {
912       if (!shut_down_) {
913         base::AutoLock lock(outgoing_messages_lock_);
914         outgoing_messages_.emplace_back(std::move(*message));
915       }
916       return true;
917     }
918 
919     MojoResult result = connector_->AcceptAndGetResult(message);
920     if (result == MOJO_RESULT_OK) {
921       return true;
922     }
923 
924     CHECK(connector_->encountered_error(), base::NotFatalUntil::M135);
925     return false;
926   }
927 
SendMessageOnSequenceViaTask(mojo::Message message)928   void SendMessageOnSequenceViaTask(mojo::Message message) {
929     if (!SendMessageOnSequence(&message)) {
930       RaiseError();
931     }
932   }
933 
OnPipeError()934   void OnPipeError() {
935     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
936 
937     // We keep |this| alive here because it's possible for the notifications
938     // below to release all other references.
939     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
940 
941     base::AutoLock locker(lock_);
942     encountered_error_ = true;
943 
944     std::vector<uint32_t> endpoints_to_remove;
945     std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
946     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
947       Endpoint* endpoint = iter->second.get();
948       ++iter;
949 
950       if (endpoint->client()) {
951         endpoints_to_notify.push_back(endpoint);
952       }
953 
954       if (MarkPeerClosed(endpoint)) {
955         endpoints_to_remove.push_back(endpoint->id());
956       }
957     }
958 
959     for (auto& endpoint : endpoints_to_notify) {
960       // Because a notification may in turn detach any endpoint, we have to
961       // check each client again here.
962       if (endpoint->client())
963         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
964     }
965 
966     for (uint32_t id : endpoints_to_remove) {
967       endpoints_.erase(id);
968     }
969   }
970 
NotifyEndpointOfError(Endpoint * endpoint,bool force_async)971   void NotifyEndpointOfError(Endpoint* endpoint, bool force_async)
972       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
973     DCHECK(endpoint->task_runner() && endpoint->client());
974     if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
975       mojo::InterfaceEndpointClient* client = endpoint->client();
976       std::optional<mojo::DisconnectReason> reason(
977           endpoint->disconnect_reason());
978 
979       base::AutoUnlock unlocker(lock_);
980       client->NotifyError(reason);
981     } else {
982       endpoint->task_runner()->PostTask(
983           FROM_HERE,
984           base::BindOnce(&ChannelAssociatedGroupController::
985                              NotifyEndpointOfErrorOnEndpointThread,
986                          this, endpoint->id(),
987                          // This is safe as `endpoint` is verified to be in
988                          // `endpoints_` (a map with ownership) before use.
989                          base::UnsafeDangling(endpoint)));
990     }
991   }
992 
993   // `endpoint` might be a dangling ptr and must be checked before dereference.
NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,MayBeDangling<Endpoint> endpoint)994   void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
995                                              MayBeDangling<Endpoint> endpoint) {
996     base::AutoLock locker(lock_);
997     auto iter = endpoints_.find(id);
998     if (iter == endpoints_.end() || iter->second.get() != endpoint)
999       return;
1000     if (!endpoint->client())
1001       return;
1002 
1003     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
1004     NotifyEndpointOfError(endpoint, false /* force_async */);
1005   }
1006 
1007   // Marks `endpoint` as closed and returns true if and only if its peer was
1008   // also already closed.
MarkClosed(Endpoint * endpoint)1009   bool MarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1010     endpoint->set_closed();
1011     return endpoint->peer_closed();
1012   }
1013 
1014   // Marks `endpoint` as having a closed peer and returns true if and only if
1015   // `endpoint` itself was also already closed.
MarkPeerClosed(Endpoint * endpoint)1016   bool MarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1017     endpoint->set_peer_closed();
1018     endpoint->SignalSyncMessageEvent();
1019     return endpoint->closed();
1020   }
1021 
MarkClosedAndMaybeRemove(Endpoint * endpoint)1022   void MarkClosedAndMaybeRemove(Endpoint* endpoint)
1023       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1024     if (MarkClosed(endpoint)) {
1025       endpoints_.erase(endpoint->id());
1026     }
1027   }
1028 
MarkPeerClosedAndMaybeRemove(Endpoint * endpoint)1029   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
1030       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1031     if (MarkPeerClosed(endpoint)) {
1032       endpoints_.erase(endpoint->id());
1033     }
1034   }
1035 
FindOrInsertEndpoint(mojo::InterfaceId id,bool * inserted)1036   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted)
1037       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1038     DCHECK(!inserted || !*inserted);
1039 
1040     Endpoint* endpoint = FindEndpoint(id);
1041     if (!endpoint) {
1042       endpoint = new Endpoint(this, id);
1043       endpoints_.insert({id, endpoint});
1044       if (inserted)
1045         *inserted = true;
1046     }
1047     return endpoint;
1048   }
1049 
FindEndpoint(mojo::InterfaceId id)1050   Endpoint* FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1051     auto iter = endpoints_.find(id);
1052     return iter != endpoints_.end() ? iter->second.get() : nullptr;
1053   }
1054 
1055   // mojo::MessageReceiver:
Accept(mojo::Message * message)1056   bool Accept(mojo::Message* message) override {
1057     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1058 
1059     if (!message->DeserializeAssociatedEndpointHandles(this))
1060       return false;
1061 
1062     if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
1063       return control_message_handler_.Accept(message);
1064 
1065     mojo::InterfaceId id = message->interface_id();
1066     if (!mojo::IsValidInterfaceId(id))
1067       return false;
1068 
1069     base::ReleasableAutoLock locker(&lock_);
1070     Endpoint* endpoint = FindEndpoint(id);
1071     if (!endpoint)
1072       return true;
1073 
1074     mojo::InterfaceEndpointClient* client = endpoint->client();
1075     if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
1076       // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
1077       // by default legacy IPCs must dispatch to either the IO thread or the
1078       // proxy task runner. We generally impose the same constraint on
1079       // associated interface endpoints so that FIFO can be guaranteed across
1080       // all interfaces without stalling any of them to wait for a pending
1081       // endpoint to be bound.
1082       //
1083       // This allows us to assume that if an endpoint is not yet bound when we
1084       // receive a message targeting it, it *will* be bound on the proxy task
1085       // runner by the time a newly posted task runs there. Hence we simply post
1086       // a hopeful dispatch task to that task runner.
1087       //
1088       // As it turns out, there are even some instances of endpoints binding to
1089       // alternative (non-IO-thread, non-proxy) task runners, but still
1090       // ultimately relying on the fact that we schedule their messages on the
1091       // proxy task runner. So even if the endpoint is already bound, we
1092       // default to scheduling it on the proxy task runner as long as it's not
1093       // bound specifically to the IO task runner.
1094       // TODO(rockot): Try to sort out these cases and maybe eliminate them.
1095       //
1096       // Finally, it's also possible that an endpoint was bound to an
1097       // alternative task runner and it really does want its messages to
1098       // dispatch there. In that case `was_bound_off_sequence()` will be true to
1099       // signal that we should really use that task runner.
1100       const scoped_refptr<base::SequencedTaskRunner> task_runner =
1101           client && endpoint->was_bound_off_sequence()
1102               ? endpoint->task_runner()
1103               : proxy_task_runner_.get();
1104 
1105       ScopedUrgentMessageNotification scoped_urgent_message_notification(
1106           message->has_flag(mojo::Message::kFlagIsUrgent)
1107               ? urgent_message_observer_
1108               : nullptr);
1109 
1110       if (message->has_flag(mojo::Message::kFlagIsSync)) {
1111         MessageWrapper message_wrapper(this, std::move(*message));
1112         // Sync messages may need to be handled by the endpoint if it's blocking
1113         // on a sync reply. We pass ownership of the message to the endpoint's
1114         // sync message queue. If the endpoint was blocking, it will dequeue the
1115         // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
1116         // call will dequeue the message and dispatch it.
1117         std::optional<uint32_t> message_id =
1118             endpoint->EnqueueSyncMessage(std::move(message_wrapper));
1119         if (message_id) {
1120           task_runner->PostTask(
1121               FROM_HERE,
1122               base::BindOnce(
1123                   &ChannelAssociatedGroupController::AcceptSyncMessage, this,
1124                   id, *message_id,
1125                   std::move(scoped_urgent_message_notification)));
1126         }
1127         return true;
1128       }
1129 
1130       // If |task_runner| has been torn down already, this PostTask will fail
1131       // and destroy |message|. That operation may need to in turn destroy
1132       // in-transit associated endpoints and thus acquire |lock_|. We no longer
1133       // need the lock to be held now, so we can release it before the PostTask.
1134       {
1135         // Grab interface name from |client| before releasing the lock to ensure
1136         // that |client| is safe to access.
1137         base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
1138             client ? client->interface_name() : "unknown interface");
1139         locker.Release();
1140         task_runner->PostTask(
1141             FROM_HERE,
1142             base::BindOnce(
1143                 &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
1144                 std::move(*message),
1145                 std::move(scoped_urgent_message_notification)));
1146       }
1147       return true;
1148     }
1149 
1150     locker.Release();
1151     // It's safe to access |client| here without holding a lock, because this
1152     // code runs on a proxy thread and |client| can't be destroyed from any
1153     // thread.
1154     return client->HandleIncomingMessage(message);
1155   }
1156 
AcceptOnEndpointThread(mojo::Message message,ScopedUrgentMessageNotification scoped_urgent_message_notification)1157   void AcceptOnEndpointThread(
1158       mojo::Message message,
1159       ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1160     TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1161                  "ChannelAssociatedGroupController::AcceptOnEndpointThread");
1162 
1163     mojo::InterfaceId id = message.interface_id();
1164     DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));
1165 
1166     base::AutoLock locker(lock_);
1167     Endpoint* endpoint = FindEndpoint(id);
1168     if (!endpoint)
1169       return;
1170 
1171     mojo::InterfaceEndpointClient* client = endpoint->client();
1172     if (!client)
1173       return;
1174 
1175     if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1176         !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1177       return;
1178     }
1179 
1180     // TODO(altimin): This event is temporarily kept as a debug fallback. Remove
1181     // it once the new implementation proves to be stable.
1182     TRACE_EVENT(
1183         TRACE_DISABLED_BY_DEFAULT("mojom"),
1184         // Using client->interface_name() is safe here because this is a static
1185         // string defined for each mojo interface.
1186         perfetto::StaticString(client->interface_name()),
1187         [&](perfetto::EventContext& ctx) {
1188           static const uint8_t* toplevel_flow_enabled =
1189               TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
1190           if (!*toplevel_flow_enabled)
1191             return;
1192 
1193           perfetto::Flow::Global(message.GetTraceId())(ctx);
1194         });
1195 
1196     // Sync messages should never make their way to this method.
1197     DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
1198 
1199     bool result = false;
1200     {
1201       base::AutoUnlock unlocker(lock_);
1202       result = client->HandleIncomingMessage(&message);
1203     }
1204 
1205     if (!result)
1206       RaiseError();
1207   }
1208 
AcceptSyncMessage(mojo::InterfaceId interface_id,uint32_t message_id,ScopedUrgentMessageNotification scoped_urgent_message_notification)1209   void AcceptSyncMessage(
1210       mojo::InterfaceId interface_id,
1211       uint32_t message_id,
1212       ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1213     TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1214                  "ChannelAssociatedGroupController::AcceptSyncMessage");
1215 
1216     base::AutoLock locker(lock_);
1217     Endpoint* endpoint = FindEndpoint(interface_id);
1218     if (!endpoint)
1219       return;
1220 
1221     // Careful, if the endpoint is detached its members are cleared. Check for
1222     // that before dereferencing.
1223     mojo::InterfaceEndpointClient* client = endpoint->client();
1224     if (!client)
1225       return;
1226 
1227     if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1228         !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1229       return;
1230     }
1231 
1232     // Using client->interface_name() is safe here because this is a static
1233     // string defined for each mojo interface.
1234     TRACE_EVENT0("mojom", client->interface_name());
1235     MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
1236 
1237     // The message must have already been dequeued by the endpoint waking up
1238     // from a sync wait. Nothing to do.
1239     if (message_wrapper.value().IsNull())
1240       return;
1241 
1242     bool result = false;
1243     {
1244       base::AutoUnlock unlocker(lock_);
1245       result = client->HandleIncomingMessage(&message_wrapper.value());
1246     }
1247 
1248     if (!result)
1249       RaiseError();
1250   }
1251 
1252   // mojo::PipeControlMessageHandlerDelegate:
OnPeerAssociatedEndpointClosed(mojo::InterfaceId id,const std::optional<mojo::DisconnectReason> & reason)1253   bool OnPeerAssociatedEndpointClosed(
1254       mojo::InterfaceId id,
1255       const std::optional<mojo::DisconnectReason>& reason) override {
1256     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1257 
1258     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
1259     base::AutoLock locker(lock_);
1260     scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
1261     if (reason)
1262       endpoint->set_disconnect_reason(reason);
1263     if (!endpoint->peer_closed()) {
1264       if (endpoint->client())
1265         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
1266       MarkPeerClosedAndMaybeRemove(endpoint.get());
1267     }
1268 
1269     return true;
1270   }
1271 
WaitForFlushToComplete(mojo::ScopedMessagePipeHandle flush_pipe)1272   bool WaitForFlushToComplete(
1273       mojo::ScopedMessagePipeHandle flush_pipe) override {
1274     // We don't support async flushing on the IPC Channel pipe.
1275     return false;
1276   }
1277 
1278   const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
1279   const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
1280   const bool set_interface_id_namespace_bit_;
1281 
1282   // Ensures sequenced access to members below.
1283   SEQUENCE_CHECKER(sequence_checker_);
1284 
1285   // Whether `Bind()` or `SendMessageOnSequence()` was called.
1286   // `sequence_checker_` can be detached when this is `false`.
1287   bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1288 
1289   bool paused_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1290   bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1291   std::unique_ptr<mojo::Connector> connector_
1292       GUARDED_BY_CONTEXT(sequence_checker_);
1293   mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
1294   mojo::PipeControlMessageHandler control_message_handler_
1295       GUARDED_BY_CONTEXT(sequence_checker_);
1296   ControlMessageProxyThunk control_message_proxy_thunk_
1297       GUARDED_BY_CONTEXT(sequence_checker_);
1298   raw_ptr<UrgentMessageObserver> urgent_message_observer_
1299       GUARDED_BY_CONTEXT(sequence_checker_) = nullptr;
1300 
1301   // NOTE: It is unsafe to call into this object while holding |lock_|.
1302   mojo::PipeControlMessageProxy control_message_proxy_;
1303 
1304   // Outgoing messages sent before this controller Bound() to a pipe or while it
1305   // was paused. Protected by a lock to support memory dumps from any thread.
1306   base::Lock outgoing_messages_lock_;
1307   std::vector<mojo::Message> outgoing_messages_
1308       GUARDED_BY(outgoing_messages_lock_);
1309 
1310   // Guards the fields below for thread-safe access.
1311   base::Lock lock_;
1312 
1313   bool encountered_error_ GUARDED_BY(lock_) = false;
1314 
1315   // ID #1 is reserved for the mojom::Channel interface.
1316   uint32_t next_interface_id_ GUARDED_BY(lock_) = 2;
1317 
1318   std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
1319 };
1320 
1321 namespace {
1322 
OnMemoryDump(const base::trace_event::MemoryDumpArgs & args,base::trace_event::ProcessMemoryDump * pmd)1323 bool ControllerMemoryDumpProvider::OnMemoryDump(
1324     const base::trace_event::MemoryDumpArgs& args,
1325     base::trace_event::ProcessMemoryDump* pmd) {
1326   base::AutoLock lock(lock_);
1327   for (ChannelAssociatedGroupController* controller : controllers_) {
1328     base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
1329         base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
1330                            reinterpret_cast<uintptr_t>(controller)));
1331     dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
1332                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1333                     controller->GetQueuedMessageCount());
1334     MessageMemoryDumpInfo info;
1335     size_t count = 0;
1336     controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
1337     dump->AddScalar("top_message_name", "id", info.id);
1338     dump->AddScalar("top_message_count",
1339                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1340                     count);
1341 
1342     if (info.profiler_tag) {
1343       // TODO(ssid): Memory dumps currently do not support adding string
1344       // arguments in background dumps. So, add this value as a trace event for
1345       // now.
1346       TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
1347                    "ControllerMemoryDumpProvider::OnMemoryDump",
1348                    "top_queued_message_tag", info.profiler_tag,
1349                    "count", count);
1350     }
1351   }
1352 
1353   return true;
1354 }
1355 
1356 class MojoBootstrapImpl : public MojoBootstrap {
1357  public:
MojoBootstrapImpl(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<ChannelAssociatedGroupController> controller)1358   MojoBootstrapImpl(
1359       mojo::ScopedMessagePipeHandle handle,
1360       const scoped_refptr<ChannelAssociatedGroupController> controller)
1361       : controller_(controller),
1362         associated_group_(controller),
1363         handle_(std::move(handle)) {}
1364 
1365   MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
1366   MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;
1367 
~MojoBootstrapImpl()1368   ~MojoBootstrapImpl() override {
1369     controller_->ShutDown();
1370   }
1371 
1372  private:
Connect(mojo::PendingAssociatedRemote<mojom::Channel> * sender,mojo::PendingAssociatedReceiver<mojom::Channel> * receiver)1373   void Connect(
1374       mojo::PendingAssociatedRemote<mojom::Channel>* sender,
1375       mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
1376     controller_->Bind(std::move(handle_), sender, receiver);
1377   }
1378 
StartReceiving()1379   void StartReceiving() override { controller_->StartReceiving(); }
1380 
Pause()1381   void Pause() override {
1382     controller_->Pause();
1383   }
1384 
Unpause()1385   void Unpause() override {
1386     controller_->Unpause();
1387   }
1388 
Flush()1389   void Flush() override {
1390     controller_->FlushOutgoingMessages();
1391   }
1392 
GetAssociatedGroup()1393   mojo::AssociatedGroup* GetAssociatedGroup() override {
1394     return &associated_group_;
1395   }
1396 
SetUrgentMessageObserver(UrgentMessageObserver * observer)1397   void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
1398     controller_->SetUrgentMessageObserver(observer);
1399   }
1400 
1401   scoped_refptr<ChannelAssociatedGroupController> controller_;
1402   mojo::AssociatedGroup associated_group_;
1403 
1404   mojo::ScopedMessagePipeHandle handle_;
1405 };
1406 
1407 }  // namespace
1408 
1409 ScopedAllowOffSequenceChannelAssociatedBindings::
ScopedAllowOffSequenceChannelAssociatedBindings()1410     ScopedAllowOffSequenceChannelAssociatedBindings()
1411     : resetter_(&off_sequence_binding_allowed, true) {}
1412 
1413 ScopedAllowOffSequenceChannelAssociatedBindings::
1414     ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;
1415 
1416 // static
Create(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)1417 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1418     mojo::ScopedMessagePipeHandle handle,
1419     Channel::Mode mode,
1420     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1421     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1422   return std::make_unique<MojoBootstrapImpl>(
1423       std::move(handle),
1424       base::MakeRefCounted<ChannelAssociatedGroupController>(
1425           mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
1426 }
1427 
1428 }  // namespace IPC
1429