// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/ipc_mojo_bootstrap.h"

#include <inttypes.h>
#include <stdint.h>

#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/no_destructor.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/memory_allocator_dump.h"
#include "base/trace_event/memory_dump_manager.h"
#include "base/trace_event/memory_dump_provider.h"
#include "ipc/ipc_channel.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/connector.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include "mojo/public/cpp/bindings/interface_id.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"

namespace IPC {

namespace {

class ChannelAssociatedGroupController;

// Used to track some internal Channel state in pursuit of message leaks.
//
// TODO(https://crbug.com/813045): Remove this.
class ControllerMemoryDumpProvider
    : public base::trace_event::MemoryDumpProvider {
 public:
  ControllerMemoryDumpProvider() {
    base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
        this, "IPCChannel", nullptr);
  }

  ~ControllerMemoryDumpProvider() override {
    base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
        this);
  }

  void AddController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.insert(controller);
  }

  void RemoveController(ChannelAssociatedGroupController* controller) {
    base::AutoLock lock(lock_);
    controllers_.erase(controller);
  }

  // base::trace_event::MemoryDumpProvider:
  bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
                    base::trace_event::ProcessMemoryDump* pmd) override;

 private:
  base::Lock lock_;
  std::set<ChannelAssociatedGroupController*> controllers_;

  DISALLOW_COPY_AND_ASSIGN(ControllerMemoryDumpProvider);
};

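// Lazily creates the process-wide dump provider used to report queued
// outgoing message counts for every live controller.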
ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
  static base::NoDestructor<ControllerMemoryDumpProvider> provider;
  return *provider;
}

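// Routes messages for all associated interfaces multiplexed over a single
// Channel message pipe. Implements mojo::AssociatedGroupController so that
// generated bindings can attach and detach interface endpoints,
// mojo::MessageReceiver to take incoming messages off the Connector, and
// mojo::PipeControlMessageHandlerDelegate to react to peer endpoint closure.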
class ChannelAssociatedGroupController
    : public mojo::AssociatedGroupController,
      public mojo::MessageReceiver,
      public mojo::PipeControlMessageHandlerDelegate {
 public:
  ChannelAssociatedGroupController(
      bool set_interface_id_namespace_bit,
      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
      const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
      : task_runner_(ipc_task_runner),
        proxy_task_runner_(proxy_task_runner),
        set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
        filters_(this),
        control_message_handler_(this),
        control_message_proxy_thunk_(this),
        control_message_proxy_(&control_message_proxy_thunk_) {
    thread_checker_.DetachFromThread();
    control_message_handler_.SetDescription(
        "IPC::mojom::Bootstrap [master] PipeControlMessageHandler");
    filters_.Append<mojo::MessageHeaderValidator>(
        "IPC::mojom::Bootstrap [master] MessageHeaderValidator");

    GetMemoryDumpProvider().AddController(this);
  }

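  // Returns the number of outgoing messages currently queued because the
  // controller is not yet bound to a pipe or is paused. Reported in memory
  // dumps by ControllerMemoryDumpProvider.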
  size_t GetQueuedMessageCount() {
    base::AutoLock lock(outgoing_messages_lock_);
    return outgoing_messages_.size();
  }

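  // Binds the controller to its message pipe. Must be called on the IPC
  // (master endpoint) thread; until then, SendMessage() queues outgoing
  // messages in |outgoing_messages_|.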
  void Bind(mojo::ScopedMessagePipeHandle handle) {
    DCHECK(thread_checker_.CalledOnValidThread());
    DCHECK(task_runner_->BelongsToCurrentThread());

    connector_.reset(new mojo::Connector(
        std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
        task_runner_));
    connector_->set_incoming_receiver(&filters_);
    connector_->set_connection_error_handler(
        base::Bind(&ChannelAssociatedGroupController::OnPipeError,
                   base::Unretained(this)));
    connector_->set_enforce_errors_from_incoming_receiver(false);
    connector_->SetWatcherHeapProfilerTag("IPC Channel");
  }

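  // While paused, outgoing messages are queued rather than written to the
  // pipe. Unpause() only clears the flag; FlushOutgoingMessages() actually
  // drains the queue.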
  void Pause() {
    DCHECK(!paused_);
    paused_ = true;
  }

  void Unpause() {
    DCHECK(paused_);
    paused_ = false;
  }

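  // Sends any messages that were queued before Bind() or while paused.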
  void FlushOutgoingMessages() {
    std::vector<mojo::Message> outgoing_messages;
    {
      base::AutoLock lock(outgoing_messages_lock_);
      std::swap(outgoing_messages, outgoing_messages_);
    }
    for (auto& message : outgoing_messages)
      SendMessage(&message);
  }

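  // Creates the pair of mojom::Channel endpoints used to bootstrap the
  // connection. Interface ID 1 is used in both directions; the namespace bit
  // distinguishes the two sides, as selected by
  // |set_interface_id_namespace_bit_|.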
  void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
                              mojom::ChannelAssociatedRequest* receiver) {
    mojo::InterfaceId sender_id, receiver_id;
    if (set_interface_id_namespace_bit_) {
      sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
      receiver_id = 1;
    } else {
      sender_id = 1;
      receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
    }

    {
      base::AutoLock locker(lock_);
      Endpoint* sender_endpoint = new Endpoint(this, sender_id);
      Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
      endpoints_.insert({ sender_id, sender_endpoint });
      endpoints_.insert({ receiver_id, receiver_endpoint });
      sender_endpoint->set_handle_created();
      receiver_endpoint->set_handle_created();
    }

    mojo::ScopedInterfaceEndpointHandle sender_handle =
        CreateScopedInterfaceEndpointHandle(sender_id);
    mojo::ScopedInterfaceEndpointHandle receiver_handle =
        CreateScopedInterfaceEndpointHandle(receiver_id);

    sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
    *receiver = mojom::ChannelAssociatedRequest(std::move(receiver_handle));
  }

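  // Tears down the controller on the IPC thread: closes the pipe, notifies
  // all endpoints of the error, and drops any queued outgoing messages.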
  void ShutDown() {
    DCHECK(thread_checker_.CalledOnValidThread());
    shut_down_ = true;
    connector_->CloseMessagePipe();
    OnPipeError();
    connector_.reset();

    base::AutoLock lock(outgoing_messages_lock_);
    outgoing_messages_.clear();
  }

  // mojo::AssociatedGroupController:
  mojo::InterfaceId AssociateInterface(
      mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
    if (!handle_to_send.pending_association())
      return mojo::kInvalidInterfaceId;

    uint32_t id = 0;
    {
      base::AutoLock locker(lock_);
      do {
        if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
          next_interface_id_ = 2;
        id = next_interface_id_++;
        if (set_interface_id_namespace_bit_)
          id |= mojo::kInterfaceIdNamespaceMask;
      } while (ContainsKey(endpoints_, id));

      Endpoint* endpoint = new Endpoint(this, id);
      if (encountered_error_)
        endpoint->set_peer_closed();
      endpoint->set_handle_created();
      endpoints_.insert({id, endpoint});
    }

    if (!NotifyAssociation(&handle_to_send, id)) {
      // The peer handle of |handle_to_send|, which is supposed to join this
      // associated group, has been closed.
      {
        base::AutoLock locker(lock_);
        Endpoint* endpoint = FindEndpoint(id);
        if (endpoint)
          MarkClosedAndMaybeRemove(endpoint);
      }

      control_message_proxy_.NotifyPeerEndpointClosed(
          id, handle_to_send.disconnect_reason());
    }
    return id;
  }

  mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
      mojo::InterfaceId id) override {
    if (!mojo::IsValidInterfaceId(id))
      return mojo::ScopedInterfaceEndpointHandle();

    // Unless it is the master ID, |id| is from the remote side and therefore
    // its namespace bit is supposed to be different than the value that this
    // router would use.
    if (!mojo::IsMasterInterfaceId(id) &&
        set_interface_id_namespace_bit_ ==
            mojo::HasInterfaceIdNamespaceBitSet(id)) {
      return mojo::ScopedInterfaceEndpointHandle();
    }

    base::AutoLock locker(lock_);
    bool inserted = false;
    Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    if (inserted) {
      DCHECK(!endpoint->handle_created());
      if (encountered_error_)
        endpoint->set_peer_closed();
    } else {
      if (endpoint->handle_created())
        return mojo::ScopedInterfaceEndpointHandle();
    }

    endpoint->set_handle_created();
    return CreateScopedInterfaceEndpointHandle(id);
  }

  void CloseEndpointHandle(
      mojo::InterfaceId id,
      const base::Optional<mojo::DisconnectReason>& reason) override {
    if (!mojo::IsValidInterfaceId(id))
      return;
    {
      base::AutoLock locker(lock_);
      DCHECK(ContainsKey(endpoints_, id));
      Endpoint* endpoint = endpoints_[id].get();
      DCHECK(!endpoint->client());
      DCHECK(!endpoint->closed());
      MarkClosedAndMaybeRemove(endpoint);
    }

    if (!mojo::IsMasterInterfaceId(id) || reason)
      control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
  }

  mojo::InterfaceEndpointController* AttachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle,
      mojo::InterfaceEndpointClient* client,
      scoped_refptr<base::SequencedTaskRunner> runner) override {
    const mojo::InterfaceId id = handle.id();

    DCHECK(mojo::IsValidInterfaceId(id));
    DCHECK(client);

    base::AutoLock locker(lock_);
    DCHECK(ContainsKey(endpoints_, id));

    Endpoint* endpoint = endpoints_[id].get();
    endpoint->AttachClient(client, std::move(runner));

    if (endpoint->peer_closed())
      NotifyEndpointOfError(endpoint, true /* force_async */);

    return endpoint;
  }

  void DetachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle) override {
    const mojo::InterfaceId id = handle.id();

    DCHECK(mojo::IsValidInterfaceId(id));

    base::AutoLock locker(lock_);
    DCHECK(ContainsKey(endpoints_, id));

    Endpoint* endpoint = endpoints_[id].get();
    endpoint->DetachClient();
  }

  void RaiseError() override {
    // We ignore errors on channel endpoints, leaving the pipe open. There are
    // good reasons for this:
    //
    // * We should never close a channel endpoint in either process as long as
    //   the child process is still alive. The child's endpoint should only be
    //   closed implicitly by process death, and the browser's endpoint should
    //   only be closed after the child process is confirmed to be dead. Crash
    //   reporting logic in Chrome relies on this behavior in order to do the
    //   right thing.
    //
    // * There are two interesting conditions under which RaiseError() can be
    //   implicitly reached: an incoming message fails validation, or the
    //   local endpoint drops a response callback without calling it.
    //
    // * In the validation case, we also report the message as bad, and this
    //   will imminently trigger the common bad-IPC path in the browser,
    //   causing the browser to kill the offending renderer.
    //
    // * In the dropped response callback case, the net result of ignoring the
    //   issue is generally innocuous. While indicative of programmer error,
    //   it's not a severe failure and is already covered by separate DCHECKs.
    //
    // See https://crbug.com/861607 for additional discussion.
  }

  bool PrefersSerializedMessages() override { return true; }

 private:
  class Endpoint;
  class ControlMessageProxyThunk;
  friend class Endpoint;
  friend class ControlMessageProxyThunk;

  // MessageWrapper objects are always destroyed under the controller's lock.
  // On destruction, if the message it wraps contains
  // ScopedInterfaceEndpointHandles (which cannot be destroyed while the
  // controller's lock is held), the wrapper unlocks to clean them up.
  class MessageWrapper {
   public:
    MessageWrapper() = default;

    MessageWrapper(ChannelAssociatedGroupController* controller,
                   mojo::Message message)
        : controller_(controller), value_(std::move(message)) {}

    MessageWrapper(MessageWrapper&& other)
        : controller_(other.controller_), value_(std::move(other.value_)) {}

    ~MessageWrapper() {
      if (value_.associated_endpoint_handles()->empty())
        return;

      controller_->lock_.AssertAcquired();
      {
        base::AutoUnlock unlocker(controller_->lock_);
        value_.mutable_associated_endpoint_handles()->clear();
      }
    }

    MessageWrapper& operator=(MessageWrapper&& other) {
      controller_ = other.controller_;
      value_ = std::move(other.value_);
      return *this;
    }

    mojo::Message& value() { return value_; }

   private:
    ChannelAssociatedGroupController* controller_ = nullptr;
    mojo::Message value_;

    DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
  };

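  // Per-interface state for one endpoint multiplexed over the channel's pipe:
  // whether the local and peer sides are closed, the client and task runner
  // the endpoint is bound to, and any sync messages queued for it. Most state
  // is guarded by the controller's |lock_|.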
  class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
                   public mojo::InterfaceEndpointController {
   public:
    Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
        : controller_(controller), id_(id) {}

    mojo::InterfaceId id() const { return id_; }

    bool closed() const {
      controller_->lock_.AssertAcquired();
      return closed_;
    }

    void set_closed() {
      controller_->lock_.AssertAcquired();
      closed_ = true;
    }

    bool peer_closed() const {
      controller_->lock_.AssertAcquired();
      return peer_closed_;
    }

    void set_peer_closed() {
      controller_->lock_.AssertAcquired();
      peer_closed_ = true;
    }

    bool handle_created() const {
      controller_->lock_.AssertAcquired();
      return handle_created_;
    }

    void set_handle_created() {
      controller_->lock_.AssertAcquired();
      handle_created_ = true;
    }

    const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
      return disconnect_reason_;
    }

    void set_disconnect_reason(
        const base::Optional<mojo::DisconnectReason>& disconnect_reason) {
      disconnect_reason_ = disconnect_reason;
    }

    base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    mojo::InterfaceEndpointClient* client() const {
      controller_->lock_.AssertAcquired();
      return client_;
    }

    void AttachClient(mojo::InterfaceEndpointClient* client,
                      scoped_refptr<base::SequencedTaskRunner> runner) {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(!closed_);
      DCHECK(runner->RunsTasksInCurrentSequence());

      task_runner_ = std::move(runner);
      client_ = client;
    }

    void DetachClient() {
      controller_->lock_.AssertAcquired();
      DCHECK(client_);
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      DCHECK(!closed_);

      task_runner_ = nullptr;
      client_ = nullptr;
      sync_watcher_.reset();
    }

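    // Queues a sync message for this endpoint and signals its sync event
    // watcher, if one exists. Returns an id that AcceptSyncMessage() later
    // uses to dequeue the same message. Called with the controller's lock
    // held.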
    uint32_t EnqueueSyncMessage(MessageWrapper message) {
      controller_->lock_.AssertAcquired();
      uint32_t id = GenerateSyncMessageId();
      sync_messages_.emplace(id, std::move(message));
      SignalSyncMessageEvent();
      return id;
    }

    void SignalSyncMessageEvent() {
      controller_->lock_.AssertAcquired();

      if (sync_watcher_)
        sync_watcher_->SignalEvent();
    }

    MessageWrapper PopSyncMessage(uint32_t id) {
      controller_->lock_.AssertAcquired();
      if (sync_messages_.empty() || sync_messages_.front().first != id)
        return MessageWrapper();
      MessageWrapper message = std::move(sync_messages_.front().second);
      sync_messages_.pop();
      return message;
    }

    // mojo::InterfaceEndpointController:
    bool SendMessage(mojo::Message* message) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      message->set_interface_id(id_);
      return controller_->SendMessage(message);
    }

    void AllowWokenUpBySyncWatchOnSameThread() override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      EnsureSyncWatcherExists();
      sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
    }

    bool SyncWatch(const bool* should_stop) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // It's not legal to make sync calls from the master endpoint's thread,
      // and in fact they must only happen from the proxy task runner.
      DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
      DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());

      EnsureSyncWatcherExists();
      return sync_watcher_->SyncWatch(should_stop);
    }

   private:
    friend class base::RefCountedThreadSafe<Endpoint>;

    ~Endpoint() override {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(closed_);
      DCHECK(peer_closed_);
      DCHECK(!sync_watcher_);
    }

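    // Invoked by |sync_watcher_| on the endpoint's sequence whenever queued
    // sync messages may be ready to dispatch. Dispatches at most one message
    // per invocation, leaving the event signaled if more remain.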
    void OnSyncMessageEventReady() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      scoped_refptr<Endpoint> keepalive(this);
      scoped_refptr<AssociatedGroupController> controller_keepalive(
          controller_);
      base::AutoLock locker(controller_->lock_);
      bool more_to_process = false;
      if (!sync_messages_.empty()) {
        MessageWrapper message_wrapper =
            std::move(sync_messages_.front().second);
        sync_messages_.pop();

        bool dispatch_succeeded;
        mojo::InterfaceEndpointClient* client = client_;
        {
          base::AutoUnlock unlocker(controller_->lock_);
          dispatch_succeeded =
              client->HandleIncomingMessage(&message_wrapper.value());
        }

        if (!sync_messages_.empty())
          more_to_process = true;

        if (!dispatch_succeeded)
          controller_->RaiseError();
      }

      if (!more_to_process)
        sync_watcher_->ResetEvent();

      // If there are no queued sync messages and the peer has closed, then
      // there won't be incoming sync messages in the future. If any
      // SyncWatch() calls are on the stack for this endpoint, resetting the
      // watcher will allow them to exit as the stack unwinds.
      if (!more_to_process && peer_closed_)
        sync_watcher_.reset();
    }

    void EnsureSyncWatcherExists() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      if (sync_watcher_)
        return;

      base::AutoLock locker(controller_->lock_);
      sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
          base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
                              base::Unretained(this)));
      if (peer_closed_ || !sync_messages_.empty())
        SignalSyncMessageEvent();
    }

    uint32_t GenerateSyncMessageId() {
      // Overflow is fine.
      uint32_t id = next_sync_message_id_++;
      DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
      return id;
    }

    ChannelAssociatedGroupController* const controller_;
    const mojo::InterfaceId id_;

    bool closed_ = false;
    bool peer_closed_ = false;
    bool handle_created_ = false;
    base::Optional<mojo::DisconnectReason> disconnect_reason_;
    mojo::InterfaceEndpointClient* client_ = nullptr;
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
    base::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
    uint32_t next_sync_message_id_ = 0;

    DISALLOW_COPY_AND_ASSIGN(Endpoint);
  };

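  // Adapts the controller's SendMessage() to the MessageReceiver interface so
  // that |control_message_proxy_| can emit pipe control messages (e.g. peer
  // endpoint closure notifications) through the same send path as regular
  // messages.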
  class ControlMessageProxyThunk : public MessageReceiver {
   public:
    explicit ControlMessageProxyThunk(
        ChannelAssociatedGroupController* controller)
        : controller_(controller) {}

   private:
    // MessageReceiver:
    bool Accept(mojo::Message* message) override {
      return controller_->SendMessage(message);
    }

    ChannelAssociatedGroupController* controller_;

    DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk);
  };

  ~ChannelAssociatedGroupController() override {
    DCHECK(!connector_);

    base::AutoLock locker(lock_);
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (!endpoint->closed()) {
        // This happens when a NotifyPeerEndpointClosed message has been
        // received, but the interface ID hasn't been used to create a local
        // endpoint handle.
        DCHECK(!endpoint->client());
        DCHECK(endpoint->peer_closed());
        MarkClosedAndMaybeRemove(endpoint);
      } else {
        MarkPeerClosedAndMaybeRemove(endpoint);
      }
    }

    DCHECK(endpoints_.empty());

    GetMemoryDumpProvider().RemoveController(this);
  }

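  // Sends |message| over the channel. On the IPC thread, the message is
  // written immediately unless the controller is unbound or paused, in which
  // case it is queued. From any other thread, the send is posted to the IPC
  // thread to mirror IPC::ChannelProxy::Send() behavior.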
  bool SendMessage(mojo::Message* message) {
    if (task_runner_->BelongsToCurrentThread()) {
      DCHECK(thread_checker_.CalledOnValidThread());
      if (!connector_ || paused_) {
        if (!shut_down_) {
          base::AutoLock lock(outgoing_messages_lock_);
          outgoing_messages_.emplace_back(std::move(*message));
        }
        return true;
      }
      return connector_->Accept(message);
    } else {
      // Do a message size check here so we don't lose valuable stack
      // information to the task scheduler.
      CHECK_LE(message->data_num_bytes(), Channel::kMaximumMessageSize);

      // We always post tasks to the master endpoint thread when called from
      // other threads in order to simulate IPC::ChannelProxy::Send behavior.
      task_runner_->PostTask(
          FROM_HERE,
          base::Bind(
              &ChannelAssociatedGroupController::SendMessageOnMasterThread,
              this, base::Passed(message)));
      return true;
    }
  }

  void SendMessageOnMasterThread(mojo::Message message) {
    DCHECK(thread_checker_.CalledOnValidThread());
    if (!SendMessage(&message))
      RaiseError();
  }

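  // Invoked when the Connector reports a pipe error and when ShutDown() runs.
  // Marks every endpoint's peer as closed and notifies endpoints that
  // currently have clients attached.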
  void OnPipeError() {
    DCHECK(thread_checker_.CalledOnValidThread());

    // We keep |this| alive here because it's possible for the notifications
    // below to release all other references.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);

    base::AutoLock locker(lock_);
    encountered_error_ = true;

    std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (endpoint->client())
        endpoints_to_notify.push_back(endpoint);

      MarkPeerClosedAndMaybeRemove(endpoint);
    }

    for (auto& endpoint : endpoints_to_notify) {
      // Because a notification may in turn detach any endpoint, we have to
      // check each client again here.
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    }
  }

  void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
    lock_.AssertAcquired();
    DCHECK(endpoint->task_runner() && endpoint->client());
    if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
      mojo::InterfaceEndpointClient* client = endpoint->client();
      base::Optional<mojo::DisconnectReason> reason(
          endpoint->disconnect_reason());

      base::AutoUnlock unlocker(lock_);
      client->NotifyError(reason);
    } else {
      endpoint->task_runner()->PostTask(
          FROM_HERE,
          base::Bind(&ChannelAssociatedGroupController::
                         NotifyEndpointOfErrorOnEndpointThread,
                     this, endpoint->id(), base::Unretained(endpoint)));
    }
  }

  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
                                             Endpoint* endpoint) {
    base::AutoLock locker(lock_);
    auto iter = endpoints_.find(id);
    if (iter == endpoints_.end() || iter->second.get() != endpoint)
      return;
    if (!endpoint->client())
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    NotifyEndpointOfError(endpoint, false /* force_async */);
  }

  void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_closed();
    if (endpoint->closed() && endpoint->peer_closed())
      endpoints_.erase(endpoint->id());
  }

  void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_peer_closed();
    endpoint->SignalSyncMessageEvent();
    if (endpoint->closed() && endpoint->peer_closed())
      endpoints_.erase(endpoint->id());
  }

  Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
    lock_.AssertAcquired();
    DCHECK(!inserted || !*inserted);

    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint) {
      endpoint = new Endpoint(this, id);
      endpoints_.insert({id, endpoint});
      if (inserted)
        *inserted = true;
    }
    return endpoint;
  }

  Endpoint* FindEndpoint(mojo::InterfaceId id) {
    lock_.AssertAcquired();
    auto iter = endpoints_.find(id);
    return iter != endpoints_.end() ? iter->second.get() : nullptr;
  }

  // mojo::MessageReceiver:
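  // Entry point for every message read off the pipe on the IPC thread. Pipe
  // control messages are handled inline; everything else is routed to the
  // destination endpoint, dispatched synchronously here when its client runs
  // on this thread or posted to |proxy_task_runner_| otherwise.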
  bool Accept(mojo::Message* message) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    if (!message->DeserializeAssociatedEndpointHandles(this))
      return false;

    if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
      return control_message_handler_.Accept(message);

    mojo::InterfaceId id = message->interface_id();
    DCHECK(mojo::IsValidInterfaceId(id));

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return true;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
      // No client has been bound yet or the client runs tasks on another
      // thread. We assume the other thread must always be the one on which
      // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
      //
      // If the client is not yet bound, it must be bound by the time this task
      // runs or else it's programmer error.
      DCHECK(proxy_task_runner_);

      if (message->has_flag(mojo::Message::kFlagIsSync)) {
        MessageWrapper message_wrapper(this, std::move(*message));
        // Sync messages may need to be handled by the endpoint if it's blocking
        // on a sync reply. We pass ownership of the message to the endpoint's
        // sync message queue. If the endpoint was blocking, it will dequeue the
        // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
        // call will dequeue the message and dispatch it.
        uint32_t message_id =
            endpoint->EnqueueSyncMessage(std::move(message_wrapper));
        proxy_task_runner_->PostTask(
            FROM_HERE,
            base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
                       this, id, message_id));
        return true;
      }

      proxy_task_runner_->PostTask(
          FROM_HERE,
          base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
                     this, base::Passed(message)));
      return true;
    }

    // We do not expect to receive sync responses on the master endpoint
    // thread. If it's happening, it's a bug.
    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
           !message->has_flag(mojo::Message::kFlagIsResponse));

    base::AutoUnlock unlocker(lock_);
    return client->HandleIncomingMessage(message);
  }

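  // Dispatches a non-sync message to its endpoint's client on the proxy
  // thread.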
  void AcceptOnProxyThread(mojo::Message message) {
    DCHECK(proxy_task_runner_->BelongsToCurrentThread());

    mojo::InterfaceId id = message.interface_id();
    DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());

    // Sync messages should never make their way to this method.
    DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));

    bool result = false;
    {
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message);
    }

    if (!result)
      RaiseError();
  }

  void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
    DCHECK(proxy_task_runner_->BelongsToCurrentThread());

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(interface_id);
    if (!endpoint)
      return;

    // Careful, if the endpoint is detached its members are cleared. Check for
    // that before dereferencing.
    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client)
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);

    // The message must have already been dequeued by the endpoint waking up
    // from a sync wait. Nothing to do.
    if (message_wrapper.value().IsNull())
      return;

    bool result = false;
    {
      base::AutoUnlock unlocker(lock_);
      result = client->HandleIncomingMessage(&message_wrapper.value());
    }

    if (!result)
      RaiseError();
  }

  // mojo::PipeControlMessageHandlerDelegate:
  bool OnPeerAssociatedEndpointClosed(
      mojo::InterfaceId id,
      const base::Optional<mojo::DisconnectReason>& reason) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    base::AutoLock locker(lock_);
    scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
    if (reason)
      endpoint->set_disconnect_reason(reason);
    if (!endpoint->peer_closed()) {
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
      MarkPeerClosedAndMaybeRemove(endpoint.get());
    }

    return true;
  }

  // Checked in places which must be run on the master endpoint's thread.
  base::ThreadChecker thread_checker_;

  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
  const bool set_interface_id_namespace_bit_;
  bool paused_ = false;
  std::unique_ptr<mojo::Connector> connector_;
  mojo::FilterChain filters_;
  mojo::PipeControlMessageHandler control_message_handler_;
  ControlMessageProxyThunk control_message_proxy_thunk_;

  // NOTE: It is unsafe to call into this object while holding |lock_|.
  mojo::PipeControlMessageProxy control_message_proxy_;

  // Guards access to |outgoing_messages_| only. Used to support memory dumps
  // which may be triggered from any thread.
  base::Lock outgoing_messages_lock_;

  // Outgoing messages that were sent before this controller was bound to a
  // real message pipe.
  std::vector<mojo::Message> outgoing_messages_;

  // Guards the fields below for thread-safe access.
  base::Lock lock_;

  bool encountered_error_ = false;
  bool shut_down_ = false;

  // ID #1 is reserved for the mojom::Channel interface.
  uint32_t next_interface_id_ = 2;

  std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;

  DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController);
};

bool ControllerMemoryDumpProvider::OnMemoryDump(
    const base::trace_event::MemoryDumpArgs& args,
    base::trace_event::ProcessMemoryDump* pmd) {
  base::AutoLock lock(lock_);
  for (auto* controller : controllers_) {
    base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
        base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
                           reinterpret_cast<uintptr_t>(controller)));
    dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
                    base::trace_event::MemoryAllocatorDump::kUnitsObjects,
                    controller->GetQueuedMessageCount());
  }

  return true;
}

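// Concrete MojoBootstrap returned by MojoBootstrap::Create(). Owns the
// controller for the channel's associated group and shuts it down when the
// bootstrap is destroyed.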
class MojoBootstrapImpl : public MojoBootstrap {
 public:
  MojoBootstrapImpl(
      mojo::ScopedMessagePipeHandle handle,
      const scoped_refptr<ChannelAssociatedGroupController> controller)
      : controller_(controller),
        associated_group_(controller),
        handle_(std::move(handle)) {}

  ~MojoBootstrapImpl() override {
    controller_->ShutDown();
  }

 private:
  void Connect(mojom::ChannelAssociatedPtr* sender,
               mojom::ChannelAssociatedRequest* receiver) override {
    controller_->Bind(std::move(handle_));
    controller_->CreateChannelEndpoints(sender, receiver);
  }

  void Pause() override {
    controller_->Pause();
  }

  void Unpause() override {
    controller_->Unpause();
  }

  void Flush() override {
    controller_->FlushOutgoingMessages();
  }

  mojo::AssociatedGroup* GetAssociatedGroup() override {
    return &associated_group_;
  }

  scoped_refptr<ChannelAssociatedGroupController> controller_;
  mojo::AssociatedGroup associated_group_;

  mojo::ScopedMessagePipeHandle handle_;

  DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
};

}  // namespace

// static
std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
    mojo::ScopedMessagePipeHandle handle,
    Channel::Mode mode,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
  return std::make_unique<MojoBootstrapImpl>(
      std::move(handle),
      new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
                                           ipc_task_runner, proxy_task_runner));
}

}  // namespace IPC