1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_mojo_bootstrap.h"
6
7 #include <inttypes.h>
8 #include <stdint.h>
9
10 #include <map>
11 #include <memory>
12 #include <optional>
13 #include <set>
14 #include <utility>
15 #include <vector>
16
17 #include "base/check_op.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/containers/contains.h"
20 #include "base/feature_list.h"
21 #include "base/functional/bind.h"
22 #include "base/functional/callback.h"
23 #include "base/memory/ptr_util.h"
24 #include "base/memory/raw_ptr.h"
25 #include "base/no_destructor.h"
26 #include "base/not_fatal_until.h"
27 #include "base/ranges/algorithm.h"
28 #include "base/sequence_checker.h"
29 #include "base/strings/stringprintf.h"
30 #include "base/synchronization/lock.h"
31 #include "base/synchronization/waitable_event.h"
32 #include "base/task/common/task_annotator.h"
33 #include "base/task/sequenced_task_runner.h"
34 #include "base/task/single_thread_task_runner.h"
35 #include "base/thread_annotations.h"
36 #include "base/trace_event/memory_allocator_dump.h"
37 #include "base/trace_event/memory_dump_manager.h"
38 #include "base/trace_event/memory_dump_provider.h"
39 #include "base/trace_event/typed_macros.h"
40 #include "ipc/ipc_channel.h"
41 #include "ipc/urgent_message_observer.h"
42 #include "mojo/public/c/system/types.h"
43 #include "mojo/public/cpp/bindings/associated_group.h"
44 #include "mojo/public/cpp/bindings/associated_group_controller.h"
45 #include "mojo/public/cpp/bindings/connector.h"
46 #include "mojo/public/cpp/bindings/features.h"
47 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
48 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
49 #include "mojo/public/cpp/bindings/interface_id.h"
50 #include "mojo/public/cpp/bindings/message.h"
51 #include "mojo/public/cpp/bindings/message_header_validator.h"
52 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
53 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
54 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
55 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
56 #include "mojo/public/cpp/bindings/scoped_message_error_crash_key.h"
57 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
58 #include "mojo/public/cpp/bindings/tracing_helpers.h"
59
60 namespace IPC {
61
62 class ChannelAssociatedGroupController;
63
64 namespace {
65
66 constinit thread_local bool off_sequence_binding_allowed = false;
67
68 BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
69 "MojoChannelAssociatedSendUsesRunOrPostTask",
70 base::FEATURE_DISABLED_BY_DEFAULT);
71
72 // Used to track some internal Channel state in pursuit of message leaks.
73 //
74 // TODO(crbug.com/40563310): Remove this.
75 class ControllerMemoryDumpProvider
76 : public base::trace_event::MemoryDumpProvider {
77 public:
78   ControllerMemoryDumpProvider() {
79 base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
80 this, "IPCChannel", nullptr);
81 }
82
83 ControllerMemoryDumpProvider(const ControllerMemoryDumpProvider&) = delete;
84 ControllerMemoryDumpProvider& operator=(const ControllerMemoryDumpProvider&) =
85 delete;
86
87   ~ControllerMemoryDumpProvider() override {
88 base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
89 this);
90 }
91
92   void AddController(ChannelAssociatedGroupController* controller) {
93 base::AutoLock lock(lock_);
94 controllers_.insert(controller);
95 }
96
97   void RemoveController(ChannelAssociatedGroupController* controller) {
98 base::AutoLock lock(lock_);
99 controllers_.erase(controller);
100 }
101
102 // base::trace_event::MemoryDumpProvider:
103 bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
104 base::trace_event::ProcessMemoryDump* pmd) override;
105
106 private:
107 base::Lock lock_;
108 std::set<raw_ptr<ChannelAssociatedGroupController, SetExperimental>>
109 controllers_;
110 };
111
112 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
113 static base::NoDestructor<ControllerMemoryDumpProvider> provider;
114 return *provider;
115 }
116
117 // Messages are grouped by this info when recording memory metrics.
118 struct MessageMemoryDumpInfo {
119   MessageMemoryDumpInfo(const mojo::Message& message)
120 : id(message.name()), profiler_tag(message.heap_profiler_tag()) {}
121 MessageMemoryDumpInfo() = default;
122
123   bool operator==(const MessageMemoryDumpInfo& other) const {
124 return other.id == id && other.profiler_tag == profiler_tag;
125 }
126
127 uint32_t id = 0;
128 const char* profiler_tag = nullptr;
129 };
130
131 struct MessageMemoryDumpInfoHash {
132   size_t operator()(const MessageMemoryDumpInfo& info) const {
133 return base::HashInts(
134 info.id, info.profiler_tag ? base::FastHash(info.profiler_tag) : 0);
135 }
136 };
137
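// Notifies the UrgentMessageObserver (when one is set) that an urgent message
// was received, and again once it has been processed. The "processed"
// notification fires from the destructor of whichever instance currently owns
// the observer pointer, so moving the scoper transfers that responsibility.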
138 class ScopedUrgentMessageNotification {
139 public:
140   explicit ScopedUrgentMessageNotification(
141 UrgentMessageObserver* observer = nullptr)
142 : observer_(observer) {
143 if (observer_) {
144 observer_->OnUrgentMessageReceived();
145 }
146 }
147
148   ~ScopedUrgentMessageNotification() {
149 if (observer_) {
150 observer_->OnUrgentMessageProcessed();
151 }
152 }
153
154   ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
155 : observer_(std::exchange(other.observer_, nullptr)) {}
156
157   ScopedUrgentMessageNotification& operator=(
158 ScopedUrgentMessageNotification&& other) {
159 observer_ = std::exchange(other.observer_, nullptr);
160 return *this;
161 }
162
163 private:
164 raw_ptr<UrgentMessageObserver> observer_;
165 };
166
167 } // namespace
168
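// Routes all channel-associated interface traffic over the IPC Channel's
// single message pipe. As the AssociatedGroupController for the mojom::Channel
// pipe, it allocates interface IDs, tracks per-endpoint state, and dispatches
// incoming messages either on the IPC (Channel) task runner or on the
// ChannelProxy's task runner.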
169 class ChannelAssociatedGroupController
170 : public mojo::AssociatedGroupController,
171 public mojo::MessageReceiver,
172 public mojo::PipeControlMessageHandlerDelegate {
173 public:
174   ChannelAssociatedGroupController(
175 bool set_interface_id_namespace_bit,
176 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
177 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
178 : task_runner_(ipc_task_runner),
179 proxy_task_runner_(proxy_task_runner),
180 set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
181 dispatcher_(this),
182 control_message_handler_(this),
183 control_message_proxy_thunk_(this),
184 control_message_proxy_(&control_message_proxy_thunk_) {
185 control_message_handler_.SetDescription(
186 "IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
187 dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
188 "IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
189
190 GetMemoryDumpProvider().AddController(this);
191
192 DETACH_FROM_SEQUENCE(sequence_checker_);
193 }
194
195 ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
196 delete;
197 ChannelAssociatedGroupController& operator=(
198 const ChannelAssociatedGroupController&) = delete;
199
200   size_t GetQueuedMessageCount() {
201 base::AutoLock lock(outgoing_messages_lock_);
202 return outgoing_messages_.size();
203 }
204
205   void GetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
206 size_t* count) {
207 std::unordered_map<MessageMemoryDumpInfo, size_t, MessageMemoryDumpInfoHash>
208 counts;
209 std::pair<MessageMemoryDumpInfo, size_t> top_message_info_and_count = {
210 MessageMemoryDumpInfo(), 0};
211 base::AutoLock lock(outgoing_messages_lock_);
212 for (const auto& message : outgoing_messages_) {
213 auto it_and_inserted = counts.emplace(MessageMemoryDumpInfo(message), 0);
214 it_and_inserted.first->second++;
215 if (it_and_inserted.first->second > top_message_info_and_count.second)
216 top_message_info_and_count = *it_and_inserted.first;
217 }
218 *info = top_message_info_and_count.first;
219 *count = top_message_info_and_count.second;
220 }
221
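  // While paused, outgoing messages are queued in |outgoing_messages_| instead
  // of being written to the pipe; they are delivered once Unpause() and
  // FlushOutgoingMessages() are called.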
222   void Pause() {
223 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
224 CHECK(was_bound_or_message_sent_);
225 CHECK(!paused_);
226 paused_ = true;
227 }
228
229   void Unpause() {
230 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
231 CHECK(was_bound_or_message_sent_);
232 CHECK(paused_);
233 paused_ = false;
234 }
235
236   void FlushOutgoingMessages() {
237 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
238 CHECK(was_bound_or_message_sent_);
239
240 std::vector<mojo::Message> outgoing_messages;
241 {
242 base::AutoLock lock(outgoing_messages_lock_);
243 std::swap(outgoing_messages, outgoing_messages_);
244 }
245
246 for (auto& message : outgoing_messages)
247 SendMessage(&message);
248 }
249
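  // Binds this controller to the channel's message pipe and produces the
  // primary mojom::Channel sender/receiver endpoints associated with it. Which
  // side gets the interface ID namespace bit depends on
  // |set_interface_id_namespace_bit_|.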
250   void Bind(mojo::ScopedMessagePipeHandle handle,
251 mojo::PendingAssociatedRemote<mojom::Channel>* sender,
252 mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
253 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
254
255 connector_ = std::make_unique<mojo::Connector>(
256 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
257 "IPC Channel");
258 connector_->set_incoming_receiver(&dispatcher_);
259 connector_->set_connection_error_handler(
260 base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
261 base::Unretained(this)));
262 connector_->set_enforce_errors_from_incoming_receiver(false);
263
264 // Don't let the Connector do any sort of queuing on our behalf. Individual
265     // messages bound for the IPC::ChannelProxy thread (i.e. the vast majority
266 // of messages received by this Connector) are already individually
267 // scheduled for dispatch by ChannelProxy, so Connector's normal mode of
268 // operation would only introduce a redundant scheduling step for most
269 // messages.
270 connector_->set_force_immediate_dispatch(true);
271
272 mojo::InterfaceId sender_id, receiver_id;
273 if (set_interface_id_namespace_bit_) {
274 sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
275 receiver_id = 1;
276 } else {
277 sender_id = 1;
278 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
279 }
280
281 {
282 base::AutoLock locker(lock_);
283 Endpoint* sender_endpoint = new Endpoint(this, sender_id);
284 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
285 endpoints_.insert({ sender_id, sender_endpoint });
286 endpoints_.insert({ receiver_id, receiver_endpoint });
287 sender_endpoint->set_handle_created();
288 receiver_endpoint->set_handle_created();
289 }
290
291 mojo::ScopedInterfaceEndpointHandle sender_handle =
292 CreateScopedInterfaceEndpointHandle(sender_id);
293 mojo::ScopedInterfaceEndpointHandle receiver_handle =
294 CreateScopedInterfaceEndpointHandle(receiver_id);
295
296 *sender = mojo::PendingAssociatedRemote<mojom::Channel>(
297 std::move(sender_handle), 0);
298 *receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
299 std::move(receiver_handle));
300
301 if (!was_bound_or_message_sent_) {
302 was_bound_or_message_sent_ = true;
303 DETACH_FROM_SEQUENCE(sequence_checker_);
304 }
305 }
306
307   void StartReceiving() {
308 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
309 CHECK(was_bound_or_message_sent_);
310 connector_->StartReceiving(task_runner_);
311 }
312
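  // Closes the pipe, signals a pipe error to all endpoints, and drops any
  // queued outgoing messages.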
313   void ShutDown() {
314 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
315 shut_down_ = true;
316 if (connector_)
317 connector_->CloseMessagePipe();
318 OnPipeError();
319 connector_.reset();
320
321 base::AutoLock lock(outgoing_messages_lock_);
322 outgoing_messages_.clear();
323 }
324
325 // mojo::AssociatedGroupController:
326   mojo::InterfaceId AssociateInterface(
327 mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
328 if (!handle_to_send.pending_association())
329 return mojo::kInvalidInterfaceId;
330
331 uint32_t id = 0;
332 {
333 base::AutoLock locker(lock_);
334 do {
335 if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
336 next_interface_id_ = 2;
337 id = next_interface_id_++;
338 if (set_interface_id_namespace_bit_)
339 id |= mojo::kInterfaceIdNamespaceMask;
340 } while (base::Contains(endpoints_, id));
341
342 Endpoint* endpoint = new Endpoint(this, id);
343 if (encountered_error_)
344 endpoint->set_peer_closed();
345 endpoint->set_handle_created();
346 endpoints_.insert({id, endpoint});
347 }
348
349 if (!NotifyAssociation(&handle_to_send, id)) {
350 // The peer handle of |handle_to_send|, which is supposed to join this
351 // associated group, has been closed.
352 {
353 base::AutoLock locker(lock_);
354 Endpoint* endpoint = FindEndpoint(id);
355 if (endpoint)
356 MarkClosedAndMaybeRemove(endpoint);
357 }
358
359 control_message_proxy_.NotifyPeerEndpointClosed(
360 id, handle_to_send.disconnect_reason());
361 }
362 return id;
363 }
364
365   mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
366 mojo::InterfaceId id) override {
367 if (!mojo::IsValidInterfaceId(id))
368 return mojo::ScopedInterfaceEndpointHandle();
369
370 // Unless it is the primary ID, |id| is from the remote side and therefore
371 // its namespace bit is supposed to be different than the value that this
372 // router would use.
373 if (!mojo::IsPrimaryInterfaceId(id) &&
374 set_interface_id_namespace_bit_ ==
375 mojo::HasInterfaceIdNamespaceBitSet(id)) {
376 return mojo::ScopedInterfaceEndpointHandle();
377 }
378
379 base::AutoLock locker(lock_);
380 bool inserted = false;
381 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
382 if (inserted) {
383 DCHECK(!endpoint->handle_created());
384 if (encountered_error_)
385 endpoint->set_peer_closed();
386 } else {
387 if (endpoint->handle_created())
388 return mojo::ScopedInterfaceEndpointHandle();
389 }
390
391 endpoint->set_handle_created();
392 return CreateScopedInterfaceEndpointHandle(id);
393 }
394
395   void CloseEndpointHandle(
396 mojo::InterfaceId id,
397 const std::optional<mojo::DisconnectReason>& reason) override {
398 if (!mojo::IsValidInterfaceId(id))
399 return;
400 {
401 base::AutoLock locker(lock_);
402 DCHECK(base::Contains(endpoints_, id));
403 Endpoint* endpoint = endpoints_[id].get();
404 DCHECK(!endpoint->client());
405 DCHECK(!endpoint->closed());
406 MarkClosedAndMaybeRemove(endpoint);
407 }
408
409 if (!mojo::IsPrimaryInterfaceId(id) || reason)
410 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
411 }
412
413   void NotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id) override {
414 if (!task_runner_->RunsTasksInCurrentSequence()) {
415 task_runner_->PostTask(
416 FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::
417 NotifyLocalEndpointOfPeerClosure,
418 base::WrapRefCounted(this), id));
419 return;
420 }
421 OnPeerAssociatedEndpointClosed(id, std::nullopt);
422 }
423
424   mojo::InterfaceEndpointController* AttachEndpointClient(
425 const mojo::ScopedInterfaceEndpointHandle& handle,
426 mojo::InterfaceEndpointClient* client,
427 scoped_refptr<base::SequencedTaskRunner> runner) override {
428 const mojo::InterfaceId id = handle.id();
429
430 DCHECK(mojo::IsValidInterfaceId(id));
431 DCHECK(client);
432
433 base::AutoLock locker(lock_);
434 DCHECK(base::Contains(endpoints_, id));
435
436 Endpoint* endpoint = endpoints_[id].get();
437 endpoint->AttachClient(client, std::move(runner));
438
439 if (endpoint->peer_closed())
440 NotifyEndpointOfError(endpoint, true /* force_async */);
441
442 return endpoint;
443 }
444
445   void DetachEndpointClient(
446 const mojo::ScopedInterfaceEndpointHandle& handle) override {
447 const mojo::InterfaceId id = handle.id();
448
449 DCHECK(mojo::IsValidInterfaceId(id));
450
451 base::AutoLock locker(lock_);
452 DCHECK(base::Contains(endpoints_, id));
453
454 Endpoint* endpoint = endpoints_[id].get();
455 endpoint->DetachClient();
456 }
457
458   void RaiseError() override {
459 // We ignore errors on channel endpoints, leaving the pipe open. There are
460 // good reasons for this:
461 //
462 // * We should never close a channel endpoint in either process as long as
463 // the child process is still alive. The child's endpoint should only be
464 // closed implicitly by process death, and the browser's endpoint should
465 // only be closed after the child process is confirmed to be dead. Crash
466 // reporting logic in Chrome relies on this behavior in order to do the
467 // right thing.
468 //
469 // * There are two interesting conditions under which RaiseError() can be
470 // implicitly reached: an incoming message fails validation, or the
471 // local endpoint drops a response callback without calling it.
472 //
473 // * In the validation case, we also report the message as bad, and this
474 // will imminently trigger the common bad-IPC path in the browser,
475 // causing the browser to kill the offending renderer.
476 //
477 // * In the dropped response callback case, the net result of ignoring the
478 // issue is generally innocuous. While indicative of programmer error,
479 // it's not a severe failure and is already covered by separate DCHECKs.
480 //
481 // See https://crbug.com/861607 for additional discussion.
482 }
483
484   bool PrefersSerializedMessages() override { return true; }
485
486   void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
487 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
488 CHECK(!was_bound_or_message_sent_);
489 urgent_message_observer_ = observer;
490 DETACH_FROM_SEQUENCE(sequence_checker_);
491 }
492
493 private:
494 class Endpoint;
495 class ControlMessageProxyThunk;
496 friend class Endpoint;
497 friend class ControlMessageProxyThunk;
498
499 // MessageWrapper objects are always destroyed under the controller's lock. On
500   // destruction, if the message it wraps contains
501 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
502 // controller's lock), the wrapper unlocks to clean them up.
503 class MessageWrapper {
504 public:
505 MessageWrapper() = default;
506
507     MessageWrapper(ChannelAssociatedGroupController* controller,
508 mojo::Message message)
509 : controller_(controller), value_(std::move(message)) {}
510
511     MessageWrapper(MessageWrapper&& other)
512 : controller_(other.controller_), value_(std::move(other.value_)) {}
513
514 MessageWrapper(const MessageWrapper&) = delete;
515 MessageWrapper& operator=(const MessageWrapper&) = delete;
516
517     ~MessageWrapper() {
518 if (value_.associated_endpoint_handles()->empty())
519 return;
520
521 controller_->lock_.AssertAcquired();
522 {
523 base::AutoUnlock unlocker(controller_->lock_);
524 value_.mutable_associated_endpoint_handles()->clear();
525 }
526 }
527
528     MessageWrapper& operator=(MessageWrapper&& other) {
529 controller_ = other.controller_;
530 value_ = std::move(other.value_);
531 return *this;
532 }
533
534     bool HasRequestId(uint64_t request_id) {
535 return !value_.IsNull() && value_.version() >= 1 &&
536 value_.header_v1()->request_id == request_id;
537 }
538
539     mojo::Message& value() { return value_; }
540
541 private:
542 raw_ptr<ChannelAssociatedGroupController> controller_ = nullptr;
543 mojo::Message value_;
544 };
545
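  // Tracks one associated interface endpoint multiplexed over the channel
  // pipe: its attached client and task runner, closed/peer-closed state, and
  // any incoming sync messages queued for it.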
546 class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
547 public mojo::InterfaceEndpointController {
548 public:
549     Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
550 : controller_(controller), id_(id) {}
551
552 Endpoint(const Endpoint&) = delete;
553 Endpoint& operator=(const Endpoint&) = delete;
554
555     mojo::InterfaceId id() const { return id_; }
556
557     bool closed() const {
558 controller_->lock_.AssertAcquired();
559 return closed_;
560 }
561
562     void set_closed() {
563 controller_->lock_.AssertAcquired();
564 closed_ = true;
565 }
566
567     bool peer_closed() const {
568 controller_->lock_.AssertAcquired();
569 return peer_closed_;
570 }
571
572     void set_peer_closed() {
573 controller_->lock_.AssertAcquired();
574 peer_closed_ = true;
575 }
576
577     bool handle_created() const {
578 controller_->lock_.AssertAcquired();
579 return handle_created_;
580 }
581
582     void set_handle_created() {
583 controller_->lock_.AssertAcquired();
584 handle_created_ = true;
585 }
586
587     const std::optional<mojo::DisconnectReason>& disconnect_reason() const {
588 return disconnect_reason_;
589 }
590
591     void set_disconnect_reason(
592 const std::optional<mojo::DisconnectReason>& disconnect_reason) {
593 disconnect_reason_ = disconnect_reason;
594 }
595
596     base::SequencedTaskRunner* task_runner() const {
597 return task_runner_.get();
598 }
599
600     bool was_bound_off_sequence() const { return was_bound_off_sequence_; }
601
602     mojo::InterfaceEndpointClient* client() const {
603 controller_->lock_.AssertAcquired();
604 return client_;
605 }
606
607     void AttachClient(mojo::InterfaceEndpointClient* client,
608 scoped_refptr<base::SequencedTaskRunner> runner) {
609 controller_->lock_.AssertAcquired();
610 DCHECK(!client_);
611 DCHECK(!closed_);
612
613 task_runner_ = std::move(runner);
614 client_ = client;
615
616 if (off_sequence_binding_allowed) {
617 was_bound_off_sequence_ = true;
618 }
619 }
620
621     void DetachClient() {
622 controller_->lock_.AssertAcquired();
623 DCHECK(client_);
624 DCHECK(!closed_);
625
626 task_runner_ = nullptr;
627 client_ = nullptr;
628 sync_watcher_.reset();
629 }
630
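    // Queues an incoming sync message for this endpoint, or hands it directly
    // to a pending exclusive sync wait. Returns the ID assigned to the queued
    // message, or nullopt if the message fulfilled an exclusive wait.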
631     std::optional<uint32_t> EnqueueSyncMessage(MessageWrapper message) {
632 controller_->lock_.AssertAcquired();
633 if (exclusive_wait_ && exclusive_wait_->TryFulfillingWith(message)) {
634 exclusive_wait_ = nullptr;
635 return std::nullopt;
636 }
637
638 uint32_t id = GenerateSyncMessageId();
639 sync_messages_.emplace_back(id, std::move(message));
640 SignalSyncMessageEvent();
641 return id;
642 }
643
644     void SignalSyncMessageEvent() {
645 controller_->lock_.AssertAcquired();
646
647 if (sync_watcher_)
648 sync_watcher_->SignalEvent();
649 }
650
651     MessageWrapper PopSyncMessage(uint32_t id) {
652 controller_->lock_.AssertAcquired();
653 if (sync_messages_.empty() || sync_messages_.front().first != id)
654 return MessageWrapper();
655 MessageWrapper message = std::move(sync_messages_.front().second);
656 sync_messages_.pop_front();
657 return message;
658 }
659
660 // mojo::InterfaceEndpointController:
661     bool SendMessage(mojo::Message* message) override {
662 DCHECK(task_runner_->RunsTasksInCurrentSequence());
663 message->set_interface_id(id_);
664 return controller_->SendMessage(message);
665 }
666
667     void AllowWokenUpBySyncWatchOnSameThread() override {
668 DCHECK(task_runner_->RunsTasksInCurrentSequence());
669
670 EnsureSyncWatcherExists();
671 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
672 }
673
674     bool SyncWatch(const bool& should_stop) override {
675 DCHECK(task_runner_->RunsTasksInCurrentSequence());
676
677 // It's not legal to make sync calls from the primary endpoint's thread,
678 // and in fact they must only happen from the proxy task runner.
679 DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
680 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
681
682 EnsureSyncWatcherExists();
683 {
684 base::AutoLock locker(controller_->lock_);
685 if (peer_closed_) {
686 SignalSyncMessageEvent();
687 }
688 }
689 return sync_watcher_->SyncWatch(&should_stop);
690 }
691
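    // Blocks the calling thread until a sync reply carrying |request_id|
    // arrives (or until this endpoint is destroyed, which signals the wait).
    // Other incoming messages stay queued while waiting.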
692     MessageWrapper WaitForIncomingSyncReply(uint64_t request_id) {
693 std::optional<ExclusiveSyncWait> wait;
694 {
695 base::AutoLock lock(controller_->lock_);
696 for (auto& [id, message] : sync_messages_) {
697 if (message.HasRequestId(request_id)) {
698 return std::move(message);
699 }
700 }
701
702 DCHECK(!exclusive_wait_);
703 wait.emplace(request_id);
704 exclusive_wait_ = &wait.value();
705 }
706
707 wait->event.Wait();
708 return std::move(wait->message);
709 }
710
711     bool SyncWatchExclusive(uint64_t request_id) override {
712 MessageWrapper message = WaitForIncomingSyncReply(request_id);
713 if (message.value().IsNull() || !client_) {
714 return false;
715 }
716
717 if (!client_->HandleIncomingMessage(&message.value())) {
718 base::AutoLock locker(controller_->lock_);
719 controller_->RaiseError();
720 return false;
721 }
722
723 return true;
724 }
725
726     void RegisterExternalSyncWaiter(uint64_t request_id) override {}
727
728 private:
729 friend class base::RefCountedThreadSafe<Endpoint>;
730
731     ~Endpoint() override {
732 controller_->lock_.AssertAcquired();
733 DCHECK(!client_);
734 DCHECK(closed_);
735 DCHECK(peer_closed_);
736 DCHECK(!sync_watcher_);
737 if (exclusive_wait_) {
738 exclusive_wait_->event.Signal();
739 }
740 }
741
742     void OnSyncMessageEventReady() {
743 DCHECK(task_runner_->RunsTasksInCurrentSequence());
744
745 // SUBTLE: The order of these scoped_refptrs matters.
746 // `controller_keepalive` MUST outlive `keepalive` because the Endpoint
747       // holds a raw pointer to the AssociatedGroupController.
748 scoped_refptr<AssociatedGroupController> controller_keepalive(
749 controller_.get());
750 scoped_refptr<Endpoint> keepalive(this);
751 base::AutoLock locker(controller_->lock_);
752 bool more_to_process = false;
753 if (!sync_messages_.empty()) {
754 MessageWrapper message_wrapper =
755 std::move(sync_messages_.front().second);
756 sync_messages_.pop_front();
757
758 bool dispatch_succeeded;
759 mojo::InterfaceEndpointClient* client = client_;
760 {
761 base::AutoUnlock unlocker(controller_->lock_);
762 dispatch_succeeded =
763 client->HandleIncomingMessage(&message_wrapper.value());
764 }
765
766 if (!sync_messages_.empty())
767 more_to_process = true;
768
769 if (!dispatch_succeeded)
770 controller_->RaiseError();
771 }
772
773 if (!more_to_process)
774 sync_watcher_->ResetEvent();
775
776       // If there are no queued sync messages and the peer has closed, there
777       // won't be incoming sync messages in the future. If any
778       // SyncWatch() calls are on the stack for this endpoint, resetting the
779       // watcher will allow them to exit as the stack unwinds.
780 if (!more_to_process && peer_closed_)
781 sync_watcher_.reset();
782 }
783
784     void EnsureSyncWatcherExists() {
785 DCHECK(task_runner_->RunsTasksInCurrentSequence());
786 if (sync_watcher_)
787 return;
788
789 base::AutoLock locker(controller_->lock_);
790 sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
791 base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
792 base::Unretained(this)));
793 if (peer_closed_ || !sync_messages_.empty())
794 SignalSyncMessageEvent();
795 }
796
797     uint32_t GenerateSyncMessageId() {
798 // Overflow is fine.
799 uint32_t id = next_sync_message_id_++;
800 DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
801 return id;
802 }
803
804 // Tracks the state of a pending sync wait which excludes all other incoming
805 // IPC on the waiting thread.
806 struct ExclusiveSyncWait {
807       explicit ExclusiveSyncWait(uint64_t request_id)
808 : request_id(request_id) {}
809 ~ExclusiveSyncWait() = default;
810
811       bool TryFulfillingWith(MessageWrapper& wrapper) {
812 if (!wrapper.HasRequestId(request_id)) {
813 return false;
814 }
815
816 message = std::move(wrapper);
817 event.Signal();
818 return true;
819 }
820
821 uint64_t request_id;
822 base::WaitableEvent event;
823 MessageWrapper message;
824 };
825
826 const raw_ptr<ChannelAssociatedGroupController> controller_;
827 const mojo::InterfaceId id_;
828
829 bool closed_ = false;
830 bool peer_closed_ = false;
831 bool handle_created_ = false;
832 bool was_bound_off_sequence_ = false;
833 std::optional<mojo::DisconnectReason> disconnect_reason_;
834 raw_ptr<mojo::InterfaceEndpointClient> client_ = nullptr;
835 scoped_refptr<base::SequencedTaskRunner> task_runner_;
836 std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
837 base::circular_deque<std::pair<uint32_t, MessageWrapper>> sync_messages_;
838 raw_ptr<ExclusiveSyncWait> exclusive_wait_ = nullptr;
839 uint32_t next_sync_message_id_ = 0;
840 };
841
842 class ControlMessageProxyThunk : public MessageReceiver {
843 public:
844     explicit ControlMessageProxyThunk(
845 ChannelAssociatedGroupController* controller)
846 : controller_(controller) {}
847
848 ControlMessageProxyThunk(const ControlMessageProxyThunk&) = delete;
849 ControlMessageProxyThunk& operator=(const ControlMessageProxyThunk&) =
850 delete;
851
852 private:
853 // MessageReceiver:
854     bool Accept(mojo::Message* message) override {
855 return controller_->SendMessage(message);
856 }
857
858 raw_ptr<ChannelAssociatedGroupController> controller_;
859 };
860
861   ~ChannelAssociatedGroupController() override {
862 DCHECK(!connector_);
863
864 base::AutoLock locker(lock_);
865 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
866 Endpoint* endpoint = iter->second.get();
867 ++iter;
868
869 if (!endpoint->closed()) {
870         // This happens when a NotifyPeerEndpointClosed message has been
871         // received, but the interface ID hasn't been used to create a local
872         // endpoint handle.
873 DCHECK(!endpoint->client());
874 DCHECK(endpoint->peer_closed());
875 MarkClosed(endpoint);
876 } else {
877 MarkPeerClosed(endpoint);
878 }
879 }
880 endpoints_.clear();
881
882 GetMemoryDumpProvider().RemoveController(this);
883 }
884
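  // Sends |message| over the pipe, hopping to the IPC task runner first when
  // called from any other sequence; in that case the send happens
  // asynchronously and this always returns true.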
885   bool SendMessage(mojo::Message* message) {
886 DCHECK(message->heap_profiler_tag());
887 if (task_runner_->BelongsToCurrentThread()) {
888 return SendMessageOnSequence(message);
889 }
890
891 // PostTask (or RunOrPostTask) so that `message` is sent after messages from
892 // tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
893 auto callback = base::BindOnce(
894 &ChannelAssociatedGroupController::SendMessageOnSequenceViaTask, this,
895 std::move(*message));
896 if (base::FeatureList::IsEnabled(
897 kMojoChannelAssociatedSendUsesRunOrPostTask)) {
898 task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
899 FROM_HERE, std::move(callback));
900 } else {
901 task_runner_->PostTask(FROM_HERE, std::move(callback));
902 }
903
904 return true;
905 }
906
907   bool SendMessageOnSequence(mojo::Message* message) {
908 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
909 was_bound_or_message_sent_ = true;
910
911 if (!connector_ || paused_) {
912 if (!shut_down_) {
913 base::AutoLock lock(outgoing_messages_lock_);
914 outgoing_messages_.emplace_back(std::move(*message));
915 }
916 return true;
917 }
918
919 MojoResult result = connector_->AcceptAndGetResult(message);
920 if (result == MOJO_RESULT_OK) {
921 return true;
922 }
923
924 CHECK(connector_->encountered_error(), base::NotFatalUntil::M135);
925 return false;
926 }
927
928   void SendMessageOnSequenceViaTask(mojo::Message message) {
929 if (!SendMessageOnSequence(&message)) {
930 RaiseError();
931 }
932 }
933
934   void OnPipeError() {
935 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
936
937 // We keep |this| alive here because it's possible for the notifications
938 // below to release all other references.
939 scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
940
941 base::AutoLock locker(lock_);
942 encountered_error_ = true;
943
944 std::vector<uint32_t> endpoints_to_remove;
945 std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
946 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
947 Endpoint* endpoint = iter->second.get();
948 ++iter;
949
950 if (endpoint->client()) {
951 endpoints_to_notify.push_back(endpoint);
952 }
953
954 if (MarkPeerClosed(endpoint)) {
955 endpoints_to_remove.push_back(endpoint->id());
956 }
957 }
958
959 for (auto& endpoint : endpoints_to_notify) {
960 // Because a notification may in turn detach any endpoint, we have to
961 // check each client again here.
962 if (endpoint->client())
963 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
964 }
965
966 for (uint32_t id : endpoints_to_remove) {
967 endpoints_.erase(id);
968 }
969 }
970
971   void NotifyEndpointOfError(Endpoint* endpoint, bool force_async)
972 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
973 DCHECK(endpoint->task_runner() && endpoint->client());
974 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
975 mojo::InterfaceEndpointClient* client = endpoint->client();
976 std::optional<mojo::DisconnectReason> reason(
977 endpoint->disconnect_reason());
978
979 base::AutoUnlock unlocker(lock_);
980 client->NotifyError(reason);
981 } else {
982 endpoint->task_runner()->PostTask(
983 FROM_HERE,
984 base::BindOnce(&ChannelAssociatedGroupController::
985 NotifyEndpointOfErrorOnEndpointThread,
986 this, endpoint->id(),
987 // This is safe as `endpoint` is verified to be in
988 // `endpoints_` (a map with ownership) before use.
989 base::UnsafeDangling(endpoint)));
990 }
991 }
992
993 // `endpoint` might be a dangling ptr and must be checked before dereference.
994   void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
995 MayBeDangling<Endpoint> endpoint) {
996 base::AutoLock locker(lock_);
997 auto iter = endpoints_.find(id);
998 if (iter == endpoints_.end() || iter->second.get() != endpoint)
999 return;
1000 if (!endpoint->client())
1001 return;
1002
1003 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
1004 NotifyEndpointOfError(endpoint, false /* force_async */);
1005 }
1006
1007 // Marks `endpoint` as closed and returns true if and only if its peer was
1008 // also already closed.
1009   bool MarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1010 endpoint->set_closed();
1011 return endpoint->peer_closed();
1012 }
1013
1014 // Marks `endpoint` as having a closed peer and returns true if and only if
1015 // `endpoint` itself was also already closed.
1016   bool MarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1017 endpoint->set_peer_closed();
1018 endpoint->SignalSyncMessageEvent();
1019 return endpoint->closed();
1020 }
1021
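  // Marks `endpoint` closed and erases it from `endpoints_` once both sides of
  // the endpoint are closed.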
1022   void MarkClosedAndMaybeRemove(Endpoint* endpoint)
1023 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1024 if (MarkClosed(endpoint)) {
1025 endpoints_.erase(endpoint->id());
1026 }
1027 }
1028
1029   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
1030 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1031 if (MarkPeerClosed(endpoint)) {
1032 endpoints_.erase(endpoint->id());
1033 }
1034 }
1035
1036   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted)
1037 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1038 DCHECK(!inserted || !*inserted);
1039
1040 Endpoint* endpoint = FindEndpoint(id);
1041 if (!endpoint) {
1042 endpoint = new Endpoint(this, id);
1043 endpoints_.insert({id, endpoint});
1044 if (inserted)
1045 *inserted = true;
1046 }
1047 return endpoint;
1048 }
1049
1050   Endpoint* FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1051 auto iter = endpoints_.find(id);
1052 return iter != endpoints_.end() ? iter->second.get() : nullptr;
1053 }
1054
1055 // mojo::MessageReceiver:
1056   bool Accept(mojo::Message* message) override {
1057 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1058
1059 if (!message->DeserializeAssociatedEndpointHandles(this))
1060 return false;
1061
1062 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
1063 return control_message_handler_.Accept(message);
1064
1065 mojo::InterfaceId id = message->interface_id();
1066 if (!mojo::IsValidInterfaceId(id))
1067 return false;
1068
1069 base::ReleasableAutoLock locker(&lock_);
1070 Endpoint* endpoint = FindEndpoint(id);
1071 if (!endpoint)
1072 return true;
1073
1074 mojo::InterfaceEndpointClient* client = endpoint->client();
1075 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
1076 // The ChannelProxy for this channel is bound to `proxy_task_runner_` and
1077 // by default legacy IPCs must dispatch to either the IO thread or the
1078 // proxy task runner. We generally impose the same constraint on
1079 // associated interface endpoints so that FIFO can be guaranteed across
1080 // all interfaces without stalling any of them to wait for a pending
1081 // endpoint to be bound.
1082 //
1083 // This allows us to assume that if an endpoint is not yet bound when we
1084 // receive a message targeting it, it *will* be bound on the proxy task
1085 // runner by the time a newly posted task runs there. Hence we simply post
1086 // a hopeful dispatch task to that task runner.
1087 //
1088 // As it turns out, there are even some instances of endpoints binding to
1089 // alternative (non-IO-thread, non-proxy) task runners, but still
1090 // ultimately relying on the fact that we schedule their messages on the
1091 // proxy task runner. So even if the endpoint is already bound, we
1092 // default to scheduling it on the proxy task runner as long as it's not
1093 // bound specifically to the IO task runner.
1094 // TODO(rockot): Try to sort out these cases and maybe eliminate them.
1095 //
1096 // Finally, it's also possible that an endpoint was bound to an
1097 // alternative task runner and it really does want its messages to
1098 // dispatch there. In that case `was_bound_off_sequence()` will be true to
1099 // signal that we should really use that task runner.
1100 const scoped_refptr<base::SequencedTaskRunner> task_runner =
1101 client && endpoint->was_bound_off_sequence()
1102 ? endpoint->task_runner()
1103 : proxy_task_runner_.get();
1104
1105 ScopedUrgentMessageNotification scoped_urgent_message_notification(
1106 message->has_flag(mojo::Message::kFlagIsUrgent)
1107 ? urgent_message_observer_
1108 : nullptr);
1109
1110 if (message->has_flag(mojo::Message::kFlagIsSync)) {
1111 MessageWrapper message_wrapper(this, std::move(*message));
1112 // Sync messages may need to be handled by the endpoint if it's blocking
1113 // on a sync reply. We pass ownership of the message to the endpoint's
1114 // sync message queue. If the endpoint was blocking, it will dequeue the
1115 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
1116 // call will dequeue the message and dispatch it.
1117 std::optional<uint32_t> message_id =
1118 endpoint->EnqueueSyncMessage(std::move(message_wrapper));
1119 if (message_id) {
1120 task_runner->PostTask(
1121 FROM_HERE,
1122 base::BindOnce(
1123 &ChannelAssociatedGroupController::AcceptSyncMessage, this,
1124 id, *message_id,
1125 std::move(scoped_urgent_message_notification)));
1126 }
1127 return true;
1128 }
1129
1130 // If |task_runner| has been torn down already, this PostTask will fail
1131 // and destroy |message|. That operation may need to in turn destroy
1132 // in-transit associated endpoints and thus acquire |lock_|. We no longer
1133 // need the lock to be held now, so we can release it before the PostTask.
1134 {
1135 // Grab interface name from |client| before releasing the lock to ensure
1136 // that |client| is safe to access.
1137 base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
1138 client ? client->interface_name() : "unknown interface");
1139 locker.Release();
1140 task_runner->PostTask(
1141 FROM_HERE,
1142 base::BindOnce(
1143 &ChannelAssociatedGroupController::AcceptOnEndpointThread, this,
1144 std::move(*message),
1145 std::move(scoped_urgent_message_notification)));
1146 }
1147 return true;
1148 }
1149
1150 locker.Release();
1151 // It's safe to access |client| here without holding a lock, because this
1152 // code runs on a proxy thread and |client| can't be destroyed from any
1153 // thread.
1154 return client->HandleIncomingMessage(message);
1155 }
1156
1157   void AcceptOnEndpointThread(
1158 mojo::Message message,
1159 ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1160 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1161 "ChannelAssociatedGroupController::AcceptOnEndpointThread");
1162
1163 mojo::InterfaceId id = message.interface_id();
1164 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id));
1165
1166 base::AutoLock locker(lock_);
1167 Endpoint* endpoint = FindEndpoint(id);
1168 if (!endpoint)
1169 return;
1170
1171 mojo::InterfaceEndpointClient* client = endpoint->client();
1172 if (!client)
1173 return;
1174
1175 if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1176 !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1177 return;
1178 }
1179
1180 // TODO(altimin): This event is temporarily kept as a debug fallback. Remove
1181 // it once the new implementation proves to be stable.
1182 TRACE_EVENT(
1183 TRACE_DISABLED_BY_DEFAULT("mojom"),
1184 // Using client->interface_name() is safe here because this is a static
1185 // string defined for each mojo interface.
1186 perfetto::StaticString(client->interface_name()),
1187 [&](perfetto::EventContext& ctx) {
1188 static const uint8_t* toplevel_flow_enabled =
1189 TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
1190 if (!*toplevel_flow_enabled)
1191 return;
1192
1193 perfetto::Flow::Global(message.GetTraceId())(ctx);
1194 });
1195
1196 // Sync messages should never make their way to this method.
1197 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
1198
1199 bool result = false;
1200 {
1201 base::AutoUnlock unlocker(lock_);
1202 result = client->HandleIncomingMessage(&message);
1203 }
1204
1205 if (!result)
1206 RaiseError();
1207 }
1208
1209   void AcceptSyncMessage(
1210 mojo::InterfaceId interface_id,
1211 uint32_t message_id,
1212 ScopedUrgentMessageNotification scoped_urgent_message_notification) {
1213 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
1214 "ChannelAssociatedGroupController::AcceptSyncMessage");
1215
1216 base::AutoLock locker(lock_);
1217 Endpoint* endpoint = FindEndpoint(interface_id);
1218 if (!endpoint)
1219 return;
1220
1221 // Careful, if the endpoint is detached its members are cleared. Check for
1222 // that before dereferencing.
1223 mojo::InterfaceEndpointClient* client = endpoint->client();
1224 if (!client)
1225 return;
1226
1227 if (!endpoint->task_runner()->RunsTasksInCurrentSequence() &&
1228 !proxy_task_runner_->RunsTasksInCurrentSequence()) {
1229 return;
1230 }
1231
1232 // Using client->interface_name() is safe here because this is a static
1233 // string defined for each mojo interface.
1234 TRACE_EVENT0("mojom", client->interface_name());
1235 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
1236
1237 // The message must have already been dequeued by the endpoint waking up
1238 // from a sync wait. Nothing to do.
1239 if (message_wrapper.value().IsNull())
1240 return;
1241
1242 bool result = false;
1243 {
1244 base::AutoUnlock unlocker(lock_);
1245 result = client->HandleIncomingMessage(&message_wrapper.value());
1246 }
1247
1248 if (!result)
1249 RaiseError();
1250 }
1251
1252 // mojo::PipeControlMessageHandlerDelegate:
1253   bool OnPeerAssociatedEndpointClosed(
1254 mojo::InterfaceId id,
1255 const std::optional<mojo::DisconnectReason>& reason) override {
1256 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
1257
1258 scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
1259 base::AutoLock locker(lock_);
1260 scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
1261 if (reason)
1262 endpoint->set_disconnect_reason(reason);
1263 if (!endpoint->peer_closed()) {
1264 if (endpoint->client())
1265 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
1266 MarkPeerClosedAndMaybeRemove(endpoint.get());
1267 }
1268
1269 return true;
1270 }
1271
1272   bool WaitForFlushToComplete(
1273 mojo::ScopedMessagePipeHandle flush_pipe) override {
1274 // We don't support async flushing on the IPC Channel pipe.
1275 return false;
1276 }
1277
1278 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
1279 const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
1280 const bool set_interface_id_namespace_bit_;
1281
1282 // Ensures sequenced access to members below.
1283 SEQUENCE_CHECKER(sequence_checker_);
1284
1285 // Whether `Bind()` or `SendMessageOnSequence()` was called.
1286 // `sequence_checker_` can be detached when this is `false`.
1287 bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1288
1289 bool paused_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1290 bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
1291 std::unique_ptr<mojo::Connector> connector_
1292 GUARDED_BY_CONTEXT(sequence_checker_);
1293 mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
1294 mojo::PipeControlMessageHandler control_message_handler_
1295 GUARDED_BY_CONTEXT(sequence_checker_);
1296 ControlMessageProxyThunk control_message_proxy_thunk_
1297 GUARDED_BY_CONTEXT(sequence_checker_);
1298 raw_ptr<UrgentMessageObserver> urgent_message_observer_
1299 GUARDED_BY_CONTEXT(sequence_checker_) = nullptr;
1300
1301 // NOTE: It is unsafe to call into this object while holding |lock_|.
1302 mojo::PipeControlMessageProxy control_message_proxy_;
1303
1304   // Outgoing messages sent before this controller was bound to a pipe or while
1305   // it was paused. Protected by a lock to support memory dumps from any thread.
1306 base::Lock outgoing_messages_lock_;
1307 std::vector<mojo::Message> outgoing_messages_
1308 GUARDED_BY(outgoing_messages_lock_);
1309
1310 // Guards the fields below for thread-safe access.
1311 base::Lock lock_;
1312
1313 bool encountered_error_ GUARDED_BY(lock_) = false;
1314
1315 // ID #1 is reserved for the mojom::Channel interface.
1316 uint32_t next_interface_id_ GUARDED_BY(lock_) = 2;
1317
1318 std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
1319 };
1320
1321 namespace {
1322
1323 bool ControllerMemoryDumpProvider::OnMemoryDump(
1324 const base::trace_event::MemoryDumpArgs& args,
1325 base::trace_event::ProcessMemoryDump* pmd) {
1326 base::AutoLock lock(lock_);
1327 for (ChannelAssociatedGroupController* controller : controllers_) {
1328 base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
1329 base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
1330 reinterpret_cast<uintptr_t>(controller)));
1331 dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
1332 base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1333 controller->GetQueuedMessageCount());
1334 MessageMemoryDumpInfo info;
1335 size_t count = 0;
1336 controller->GetTopQueuedMessageMemoryDumpInfo(&info, &count);
1337 dump->AddScalar("top_message_name", "id", info.id);
1338 dump->AddScalar("top_message_count",
1339 base::trace_event::MemoryAllocatorDump::kUnitsObjects,
1340 count);
1341
1342 if (info.profiler_tag) {
1343 // TODO(ssid): Memory dumps currently do not support adding string
1344 // arguments in background dumps. So, add this value as a trace event for
1345 // now.
1346 TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
1347 "ControllerMemoryDumpProvider::OnMemoryDump",
1348 "top_queued_message_tag", info.profiler_tag,
1349 "count", count);
1350 }
1351 }
1352
1353 return true;
1354 }
1355
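// Default MojoBootstrap implementation. Owns the bootstrap pipe handle until
// Connect() hands it to the ChannelAssociatedGroupController, and forwards all
// other MojoBootstrap calls to that controller.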
1356 class MojoBootstrapImpl : public MojoBootstrap {
1357 public:
1358   MojoBootstrapImpl(
1359 mojo::ScopedMessagePipeHandle handle,
1360 const scoped_refptr<ChannelAssociatedGroupController> controller)
1361 : controller_(controller),
1362 associated_group_(controller),
1363 handle_(std::move(handle)) {}
1364
1365 MojoBootstrapImpl(const MojoBootstrapImpl&) = delete;
1366 MojoBootstrapImpl& operator=(const MojoBootstrapImpl&) = delete;
1367
1368   ~MojoBootstrapImpl() override {
1369 controller_->ShutDown();
1370 }
1371
1372 private:
1373   void Connect(
1374 mojo::PendingAssociatedRemote<mojom::Channel>* sender,
1375 mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
1376 controller_->Bind(std::move(handle_), sender, receiver);
1377 }
1378
1379   void StartReceiving() override { controller_->StartReceiving(); }
1380
1381   void Pause() override {
1382 controller_->Pause();
1383 }
1384
1385   void Unpause() override {
1386 controller_->Unpause();
1387 }
1388
1389   void Flush() override {
1390 controller_->FlushOutgoingMessages();
1391 }
1392
1393   mojo::AssociatedGroup* GetAssociatedGroup() override {
1394 return &associated_group_;
1395 }
1396
1397   void SetUrgentMessageObserver(UrgentMessageObserver* observer) override {
1398 controller_->SetUrgentMessageObserver(observer);
1399 }
1400
1401 scoped_refptr<ChannelAssociatedGroupController> controller_;
1402 mojo::AssociatedGroup associated_group_;
1403
1404 mojo::ScopedMessagePipeHandle handle_;
1405 };
1406
1407 } // namespace
1408
1409 ScopedAllowOffSequenceChannelAssociatedBindings::
1410     ScopedAllowOffSequenceChannelAssociatedBindings()
1411 : resetter_(&off_sequence_binding_allowed, true) {}
1412
1413 ScopedAllowOffSequenceChannelAssociatedBindings::
1414 ~ScopedAllowOffSequenceChannelAssociatedBindings() = default;
1415
1416 // static
1417 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
1418 mojo::ScopedMessagePipeHandle handle,
1419 Channel::Mode mode,
1420 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
1421 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
1422 return std::make_unique<MojoBootstrapImpl>(
1423 std::move(handle),
1424 base::MakeRefCounted<ChannelAssociatedGroupController>(
1425 mode == Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
1426 }
1427
1428 } // namespace IPC
1429