// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/ipc_mojo_bootstrap.h"

#include <inttypes.h>
#include <stdint.h>

#include <map>
#include <memory>
#include <optional>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>

#include "base/check_op.h"
#include "base/containers/circular_deque.h"
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/no_destructor.h"
#include "base/ranges/algorithm.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/common/task_annotator.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/trace_event/memory_allocator_dump.h"
#include "base/trace_event/memory_dump_manager.h"
#include "base/trace_event/memory_dump_provider.h"
#include "base/trace_event/typed_macros.h"
#include "ipc/ipc_channel.h"
#include "ipc/urgent_message_observer.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/connector.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include "mojo/public/cpp/bindings/interface_id.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/mojo_buildflags.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
#include "mojo/public/cpp/bindings/tracing_helpers.h"
#include "third_party/abseil-cpp/absl/base/attributes.h"

namespace IPC {

namespace {

class ChannelAssociatedGroupController;

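// When true, the current sequence is allowed to bind channel-associated
// endpoints off the Channel's proxy task runner; toggled by
// ScopedAllowOffSequenceChannelAssociatedBindings below.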
ABSL_CONST_INIT thread_local bool off_sequence_binding_allowed = false;

// Used to track some internal Channel state in pursuit of message leaks.
//
// TODO(https://crbug.com/813045): Remove this.
class ControllerMemoryDumpProvider
    : public base::trace_event::MemoryDumpProvider {
 public:
  ControllerMemoryDumpProvider() {
    base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
        this, "IPCChannel", nullptr);
  }

  ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
  ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
      delete;

  ~ControllerMemoryDumpProvider() override {
    base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
        this);
  }

  void AddController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.insert(controller);
  }

  void RemoveController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.erase(controller);
  }

  // base::trace_event::MemoryDumpProvider:
  bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
                    base::trace_event::ProcessMemoryDump* pmd) override;

 private:
  base::Lock lock_;
  std::set<ChannelAssociatedGroupController*> controllers_;
};

ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
  static base::NoDestructor<ControllerMemoryDumpProvider> provider;
  return *provider;
}

// Messages are grouped by this info when recording memory metrics.
struct MessageMemoryDumpInfo {
  MessageMemoryDumpInfo(const mojo::Message& message)
      : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
  MessageMemoryDumpInfo() = default;

  bool operator==(const MessageMemoryDumpInfo& other) const {
    return other.id == id && other.profiler_tag == profiler_tag;
  }

  uint32_t id = 0;
  const char* profiler_tag = nullptr;
};

struct MessageMemoryDumpInfoHash {
  size_t operator()(const MessageMemoryDumpInfo& info) const {
    return base::HashInts(
        info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
  }
};

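// RAII helper that notifies the UrgentMessageObserver when an urgent message
// is received and again once it has been processed.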
class ScopedUrgentMessageNotification {
 public:
  explicit ScopedUrgentMessageNotification(
      UrgentMessageObserver* observer = nullptr)
      : observer_(observer) {
    if (observer_) {
      observer_->OnUrgentMessageReceived();
    }
  }

  ~ScopedUrgentMessageNotification() {
    if (observer_) {
      observer_->OnUrgentMessageProcessed();
    }
  }

  ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
      : observer_(std::exchange(other.observer_, nullptr)) {}

  ScopedUrgentMessageNotification& operator=(
      ScopedUrgentMessageNotification&& other) {
    observer_ = std::exchange(other.observer_, nullptr);
    return *this;
  }

 private:
  raw_ptr<UrgentMessageObserver> observer_;
};

class ChannelAssociatedGroupController
    : public mojo::AssociatedGroupController,
      public mojo::MessageReceiver,
      public mojo::PipeControlMessageHandlerDelegate {
 public:
  ChannelAssociatedGroupController(
      bool set_interface_id_namespace_bit,
      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
      const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
      : task_runner_(ipc_task_runner),
        proxy_task_runner_(proxy_task_runner),
        set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
        dispatcher_(this),
        control_message_handler_(this),
        control_message_proxy_thunk_(this),
        control_message_proxy_(&control_message_proxy_thunk_) {
    thread_checker_.DetachFromThread();
    control_message_handler_.SetDescription(
        "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
    dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
        "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));

    GetMemoryDumpProvider().AddController(this);
  }

  ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
      delete;
  ChannelAssociatedGroupController& operator=(
      const ChannelAssociatedGroupController&) = delete;

  size_t GetQueuedMessageCount() {
    base::AutoLock lock(outgoing_messages_lock_);
    return outgoing_messages_.size();
  }

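  // Reports the most common queued outgoing message (grouped by message name
  // and heap profiler tag) and how many times it appears, for memory dump
  // metrics.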
  void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
                                         size_t* count) {
    std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
        counts;
    std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
        MessageMemoryDumpInfo(), 0};
    base::AutoLock lock(outgoing_messages_lock_);
    for (const auto& message : outgoing_messages_) {
      auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
      it_and_inserted.first->second++;
      if (it_and_inserted.first->second > top_message_info_and_count.second)
        top_message_info_and_count = *it_and_inserted.first;
    }
    *info = top_message_info_and_count.first;
    *count = top_message_info_and_count.second;
  }

  void Pause() {
    DCHECK(!paused_);
    paused_ = true;
  }

  void Unpause() {
    DCHECK(paused_);
    paused_ = false;
  }

  void FlushOutgoingMessages() {
    std::vector<mojo::Message> outgoing_messages;
    {
      base::AutoLock lock(outgoing_messages_lock_);
      std::swap(outgoing_messages, outgoing_messages_);
    }

    for (auto& message : outgoing_messages)
      SendMessage(&message);
  }

  void Bind(mojo::ScopedMessagePipeHandle handle,
            mojo::PendingAssociatedRemote<mojom::Channel>* sender,
            mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
    connector_ = std::make_unique<mojo::Connector>(
        std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
        "IPC Channel");
    connector_->set_incoming_receiver(&dispatcher_);
    connector_->set_connection_error_handler(
        base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
                       base::Unretained(this)));
    connector_->set_enforce_errors_from_incoming_receiver(false);

    // Don't let the Connector do any sort of queuing on our behalf. Individual
    // messages bound for the IPC::ChannelProxy thread (i.e. the vast majority
    // of messages received by this Connector) are already individually
    // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
    // operation would only introduce a redundant scheduling step for most
    // messages.
    connector_->set_force_immediate_dispatch(true);

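    // The mojom::Channel interface uses interface ID 1 in both directions; the
    // namespace bit marks which side of the channel allocated the sending
    // endpoint, so the two directions stay distinct.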
    mojo::InterfaceId sender_id, receiver_id;
    if (set_interface_id_namespace_bit_) {
      sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
      receiver_id = 1;
    } else {
      sender_id = 1;
      receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
    }

    {
      base::AutoLock locker(lock_);
      Endpoint* sender_endpoint = new Endpoint(this, sender_id);
      Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
      endpoints_.insert({sender_id, sender_endpoint});
      endpoints_.insert({receiver_id, receiver_endpoint});
      sender_endpoint->set_handle_created();
      receiver_endpoint->set_handle_created();
    }

    mojo::ScopedInterfaceEndpointHandle sender_handle =
        CreateScopedInterfaceEndpointHandle(sender_id);
    mojo::ScopedInterfaceEndpointHandle receiver_handle =
        CreateScopedInterfaceEndpointHandle(receiver_id);

    *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
        std::move(sender_handle), 0);
    *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
        std::move(receiver_handle));
  }

  void StartReceiving() { connector_->StartReceiving(task_runner_); }

  void ShutDown() {
    DCHECK(thread_checker_.CalledOnValidThread());
    shut_down_ = true;
    if (connector_)
      connector_->CloseMessagePipe();
    OnPipeError();
    connector_.reset();

    base::AutoLock lock(outgoing_messages_lock_);
    outgoing_messages_.clear();
  }

  // mojo::AssociatedGroupController:
  mojo::InterfaceId AssociateInterface(
      mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
    if (!handle_to_send.pending_association())
      return mojo::kInvalidInterfaceId;

    uint32_t id = 0;
    {
      base::AutoLock locker(lock_);
      do {
        if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
          next_interface_id_ = 2;
        id = next_interface_id_++;
        if (set_interface_id_namespace_bit_)
          id |= mojo::kInterfaceIdNamespaceMask;
      } while (base::Contains(endpoints_, id));

      Endpoint* endpoint = new Endpoint(this, id);
      if (encountered_error_)
        endpoint->set_peer_closed();
      endpoint->set_handle_created();
      endpoints_.insert({id, endpoint});
    }

    if (!NotifyAssociation(&handle_to_send, id)) {
      // The peer handle of |handle_to_send|, which is supposed to join this
      // associated group, has been closed.
      {
        base::AutoLock locker(lock_);
        Endpoint* endpoint = FindEndpoint(id);
        if (endpoint)
          MarkClosedAndMaybeRemove(endpoint);
      }

      control_message_proxy_.NotifyPeerEndpointClosed(
          id, handle_to_send.disconnect_reason());
    }
    return id;
  }

  mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
      mojo::InterfaceId id) override {
    if (!mojo::IsValidInterfaceId(id))
      return mojo::ScopedInterfaceEndpointHandle();

    // Unless it is the primary ID, |id| is from the remote side and therefore
    // its namespace bit is supposed to be different than the value that this
    // router would use.
    if (!mojo::IsPrimaryInterfaceId(id) &&
        set_interface_id_namespace_bit_ ==
            mojo::HasInterfaceIdNamespaceBitSet(id)) {
      return mojo::ScopedInterfaceEndpointHandle();
    }

    base::AutoLock locker(lock_);
    bool inserted = false;
    Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    if (inserted) {
      DCHECK(!endpoint->handle_created());
      if (encountered_error_)
        endpoint->set_peer_closed();
    } else {
      if (endpoint->handle_created())
        return mojo::ScopedInterfaceEndpointHandle();
    }

    endpoint->set_handle_created();
    return CreateScopedInterfaceEndpointHandle(id);
  }

  void CloseEndpointHandle(
      mojo::InterfaceId id,
      const std::optional<mojo::DisconnectReason>& reason) override {
    if (!mojo::IsValidInterfaceId(id))
      return;
    {
      base::AutoLock locker(lock_);
      DCHECK(base::Contains(endpoints_, id));
      Endpoint* endpoint = endpoints_[id].get();
      DCHECK(!endpoint->client());
      DCHECK(!endpoint->closed());
      MarkClosedAndMaybeRemove(endpoint);
    }

    if (!mojo::IsPrimaryInterfaceId(id) || reason)
      control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
  }

  mojo::InterfaceEndpointController* AttachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle,
      mojo::InterfaceEndpointClient* client,
      scoped_refptr<base::SequencedTaskRunner> runner) override {
    const mojo::InterfaceId id = handle.id();

    DCHECK(mojo::IsValidInterfaceId(id));
    DCHECK(client);

    base::AutoLock locker(lock_);
    DCHECK(base::Contains(endpoints_, id));

    Endpoint* endpoint = endpoints_[id].get();
    endpoint->AttachClient(client, std::move(runner));

    if (endpoint->peer_closed())
      NotifyEndpointOfError(endpoint, true /* force_async */);

    return endpoint;
  }

  void DetachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle) override {
    const mojo::InterfaceId id = handle.id();

    DCHECK(mojo::IsValidInterfaceId(id));

    base::AutoLock locker(lock_);
    DCHECK(base::Contains(endpoints_, id));

    Endpoint* endpoint = endpoints_[id].get();
    endpoint->DetachClient();
  }

  void RaiseError() override {
    // We ignore errors on channel endpoints, leaving the pipe open. There are
    // good reasons for this:
    //
    // * We should never close a channel endpoint in either process as long as
    //   the child process is still alive. The child's endpoint should only be
    //   closed implicitly by process death, and the browser's endpoint should
    //   only be closed after the child process is confirmed to be dead. Crash
    //   reporting logic in Chrome relies on this behavior in order to do the
    //   right thing.
    //
    // * There are two interesting conditions under which RaiseError() can be
    //   implicitly reached: an incoming message fails validation, or the
    //   local endpoint drops a response callback without calling it.
    //
    // * In the validation case, we also report the message as bad, and this
    //   will imminently trigger the common bad-IPC path in the browser,
    //   causing the browser to kill the offending renderer.
    //
    // * In the dropped response callback case, the net result of ignoring the
    //   issue is generally innocuous. While indicative of programmer error,
    //   it's not a severe failure and is already covered by separate DCHECKs.
    //
    // See https://crbug.com/861607 for additional discussion.
  }

  bool PrefersSerializedMessages() override { return true; }

  void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
    urgent_message_observer_ = observer;
  }

 private:
  class Endpoint;
  class ControlMessageProxyThunk;
  friend class Endpoint;
  friend class ControlMessageProxyThunk;

  // MessageWrapper objects are always destroyed under the controller's lock.
  // On destruction, if the message it wraps contains
  // ScopedInterfaceEndpointHandles (which cannot be destructed under the
  // controller's lock), the wrapper unlocks to clean them up.
  class MessageWrapper {
   public:
    MessageWrapper() = default;

    MessageWrapper(ChannelAssociatedGroupController* controller,
                   mojo::Message message)
        : controller_(controller), value_(std::move(message)) {}

    MessageWrapper(MessageWrapper&& other)
        : controller_(other.controller_), value_(std::move(other.value_)) {}

    MessageWrapper(const MessageWrapper&) = delete;
    MessageWrapper& operator=(const MessageWrapper&) = delete;

    ~MessageWrapper() {
      if (value_.associated_endpoint_handles()->empty())
        return;

      controller_->lock_.AssertAcquired();
      {
        base::AutoUnlock unlocker(controller_->lock_);
        value_.mutable_associated_endpoint_handles()->clear();
      }
    }

    MessageWrapper& operator=(MessageWrapper&& other) {
      controller_ = other.controller_;
      value_ = std::move(other.value_);
      return *this;
    }

    bool HasRequestId(uint64_t request_id) {
      return !value_.IsNull() && value_.version() >= 1 &&
             value_.header_v1()->request_id == request_id;
    }

    mojo::Message& value() { return value_; }

   private:
    raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
    mojo::Message value_;
  };

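  // Per-endpoint state for one associated interface multiplexed over the
  // Channel pipe. Owned by |endpoints_| and handed to bound clients as their
  // InterfaceEndpointController.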
  class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
                   public mojo::InterfaceEndpointController {
   public:
    Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
        : controller_(controller), id_(id) {}

    Endpoint(const Endpoint&) = delete;
    Endpoint& operator=(const Endpoint&) = delete;

    mojo::InterfaceId id() const { return id_; }

    bool closed() const {
      controller_->lock_.AssertAcquired();
      return closed_;
    }

    void set_closed() {
      controller_->lock_.AssertAcquired();
      closed_ = true;
    }

    bool peer_closed() const {
      controller_->lock_.AssertAcquired();
      return peer_closed_;
    }

    void set_peer_closed() {
      controller_->lock_.AssertAcquired();
      peer_closed_ = true;
    }

    bool handle_created() const {
      controller_->lock_.AssertAcquired();
      return handle_created_;
    }

    void set_handle_created() {
      controller_->lock_.AssertAcquired();
      handle_created_ = true;
    }

    const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
      return disconnect_reason_;
    }

    void set_disconnect_reason(
        const std::optional<mojo::DisconnectReason>& disconnect_reason) {
      disconnect_reason_ = disconnect_reason;
    }

    base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    bool was_bound_off_sequence() const { return was_bound_off_sequence_; }

    mojo::InterfaceEndpointClient* client() const {
      controller_->lock_.AssertAcquired();
      return client_;
    }

    void AttachClient(mojo::InterfaceEndpointClient* client,
                      scoped_refptr<base::SequencedTaskRunner> runner) {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(!closed_);

      task_runner_ = std::move(runner);
      client_ = client;

      if (off_sequence_binding_allowed) {
        was_bound_off_sequence_ = true;
      }
    }

    void DetachClient() {
      controller_->lock_.AssertAcquired();
      DCHECK(client_);
      DCHECK(!closed_);

      task_runner_ = nullptr;
      client_ = nullptr;
      sync_watcher_.reset();
    }

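    // Queues an incoming sync message on this endpoint. If a caller is already
    // blocked in SyncWatchExclusive() waiting for exactly this reply, the
    // message is handed to that waiter and no ID is returned; otherwise the
    // returned ID is later used by AcceptSyncMessage() to pop the message.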
    std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
      controller_->lock_.AssertAcquired();
      if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
        exclusive_wait_ = nullptr;
        return std::nullopt;
      }

      uint32_t id = GenerateSyncMessageId();
      sync_messages_.emplace_back(id, std::move(message));
      SignalSyncMessageEvent();
      return id;
    }

    void SignalSyncMessageEvent() {
      controller_->lock_.AssertAcquired();

      if (sync_watcher_)
        sync_watcher_->SignalEvent();
    }

    MessageWrapper PopSyncMessage(uint32_t id) {
      controller_->lock_.AssertAcquired();
      if (sync_messages_.empty() || sync_messages_.front().first != id)
        return MessageWrapper();
      MessageWrapper message = std::move(sync_messages_.front().second);
      sync_messages_.pop_front();
      return message;
    }

    // mojo::InterfaceEndpointController:
    bool SendMessage(mojo::Message* message) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      message->set_interface_id(id_);
      return controller_->SendMessage(message);
    }

    void AllowWokenUpBySyncWatchOnSameThread() override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      EnsureSyncWatcherExists();
      sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
    }

    bool SyncWatch(const bool& should_stop) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // It's not legal to make sync calls from the primary endpoint's thread,
      // and in fact they must only happen from the proxy task runner.
      DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
      DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());

      EnsureSyncWatcherExists();
      {
        base::AutoLock locker(controller_->lock_);
        if (peer_closed_) {
          SignalSyncMessageEvent();
        }
      }
      return sync_watcher_->SyncWatch(&should_stop);
    }

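    // Blocks the calling thread until a sync reply matching |request_id|
    // arrives, either by finding it already queued or by registering an
    // ExclusiveSyncWait that EnqueueSyncMessage() fulfills later.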
    MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
      std::optional<ExclusiveSyncWait> wait;
      {
        base::AutoLock lock(controller_->lock_);
        for (auto& [id, message] : sync_messages_) {
          if (message.HasRequestId(request_id)) {
            return std::move(message);
          }
        }

        DCHECK(!exclusive_wait_);
        wait.emplace(request_id);
        exclusive_wait_ = &wait.value();
      }

      wait->event.Wait();
      return std::move(wait->message);
    }

    bool SyncWatchExclusive(uint64_t request_id) override {
      MessageWrapper message = WaitForIncomingSyncReply(request_id);
      if (message.value().IsNull() || !client_) {
        return false;
      }

      if (!client_->HandleIncomingMessage(&message.value())) {
        base::AutoLock locker(controller_->lock_);
        controller_->RaiseError();
        return false;
      }

      return true;
    }

    void RegisterExternalSyncWaiter(uint64_t request_id) override {}

   private:
    friend class base::RefCountedThreadSafe<Endpoint>;

    ~Endpoint() override {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(closed_);
      DCHECK(peer_closed_);
      DCHECK(!sync_watcher_);
      if (exclusive_wait_) {
        exclusive_wait_->event.Signal();
      }
    }

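    // Invoked on this endpoint's sequence when the sync event watcher fires.
    // Dispatches at most one queued sync message per invocation, leaving the
    // event signaled if more remain.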
    void OnSyncMessageEventReady() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // SUBTLE: The order of these scoped_refptrs matters.
      // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
      // holds a raw pointer to the AssociatedGroupController.
      scoped_refptr<AssociatedGroupController> controller_keepalive(
          controller_.get());
      scoped_refptr<Endpoint> keepalive(this);
      base::AutoLock locker(controller_->lock_);
      bool more_to_process = false;
      if (!sync_messages_.empty()) {
        MessageWrapper message_wrapper =
            std::move(sync_messages_.front().second);
        sync_messages_.pop_front();

        bool dispatch_succeeded;
        mojo::InterfaceEndpointClient* client = client_;
        {
          base::AutoUnlock unlocker(controller_->lock_);
          dispatch_succeeded =
              client->HandleIncomingMessage(&message_wrapper.value());
        }

        if (!sync_messages_.empty())
          more_to_process = true;

        if (!dispatch_succeeded)
          controller_->RaiseError();
      }

      if (!more_to_process)
        sync_watcher_->ResetEvent();

      // If there are no queued sync messages and the peer has closed, there
      // won't be incoming sync messages in the future. If any SyncWatch()
      // calls are on the stack for this endpoint, resetting the watcher will
      // allow them to exit as the stack unwinds.
      if (!more_to_process && peer_closed_)
        sync_watcher_.reset();
    }

    void EnsureSyncWatcherExists() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      if (sync_watcher_)
        return;

      base::AutoLock locker(controller_->lock_);
      sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
          base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
                              base::Unretained(this)));
      if (peer_closed_ || !sync_messages_.empty())
        SignalSyncMessageEvent();
    }

    uint32_t GenerateSyncMessageId() {
      // Overflow is fine.
      uint32_t id = next_sync_message_id_++;
      DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
      return id;
    }

    // Tracks the state of a pending sync wait which excludes all other
    // incoming IPC on the waiting thread.
    struct ExclusiveSyncWait {
      explicit ExclusiveSyncWait(uint64_t request_id)
          : request_id(request_id) {}
      ~ExclusiveSyncWait() = default;

      bool TryFulfillingWith(MessageWrapper& wrapper) {
        if (!wrapper.HasRequestId(request_id)) {
          return false;
        }

        message = std::move(wrapper);
        event.Signal();
        return true;
      }

      uint64_t request_id;
      base::WaitableEvent event;
      MessageWrapper message;
    };

    const raw_ptr<ChannelAssociatedGroupController> controller_;
    const mojo::InterfaceId id_;

    bool closed_ = false;
    bool peer_closed_ = false;
    bool handle_created_ = false;
    bool was_bound_off_sequence_ = false;
    std::optional<mojo::DisconnectReason> disconnect_reason_;
    raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
    base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
    raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
    uint32_t next_sync_message_id_ = 0;
  };

  class ControlMessageProxyThunk : public MessageReceiver {
   public:
    explicit ControlMessageProxyThunk(
        ChannelAssociatedGroupController* controller)
        : controller_(controller) {}

    ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
    ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
        delete;

   private:
    // MessageReceiver:
    bool Accept(mojo::Message* message) override {
      return controller_->SendMessage(message);
    }

    raw_ptr<ChannelAssociatedGroupController> controller_;
  };

  ~ChannelAssociatedGroupController() override {
    DCHECK(!connector_);

    base::AutoLock locker(lock_);
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (!endpoint->closed()) {
        // This happens when a NotifyPeerEndpointClosed message has been
        // received, but the interface ID hasn't been used to create a local
        // endpoint handle.
        DCHECK(!endpoint->client());
        DCHECK(endpoint->peer_closed());
        MarkClosed(endpoint);
      } else {
        MarkPeerClosed(endpoint);
      }
    }
    endpoints_.clear();

    GetMemoryDumpProvider().RemoveController(this);
  }

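  // Sends |message| over the channel. On the IPC thread, messages are queued
  // while the controller is unbound or paused; calls from any other thread are
  // posted to the IPC thread to mirror IPC::ChannelProxy::Send() ordering.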
  bool SendMessage(mojo::Message* message) {
    DCHECK(message->heap_profiler_tag());
    if (task_runner_->BelongsToCurrentThread()) {
      DCHECK(thread_checker_.CalledOnValidThread());
      if (!connector_ || paused_) {
        if (!shut_down_) {
          base::AutoLock lock(outgoing_messages_lock_);
          outgoing_messages_.emplace_back(std::move(*message));
        }
        return true;
      }
      return connector_->Accept(message);
    } else {
      // We always post tasks to the primary endpoint thread when called from
      // other threads in order to simulate IPC::ChannelProxy::Send behavior.
      task_runner_->PostTask(
          FROM_HERE,
          base::BindOnce(
              &ChannelAssociatedGroupController::SendMessageOnPrimaryThread,
              this, std::move(*message)));
      return true;
    }
  }

  void SendMessageOnPrimaryThread(mojo::Message message) {
    DCHECK(thread_checker_.CalledOnValidThread());
    if (!SendMessage(&message))
      RaiseError();
  }

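  // Called on the IPC thread when the underlying message pipe is closed or
  // signals an error. Marks every endpoint's peer as closed and notifies any
  // attached clients.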
  void OnPipeError() {
    DCHECK(thread_checker_.CalledOnValidThread());

    // We keep |this| alive here because it's possible for the notifications
    // below to release all other references.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);

    base::AutoLock locker(lock_);
    encountered_error_ = true;

    std::vector<uint32_t> endpoints_to_remove;
    std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (endpoint->client()) {
        endpoints_to_notify.push_back(endpoint);
      }

      if (MarkPeerClosed(endpoint)) {
        endpoints_to_remove.push_back(endpoint->id());
      }
    }

    for (auto& endpoint : endpoints_to_notify) {
      // Because a notification may in turn detach any endpoint, we have to
      // check each client again here.
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    }

    for (uint32_t id : endpoints_to_remove) {
      endpoints_.erase(id);
    }
  }

  void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
    lock_.AssertAcquired();
    DCHECK(endpoint->task_runner() && endpoint->client());
    if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
      mojo::InterfaceEndpointClient* client = endpoint->client();
      std::optional<mojo::DisconnectReason> reason(
          endpoint->disconnect_reason());

      base::AutoUnlock unlocker(lock_);
      client->NotifyError(reason);
    } else {
      endpoint->task_runner()->PostTask(
          FROM_HERE,
          base::BindOnce(&ChannelAssociatedGroupController::
                             NotifyEndpointOfErrorOnEndpointThread,
                         this, endpoint->id(),
                         // This is safe as `endpoint` is verified to be in
                         // `endpoints_` (a map with ownership) before use.
                         base::UnsafeDangling(endpoint)));
    }
  }

  // `endpoint` might be a dangling ptr and must be checked before dereference.
  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
                                             MayBeDangling<Endpoint> endpoint) {
    base::AutoLock locker(lock_);
    auto iter = endpoints_.find(id);
    if (iter == endpoints_.end() || iter->second.get() != endpoint)
      return;
    if (!endpoint->client())
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    NotifyEndpointOfError(endpoint, false /* force_async */);
  }

  // Marks `endpoint` as closed and returns true if and only if its peer was
  // also already closed.
  bool MarkClosed(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_closed();
    return endpoint->peer_closed();
  }

  // Marks `endpoint` as having a closed peer and returns true if and only if
  // `endpoint` itself was also already closed.
  bool MarkPeerClosed(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_peer_closed();
    endpoint->SignalSyncMessageEvent();
    return endpoint->closed();
  }

  void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
    if (MarkClosed(endpoint)) {
      endpoints_.erase(endpoint->id());
    }
  }

  void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
    if (MarkPeerClosed(endpoint)) {
      endpoints_.erase(endpoint->id());
    }
  }

  Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
    lock_.AssertAcquired();
    DCHECK(!inserted || !*inserted);

    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint) {
      endpoint = new Endpoint(this, id);
      endpoints_.insert({id, endpoint});
      if (inserted)
        *inserted = true;
    }
    return endpoint;
  }

  Endpoint* FindEndpoint(mojo::InterfaceId id) {
    lock_.AssertAcquired();
    auto iter = endpoints_.find(id);
    return iter != endpoints_.end() ? iter->second.get() : nullptr;
  }

  // mojo::MessageReceiver:
  bool Accept(mojo::Message* message) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    if (!message->DeserializeAssociatedEndpointHandles(this))
      return false;

    if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
      return control_message_handler_.Accept(message);

    mojo::InterfaceId id = message->interface_id();
    if (!mojo::IsValidInterfaceId(id))
      return false;

    base::ReleasableAutoLock locker(&lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return true;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
      // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
      // by default legacy IPCs must dispatch to either the IO thread or the
      // proxy task runner. We generally impose the same constraint on
      // associated interface endpoints so that FIFO can be guaranteed across
      // all interfaces without stalling any of them to wait for a pending
      // endpoint to be bound.
      //
      // This allows us to assume that if an endpoint is not yet bound when we
      // receive a message targeting it, it *will* be bound on the proxy task
      // runner by the time a newly posted task runs there. Hence we simply
      // post a hopeful dispatch task to that task runner.
      //
      // As it turns out, there are even some instances of endpoints binding to
      // alternative (non-IO-thread, non-proxy) task runners, but still
      // ultimately relying on the fact that we schedule their messages on the
      // proxy task runner. So even if the endpoint is already bound, we
      // default to scheduling it on the proxy task runner as long as it's not
      // bound specifically to the IO task runner.
      // TODO(rockot): Try to sort out these cases and maybe eliminate them.
      //
      // Finally, it's also possible that an endpoint was bound to an
      // alternative task runner and it really does want its messages to
      // dispatch there. In that case `was_bound_off_sequence()` will be true
      // to signal that we should really use that task runner.
      const scoped_refptr<base::SequencedTaskRunner> task_runner =
          client && endpoint->was_bound_off_sequence()
              ? endpoint->task_runner()
              : proxy_task_runner_.get();

      ScopedUrgentMessageNotification scoped_urgent_message_notification(
          message->has_flag(mojo::Message::kFlagIsUrgent)
              ? urgent_message_observer_
              : nullptr);

      if (message->has_flag(mojo::Message::kFlagIsSync)) {
        MessageWrapper message_wrapper(this, std::move(*message));
        // Sync messages may need to be handled by the endpoint if it's
        // blocking on a sync reply. We pass ownership of the message to the
        // endpoint's sync message queue. If the endpoint was blocking, it will
        // dequeue the message and dispatch it. Otherwise the posted
        // |AcceptSyncMessage()| call will dequeue the message and dispatch it.
        std::optional<uint32_t> message_id =
            endpoint->EnqueueSyncMessage(std::move(message_wrapper));
        if (message_id) {
          task_runner->PostTask(
              FROM_HERE,
              base::BindOnce(
                  &ChannelAssociatedGroupController::AcceptSyncMessage, this,
                  id, *message_id,
                  std::move(scoped_urgent_message_notification)));
        }
        return true;
      }

      // If |task_runner| has been torn down already, this PostTask will fail
      // and destroy |message|. That operation may need to in turn destroy
      // in-transit associated endpoints and thus acquire |lock_|. We no longer
      // need the lock to be held now, so we can release it before the
      // PostTask.
      {
        // Grab interface name from |client| before releasing the lock to
        // ensure that |client| is safe to access.
        base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
            client ? client->interface_name() : "unknown interface");
        locker.Release();
        task_runner->PostTask(
            FROM_HERE,
            base::BindOnce(
                &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
                std::move(*message),
                std::move(scoped_urgent_message_notification)));
      }
      return true;
    }

    locker.Release();
    // It's safe to access |client| here without holding a lock, because this
    // code runs on a proxy thread and |client| can't be destroyed from any
    // thread.
    return client->HandleIncomingMessage(message);
  }

  void AcceptOnEndpointThread(
      mojo::Message message,
      ScopedUrgentMessageNotification scoped_urgent_message_notification) {
    TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
                 "ChannelAssociatedGroupController::AcceptOnEndpointThread");

    mojo::InterfaceId id = message.interface_id();
    DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
        !proxy_task_runner_->RunsTasksInCurrentSequence()) {
      return;
    }

    // TODO(altimin): This event is temporarily kept as a debug fallback.
    // Remove it once the new implementation proves to be stable.
    TRACE_EVENT(
        TRACE_DISABLED_BY_DEFAULT("mojom"),
        // Using client->interface_name() is safe here because this is a static
        // string defined for each mojo interface.
        perfetto::StaticString(client->interface_name()),
        [&](perfetto::EventContext& ctx) {
          static const uint8_t* toplevel_flow_enabled =
              TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
          if (!*toplevel_flow_enabled)
            return;

          perfetto::Flow::Global(message.GetTraceId())(ctx);
        });

    // Sync messages should never make their way to this method.
    DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));

    bool result = false;
    {
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message);
    }

    if (!result)
      RaiseError();
  }

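  // Runs on the target endpoint's (or the proxy) task runner to dispatch a
  // sync message previously queued by Accept(), unless a blocked sync waiter
  // already consumed it.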
  void AcceptSyncMessage(
      mojo::InterfaceId interface_id,
      uint32_t message_id,
      ScopedUrgentMessageNotification scoped_urgent_message_notification) {
    TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
                 "ChannelAssociatedGroupController::AcceptSyncMessage");

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(interface_id);
    if (!endpoint)
      return;

    // Careful, if the endpoint is detached its members are cleared. Check for
    // that before dereferencing.
    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
        !proxy_task_runner_->RunsTasksInCurrentSequence()) {
      return;
    }

    // Using client->interface_name() is safe here because this is a static
    // string defined for each mojo interface.
    TRACE_EVENT0("mojom", client->interface_name());
    MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);

    // The message must have already been dequeued by the endpoint waking up
    // from a sync wait. Nothing to do.
    if (message_wrapper.value().IsNull())
      return;

    bool result = false;
    {
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message_wrapper.value());
    }

    if (!result)
      RaiseError();
  }

  // mojo::PipeControlMessageHandlerDelegate:
  bool OnPeerAssociatedEndpointClosed(
      mojo::InterfaceId id,
      const std::optional<mojo::DisconnectReason>& reason) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    base::AutoLock locker(lock_);
    scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
    if (reason)
      endpoint->set_disconnect_reason(reason);
    if (!endpoint->peer_closed()) {
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
      MarkPeerClosedAndMaybeRemove(endpoint.get());
    }

    return true;
  }

  bool WaitForFlushToComplete(
      mojo::ScopedMessagePipeHandle flush_pipe) override {
    // We don't support async flushing on the IPC Channel pipe.
    return false;
  }

  // Checked in places which must be run on the primary endpoint's thread.
  base::ThreadChecker thread_checker_;

  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
  const bool set_interface_id_namespace_bit_;
  bool paused_ = false;
  std::unique_ptr<mojo::Connector> connector_;
  mojo::MessageDispatcher dispatcher_;
  mojo::PipeControlMessageHandler control_message_handler_;
  ControlMessageProxyThunk control_message_proxy_thunk_;

  // NOTE: It is unsafe to call into this object while holding |lock_|.
  mojo::PipeControlMessageProxy control_message_proxy_;

  // Guards access to |outgoing_messages_| only. Used to support memory dumps
  // which may be triggered from any thread.
  base::Lock outgoing_messages_lock_;

  // Outgoing messages that were sent before this controller was bound to a
  // real message pipe.
  std::vector<mojo::Message> outgoing_messages_;

  // Guards the fields below for thread-safe access.
  base::Lock lock_;

  bool encountered_error_ = false;
  bool shut_down_ = false;

  // ID #1 is reserved for the mojom::Channel interface.
  uint32_t next_interface_id_ = 2;

  std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;

  raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
};

bool ControllerMemoryDumpProvider::OnMemoryDump(
    const base::trace_event::MemoryDumpArgs& args,
    base::trace_event::ProcessMemoryDump* pmd) {
  base::AutoLock lock(lock_);
  for (auto* controller : controllers_) {
    base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
        base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
                           reinterpret_cast<uintptr_t>(controller)));
    dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
                    base::trace_event::MemoryAllocatorDump::kUnitsObjects,
                    controller->GetQueuedMessageCount());
    MessageMemoryDumpInfo info;
    size_t count = 0;
    controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
    dump->AddScalar("top_message_name", "id", info.id);
    dump->AddScalar("top_message_count",
                    base::trace_event::MemoryAllocatorDump::kUnitsObjects,
                    count);

    if (info.profiler_tag) {
      // TODO(ssid): Memory dumps currently do not support adding string
      // arguments in background dumps. So, add this value as a trace event for
      // now.
      TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
                   "ControllerMemoryDumpProvider::OnMemoryDump",
                   "top_queued_message_tag", info.profiler_tag,
                   "count", count);
    }
  }

  return true;
}

class MojoBootstrapImpl : public MojoBootstrap {
 public:
  MojoBootstrapImpl(
      mojo::ScopedMessagePipeHandle handle,
      const scoped_refptr<ChannelAssociatedGroupController> controller)
      : controller_(controller),
        associated_group_(controller),
        handle_(std::move(handle)) {}

  MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
  MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;

  ~MojoBootstrapImpl() override {
    controller_->ShutDown();
  }

 private:
  void Connect(
      mojo::PendingAssociatedRemote<mojom::Channel>* sender,
      mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
    controller_->Bind(std::move(handle_), sender, receiver);
  }

  void StartReceiving() override { controller_->StartReceiving(); }

  void Pause() override {
    controller_->Pause();
  }

  void Unpause() override {
    controller_->Unpause();
  }

  void Flush() override {
    controller_->FlushOutgoingMessages();
  }

  mojo::AssociatedGroup* GetAssociatedGroup() override {
    return &associated_group_;
  }

  void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
    controller_->SetUrgentMessageObserver(observer);
  }

  scoped_refptr<ChannelAssociatedGroupController> controller_;
  mojo::AssociatedGroup associated_group_;

  mojo::ScopedMessagePipeHandle handle_;
};

}  // namespace

ScopedAllowOffSequenceChannelAssociatedBindings::
    ScopedAllowOffSequenceChannelAssociatedBindings()
    : resetter_(&off_sequence_binding_allowed, true) {}

ScopedAllowOffSequenceChannelAssociatedBindings::
    ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;

// static
std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
    mojo::ScopedMessagePipeHandle handle,
    Channel::Mode mode,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
  return std::make_unique<MojoBootstrapImpl>(
      std::move(handle),
      base::MakeRefCounted<ChannelAssociatedGroupController>(
          mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
}

}  // namespace IPC