1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include <stdint.h>
8
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
22
23 namespace mojo {
24 namespace internal {
25
26 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object.
30 class MultiplexRouter::InterfaceEndpoint
31 : public base::RefCounted<InterfaceEndpoint>,
32 public InterfaceEndpointController {
33 public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35 : router_(router),
36 id_(id),
37 closed_(false),
38 peer_closed_(false),
39 handle_created_(false),
40 client_(nullptr),
41 event_signalled_(false) {}
42
43 // ---------------------------------------------------------------------------
44 // The following public methods are safe to call from any threads without
45 // locking.
46
id() const47 InterfaceId id() const { return id_; }
48
49 // ---------------------------------------------------------------------------
50 // The following public methods are called under the router's lock.
51
closed() const52 bool closed() const { return closed_; }
set_closed()53 void set_closed() {
54 router_->AssertLockAcquired();
55 closed_ = true;
56 }
57
peer_closed() const58 bool peer_closed() const { return peer_closed_; }
set_peer_closed()59 void set_peer_closed() {
60 router_->AssertLockAcquired();
61 peer_closed_ = true;
62 }
63
handle_created() const64 bool handle_created() const { return handle_created_; }
set_handle_created()65 void set_handle_created() {
66 router_->AssertLockAcquired();
67 handle_created_ = true;
68 }
69
disconnect_reason() const70 const base::Optional<DisconnectReason>& disconnect_reason() const {
71 return disconnect_reason_;
72 }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)73 void set_disconnect_reason(
74 const base::Optional<DisconnectReason>& disconnect_reason) {
75 router_->AssertLockAcquired();
76 disconnect_reason_ = disconnect_reason;
77 }
78
task_runner() const79 base::SingleThreadTaskRunner* task_runner() const {
80 return task_runner_.get();
81 }
82
client() const83 InterfaceEndpointClient* client() const { return client_; }
84
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)85 void AttachClient(InterfaceEndpointClient* client,
86 scoped_refptr<base::SingleThreadTaskRunner> runner) {
87 router_->AssertLockAcquired();
88 DCHECK(!client_);
89 DCHECK(!closed_);
90 DCHECK(runner->BelongsToCurrentThread());
91
92 task_runner_ = std::move(runner);
93 client_ = client;
94 }
95
96 // This method must be called on the same thread as the corresponding
97 // AttachClient() call.
DetachClient()98 void DetachClient() {
99 router_->AssertLockAcquired();
100 DCHECK(client_);
101 DCHECK(task_runner_->BelongsToCurrentThread());
102 DCHECK(!closed_);
103
104 task_runner_ = nullptr;
105 client_ = nullptr;
106 sync_watcher_.reset();
107 }
108
SignalSyncMessageEvent()109 void SignalSyncMessageEvent() {
110 router_->AssertLockAcquired();
111 if (event_signalled_)
112 return;
113
114 event_signalled_ = true;
115 if (!sync_message_event_sender_.is_valid())
116 return;
117
118 MojoResult result =
119 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
120 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
121 DCHECK_EQ(MOJO_RESULT_OK, result);
122 }
123
ResetSyncMessageSignal()124 void ResetSyncMessageSignal() {
125 router_->AssertLockAcquired();
126
127 if (!event_signalled_)
128 return;
129
130 event_signalled_ = false;
131 if (!sync_message_event_receiver_.is_valid())
132 return;
133
134 MojoResult result =
135 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
136 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
137 DCHECK_EQ(MOJO_RESULT_OK, result);
138 }
139
140 // ---------------------------------------------------------------------------
141 // The following public methods (i.e., InterfaceEndpointController
142 // implementation) are called by the client on the same thread as the
143 // AttachClient() call. They are called outside of the router's lock.
144
SendMessage(Message * message)145 bool SendMessage(Message* message) override {
146 DCHECK(task_runner_->BelongsToCurrentThread());
147 message->set_interface_id(id_);
148 return router_->connector_.Accept(message);
149 }
150
AllowWokenUpBySyncWatchOnSameThread()151 void AllowWokenUpBySyncWatchOnSameThread() override {
152 DCHECK(task_runner_->BelongsToCurrentThread());
153
154 EnsureSyncWatcherExists();
155 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
156 }
157
SyncWatch(const bool * should_stop)158 bool SyncWatch(const bool* should_stop) override {
159 DCHECK(task_runner_->BelongsToCurrentThread());
160
161 EnsureSyncWatcherExists();
162 return sync_watcher_->SyncWatch(should_stop);
163 }
164
165 private:
166 friend class base::RefCounted<InterfaceEndpoint>;
167
~InterfaceEndpoint()168 ~InterfaceEndpoint() override {
169 router_->AssertLockAcquired();
170
171 DCHECK(!client_);
172 DCHECK(closed_);
173 DCHECK(peer_closed_);
174 DCHECK(!sync_watcher_);
175 }
176
OnHandleReady(MojoResult result)177 void OnHandleReady(MojoResult result) {
178 DCHECK(task_runner_->BelongsToCurrentThread());
179 scoped_refptr<MultiplexRouter> router_protector(router_);
180
181 // Because we never close |sync_message_event_{sender,receiver}_| before
182 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
183 DCHECK_EQ(MOJO_RESULT_OK, result);
184
185 MayAutoLock locker(&router_->lock_);
186 scoped_refptr<InterfaceEndpoint> self_protector(this);
187
188 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
189
190 if (!more_to_process)
191 ResetSyncMessageSignal();
192
193 // Currently there are no queued sync messages and the peer has closed so
194 // there won't be incoming sync messages in the future.
195 if (!more_to_process && peer_closed_) {
196 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
197 // on the call stack, resetting the sync watcher will allow it to exit
198 // when the call stack unwinds to that frame.
199 sync_watcher_.reset();
200 }
201 }
202
EnsureSyncWatcherExists()203 void EnsureSyncWatcherExists() {
204 DCHECK(task_runner_->BelongsToCurrentThread());
205 if (sync_watcher_)
206 return;
207
208 {
209 MayAutoLock locker(&router_->lock_);
210
211 if (!sync_message_event_sender_.is_valid()) {
212 MojoResult result =
213 CreateMessagePipe(nullptr, &sync_message_event_sender_,
214 &sync_message_event_receiver_);
215 DCHECK_EQ(MOJO_RESULT_OK, result);
216
217 if (event_signalled_) {
218 // Reset the flag so that SignalSyncMessageEvent() will actually
219 // signal using the newly-created message pipe.
220 event_signalled_ = false;
221 SignalSyncMessageEvent();
222 }
223 }
224 }
225
226 sync_watcher_.reset(new SyncHandleWatcher(
227 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
228 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
229 }
230
231 // ---------------------------------------------------------------------------
232 // The following members are safe to access from any threads.
233
234 MultiplexRouter* const router_;
235 const InterfaceId id_;
236
237 // ---------------------------------------------------------------------------
238 // The following members are accessed under the router's lock.
239
240 // Whether the endpoint has been closed.
241 bool closed_;
242 // Whether the peer endpoint has been closed.
243 bool peer_closed_;
244
245 // Whether there is already a ScopedInterfaceEndpointHandle created for this
246 // endpoint.
247 bool handle_created_;
248
249 base::Optional<DisconnectReason> disconnect_reason_;
250
251 // The task runner on which |client_|'s methods can be called.
252 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
253 // Not owned. It is null if no client is attached to this endpoint.
254 InterfaceEndpointClient* client_;
255
256 // A message pipe used as an event to signal that sync messages are available.
257 // The message pipe handles are initialized under the router's lock and remain
258 // unchanged afterwards. They may be accessed outside of the router's lock
259 // later.
260 ScopedMessagePipeHandle sync_message_event_sender_;
261 ScopedMessagePipeHandle sync_message_event_receiver_;
262 bool event_signalled_;
263
264 // ---------------------------------------------------------------------------
265 // The following members are only valid while a client is attached. They are
266 // used exclusively on the client's thread. They may be accessed outside of
267 // the router's lock.
268
269 std::unique_ptr<SyncHandleWatcher> sync_watcher_;
270
271 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
272 };
273
274 // MessageWrapper objects are always destroyed under the router's lock. On
275 // destruction, if the message it wrappers contains
276 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
277 // router's lock), the wrapper unlocks to clean them up.
278 class MultiplexRouter::MessageWrapper {
279 public:
280 MessageWrapper() = default;
281
MessageWrapper(MultiplexRouter * router,Message message)282 MessageWrapper(MultiplexRouter* router, Message message)
283 : router_(router), value_(std::move(message)) {}
284
MessageWrapper(MessageWrapper && other)285 MessageWrapper(MessageWrapper&& other)
286 : router_(other.router_), value_(std::move(other.value_)) {}
287
~MessageWrapper()288 ~MessageWrapper() {
289 if (value_.associated_endpoint_handles()->empty())
290 return;
291
292 router_->AssertLockAcquired();
293 {
294 MayAutoUnlock unlocker(&router_->lock_);
295 value_.mutable_associated_endpoint_handles()->clear();
296 }
297 }
298
operator =(MessageWrapper && other)299 MessageWrapper& operator=(MessageWrapper&& other) {
300 router_ = other.router_;
301 value_ = std::move(other.value_);
302 return *this;
303 }
304
value()305 Message& value() { return value_; }
306
307 private:
308 MultiplexRouter* router_ = nullptr;
309 Message value_;
310
311 DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
312 };
313
314 struct MultiplexRouter::Task {
315 public:
316 // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task317 static std::unique_ptr<Task> CreateMessageTask(
318 MessageWrapper message_wrapper) {
319 Task* task = new Task(MESSAGE);
320 task->message_wrapper = std::move(message_wrapper);
321 return base::WrapUnique(task);
322 }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task323 static std::unique_ptr<Task> CreateNotifyErrorTask(
324 InterfaceEndpoint* endpoint) {
325 Task* task = new Task(NOTIFY_ERROR);
326 task->endpoint_to_notify = endpoint;
327 return base::WrapUnique(task);
328 }
329
~Taskmojo::internal::MultiplexRouter::Task330 ~Task() {}
331
IsMessageTaskmojo::internal::MultiplexRouter::Task332 bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task333 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
334
335 MessageWrapper message_wrapper;
336 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
337
338 enum Type { MESSAGE, NOTIFY_ERROR };
339 Type type;
340
341 private:
Taskmojo::internal::MultiplexRouter::Task342 explicit Task(Type in_type) : type(in_type) {}
343
344 DISALLOW_COPY_AND_ASSIGN(Task);
345 };
346
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namesapce_bit,scoped_refptr<base::SingleThreadTaskRunner> runner)347 MultiplexRouter::MultiplexRouter(
348 ScopedMessagePipeHandle message_pipe,
349 Config config,
350 bool set_interface_id_namesapce_bit,
351 scoped_refptr<base::SingleThreadTaskRunner> runner)
352 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
353 task_runner_(runner),
354 header_validator_(nullptr),
355 filters_(this),
356 connector_(std::move(message_pipe),
357 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
358 : Connector::SINGLE_THREADED_SEND,
359 std::move(runner)),
360 control_message_handler_(this),
361 control_message_proxy_(&connector_),
362 next_interface_id_value_(1),
363 posted_to_process_tasks_(false),
364 encountered_error_(false),
365 paused_(false),
366 testing_mode_(false) {
367 DCHECK(task_runner_->BelongsToCurrentThread());
368
369 if (config == MULTI_INTERFACE)
370 lock_.emplace();
371
372 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
373 config == MULTI_INTERFACE) {
374 // Always participate in sync handle watching in multi-interface mode,
375 // because even if it doesn't expect sync requests during sync handle
376 // watching, it may still need to dispatch messages to associated endpoints
377 // on a different thread.
378 connector_.AllowWokenUpBySyncWatchOnSameThread();
379 }
380 connector_.set_incoming_receiver(&filters_);
381 connector_.set_connection_error_handler(
382 base::Bind(&MultiplexRouter::OnPipeConnectionError,
383 base::Unretained(this)));
384
385 std::unique_ptr<MessageHeaderValidator> header_validator =
386 base::MakeUnique<MessageHeaderValidator>();
387 header_validator_ = header_validator.get();
388 filters_.Append(std::move(header_validator));
389 }
390
~MultiplexRouter()391 MultiplexRouter::~MultiplexRouter() {
392 MayAutoLock locker(&lock_);
393
394 sync_message_tasks_.clear();
395 tasks_.clear();
396
397 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
398 InterfaceEndpoint* endpoint = iter->second.get();
399 // Increment the iterator before calling UpdateEndpointStateMayRemove()
400 // because it may remove the corresponding value from the map.
401 ++iter;
402
403 if (!endpoint->closed()) {
404 // This happens when a NotifyPeerEndpointClosed message been received, but
405 // the interface ID hasn't been used to create local endpoint handle.
406 DCHECK(!endpoint->client());
407 DCHECK(endpoint->peer_closed());
408 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
409 } else {
410 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
411 }
412 }
413
414 DCHECK(endpoints_.empty());
415 }
416
SetMasterInterfaceName(const char * name)417 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
418 DCHECK(thread_checker_.CalledOnValidThread());
419 header_validator_->SetDescription(
420 std::string(name) + " [master] MessageHeaderValidator");
421 control_message_handler_.SetDescription(
422 std::string(name) + " [master] PipeControlMessageHandler");
423 connector_.SetWatcherHeapProfilerTag(name);
424 }
425
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)426 InterfaceId MultiplexRouter::AssociateInterface(
427 ScopedInterfaceEndpointHandle handle_to_send) {
428 if (!handle_to_send.pending_association())
429 return kInvalidInterfaceId;
430
431 uint32_t id = 0;
432 {
433 MayAutoLock locker(&lock_);
434 do {
435 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
436 next_interface_id_value_ = 1;
437 id = next_interface_id_value_++;
438 if (set_interface_id_namespace_bit_)
439 id |= kInterfaceIdNamespaceMask;
440 } while (base::ContainsKey(endpoints_, id));
441
442 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
443 endpoints_[id] = endpoint;
444 if (encountered_error_)
445 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
446 endpoint->set_handle_created();
447 }
448
449 if (!NotifyAssociation(&handle_to_send, id)) {
450 // The peer handle of |handle_to_send|, which is supposed to join this
451 // associated group, has been closed.
452 {
453 MayAutoLock locker(&lock_);
454 InterfaceEndpoint* endpoint = FindEndpoint(id);
455 if (endpoint)
456 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
457 }
458
459 control_message_proxy_.NotifyPeerEndpointClosed(
460 id, handle_to_send.disconnect_reason());
461 }
462 return id;
463 }
464
CreateLocalEndpointHandle(InterfaceId id)465 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
466 InterfaceId id) {
467 if (!IsValidInterfaceId(id))
468 return ScopedInterfaceEndpointHandle();
469
470 MayAutoLock locker(&lock_);
471 bool inserted = false;
472 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
473 if (inserted) {
474 DCHECK(!endpoint->handle_created());
475
476 if (encountered_error_)
477 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
478 } else {
479 // If the endpoint already exist, it is because we have received a
480 // notification that the peer endpoint has closed.
481 CHECK(!endpoint->closed());
482 CHECK(endpoint->peer_closed());
483
484 if (endpoint->handle_created())
485 return ScopedInterfaceEndpointHandle();
486 }
487
488 endpoint->set_handle_created();
489 return CreateScopedInterfaceEndpointHandle(id);
490 }
491
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)492 void MultiplexRouter::CloseEndpointHandle(
493 InterfaceId id,
494 const base::Optional<DisconnectReason>& reason) {
495 if (!IsValidInterfaceId(id))
496 return;
497
498 MayAutoLock locker(&lock_);
499 DCHECK(base::ContainsKey(endpoints_, id));
500 InterfaceEndpoint* endpoint = endpoints_[id].get();
501 DCHECK(!endpoint->client());
502 DCHECK(!endpoint->closed());
503 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
504
505 if (!IsMasterInterfaceId(id) || reason) {
506 MayAutoUnlock unlocker(&lock_);
507 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
508 }
509
510 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
511 }
512
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)513 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
514 const ScopedInterfaceEndpointHandle& handle,
515 InterfaceEndpointClient* client,
516 scoped_refptr<base::SingleThreadTaskRunner> runner) {
517 const InterfaceId id = handle.id();
518
519 DCHECK(IsValidInterfaceId(id));
520 DCHECK(client);
521
522 MayAutoLock locker(&lock_);
523 DCHECK(base::ContainsKey(endpoints_, id));
524
525 InterfaceEndpoint* endpoint = endpoints_[id].get();
526 endpoint->AttachClient(client, std::move(runner));
527
528 if (endpoint->peer_closed())
529 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
530 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
531
532 return endpoint;
533 }
534
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)535 void MultiplexRouter::DetachEndpointClient(
536 const ScopedInterfaceEndpointHandle& handle) {
537 const InterfaceId id = handle.id();
538
539 DCHECK(IsValidInterfaceId(id));
540
541 MayAutoLock locker(&lock_);
542 DCHECK(base::ContainsKey(endpoints_, id));
543
544 InterfaceEndpoint* endpoint = endpoints_[id].get();
545 endpoint->DetachClient();
546 }
547
RaiseError()548 void MultiplexRouter::RaiseError() {
549 if (task_runner_->BelongsToCurrentThread()) {
550 connector_.RaiseError();
551 } else {
552 task_runner_->PostTask(FROM_HERE,
553 base::Bind(&MultiplexRouter::RaiseError, this));
554 }
555 }
556
CloseMessagePipe()557 void MultiplexRouter::CloseMessagePipe() {
558 DCHECK(thread_checker_.CalledOnValidThread());
559 connector_.CloseMessagePipe();
560 // CloseMessagePipe() above won't trigger connection error handler.
561 // Explicitly call OnPipeConnectionError() so that associated endpoints will
562 // get notified.
563 OnPipeConnectionError();
564 }
565
PauseIncomingMethodCallProcessing()566 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
567 DCHECK(thread_checker_.CalledOnValidThread());
568 connector_.PauseIncomingMethodCallProcessing();
569
570 MayAutoLock locker(&lock_);
571 paused_ = true;
572
573 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
574 iter->second->ResetSyncMessageSignal();
575 }
576
ResumeIncomingMethodCallProcessing()577 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
578 DCHECK(thread_checker_.CalledOnValidThread());
579 connector_.ResumeIncomingMethodCallProcessing();
580
581 MayAutoLock locker(&lock_);
582 paused_ = false;
583
584 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
585 auto sync_iter = sync_message_tasks_.find(iter->first);
586 if (iter->second->peer_closed() ||
587 (sync_iter != sync_message_tasks_.end() &&
588 !sync_iter->second.empty())) {
589 iter->second->SignalSyncMessageEvent();
590 }
591 }
592
593 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
594 }
595
HasAssociatedEndpoints() const596 bool MultiplexRouter::HasAssociatedEndpoints() const {
597 DCHECK(thread_checker_.CalledOnValidThread());
598 MayAutoLock locker(&lock_);
599
600 if (endpoints_.size() > 1)
601 return true;
602 if (endpoints_.size() == 0)
603 return false;
604
605 return !base::ContainsKey(endpoints_, kMasterInterfaceId);
606 }
607
EnableTestingMode()608 void MultiplexRouter::EnableTestingMode() {
609 DCHECK(thread_checker_.CalledOnValidThread());
610 MayAutoLock locker(&lock_);
611
612 testing_mode_ = true;
613 connector_.set_enforce_errors_from_incoming_receiver(false);
614 }
615
Accept(Message * message)616 bool MultiplexRouter::Accept(Message* message) {
617 DCHECK(thread_checker_.CalledOnValidThread());
618
619 if (!message->DeserializeAssociatedEndpointHandles(this))
620 return false;
621
622 scoped_refptr<MultiplexRouter> protector(this);
623 MayAutoLock locker(&lock_);
624
625 DCHECK(!paused_);
626
627 ClientCallBehavior client_call_behavior =
628 connector_.during_sync_handle_watcher_callback()
629 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
630 : ALLOW_DIRECT_CLIENT_CALLS;
631
632 bool processed =
633 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
634 connector_.task_runner());
635
636 if (!processed) {
637 // Either the task queue is not empty or we cannot process the message
638 // directly. In both cases, there is no need to call ProcessTasks().
639 tasks_.push_back(
640 Task::CreateMessageTask(MessageWrapper(this, std::move(*message))));
641 Task* task = tasks_.back().get();
642
643 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
644 InterfaceId id = task->message_wrapper.value().interface_id();
645 sync_message_tasks_[id].push_back(task);
646 InterfaceEndpoint* endpoint = FindEndpoint(id);
647 if (endpoint)
648 endpoint->SignalSyncMessageEvent();
649 }
650 } else if (!tasks_.empty()) {
651 // Processing the message may result in new tasks (for error notification)
652 // being added to the queue. In this case, we have to attempt to process the
653 // tasks.
654 ProcessTasks(client_call_behavior, connector_.task_runner());
655 }
656
657 // Always return true. If we see errors during message processing, we will
658 // explicitly call Connector::RaiseError() to disconnect the message pipe.
659 return true;
660 }
661
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)662 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
663 InterfaceId id,
664 const base::Optional<DisconnectReason>& reason) {
665 DCHECK(!IsMasterInterfaceId(id) || reason);
666
667 MayAutoLock locker(&lock_);
668 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
669
670 if (reason)
671 endpoint->set_disconnect_reason(reason);
672
673 // It is possible that this endpoint has been set as peer closed. That is
674 // because when the message pipe is closed, all the endpoints are updated with
675 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
676 // as long as there are refs keeping the router alive. If there is a
677 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
678 // here and see that the endpoint has been marked as peer closed.
679 if (!endpoint->peer_closed()) {
680 if (endpoint->client())
681 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
682 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
683 }
684
685 // No need to trigger a ProcessTasks() because it is already on the stack.
686
687 return true;
688 }
689
OnPipeConnectionError()690 void MultiplexRouter::OnPipeConnectionError() {
691 DCHECK(thread_checker_.CalledOnValidThread());
692
693 scoped_refptr<MultiplexRouter> protector(this);
694 MayAutoLock locker(&lock_);
695
696 encountered_error_ = true;
697
698 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
699 InterfaceEndpoint* endpoint = iter->second.get();
700 // Increment the iterator before calling UpdateEndpointStateMayRemove()
701 // because it may remove the corresponding value from the map.
702 ++iter;
703
704 if (endpoint->client())
705 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
706
707 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
708 }
709
710 ProcessTasks(connector_.during_sync_handle_watcher_callback()
711 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
712 : ALLOW_DIRECT_CLIENT_CALLS,
713 connector_.task_runner());
714 }
715
ProcessTasks(ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)716 void MultiplexRouter::ProcessTasks(
717 ClientCallBehavior client_call_behavior,
718 base::SingleThreadTaskRunner* current_task_runner) {
719 AssertLockAcquired();
720
721 if (posted_to_process_tasks_)
722 return;
723
724 while (!tasks_.empty() && !paused_) {
725 std::unique_ptr<Task> task(std::move(tasks_.front()));
726 tasks_.pop_front();
727
728 InterfaceId id = kInvalidInterfaceId;
729 bool sync_message =
730 task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
731 task->message_wrapper.value().has_flag(Message::kFlagIsSync);
732 if (sync_message) {
733 id = task->message_wrapper.value().interface_id();
734 auto& sync_message_queue = sync_message_tasks_[id];
735 DCHECK_EQ(task.get(), sync_message_queue.front());
736 sync_message_queue.pop_front();
737 }
738
739 bool processed =
740 task->IsNotifyErrorTask()
741 ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
742 current_task_runner)
743 : ProcessIncomingMessage(&task->message_wrapper.value(),
744 client_call_behavior, current_task_runner);
745
746 if (!processed) {
747 if (sync_message) {
748 auto& sync_message_queue = sync_message_tasks_[id];
749 sync_message_queue.push_front(task.get());
750 }
751 tasks_.push_front(std::move(task));
752 break;
753 } else {
754 if (sync_message) {
755 auto iter = sync_message_tasks_.find(id);
756 if (iter != sync_message_tasks_.end() && iter->second.empty())
757 sync_message_tasks_.erase(iter);
758 }
759 }
760 }
761 }
762
ProcessFirstSyncMessageForEndpoint(InterfaceId id)763 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
764 AssertLockAcquired();
765
766 auto iter = sync_message_tasks_.find(id);
767 if (iter == sync_message_tasks_.end())
768 return false;
769
770 if (paused_)
771 return true;
772
773 MultiplexRouter::Task* task = iter->second.front();
774 iter->second.pop_front();
775
776 DCHECK(task->IsMessageTask());
777 MessageWrapper message_wrapper = std::move(task->message_wrapper);
778
779 // Note: after this call, |task| and |iter| may be invalidated.
780 bool processed = ProcessIncomingMessage(
781 &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
782 nullptr);
783 DCHECK(processed);
784
785 iter = sync_message_tasks_.find(id);
786 if (iter == sync_message_tasks_.end())
787 return false;
788
789 if (iter->second.empty()) {
790 sync_message_tasks_.erase(iter);
791 return false;
792 }
793
794 return true;
795 }
796
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)797 bool MultiplexRouter::ProcessNotifyErrorTask(
798 Task* task,
799 ClientCallBehavior client_call_behavior,
800 base::SingleThreadTaskRunner* current_task_runner) {
801 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
802 DCHECK(!paused_);
803
804 AssertLockAcquired();
805 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
806 if (!endpoint->client())
807 return true;
808
809 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
810 endpoint->task_runner() != current_task_runner) {
811 MaybePostToProcessTasks(endpoint->task_runner());
812 return false;
813 }
814
815 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
816
817 InterfaceEndpointClient* client = endpoint->client();
818 base::Optional<DisconnectReason> disconnect_reason(
819 endpoint->disconnect_reason());
820
821 {
822 // We must unlock before calling into |client| because it may call this
823 // object within NotifyError(). Holding the lock will lead to deadlock.
824 //
825 // It is safe to call into |client| without the lock. Because |client| is
826 // always accessed on the same thread, including DetachEndpointClient().
827 MayAutoUnlock unlocker(&lock_);
828 client->NotifyError(disconnect_reason);
829 }
830 return true;
831 }
832
ProcessIncomingMessage(Message * message,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)833 bool MultiplexRouter::ProcessIncomingMessage(
834 Message* message,
835 ClientCallBehavior client_call_behavior,
836 base::SingleThreadTaskRunner* current_task_runner) {
837 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
838 DCHECK(!paused_);
839 DCHECK(message);
840 AssertLockAcquired();
841
842 if (message->IsNull()) {
843 // This is a sync message and has been processed during sync handle
844 // watching.
845 return true;
846 }
847
848 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
849 bool result = false;
850
851 {
852 MayAutoUnlock unlocker(&lock_);
853 result = control_message_handler_.Accept(message);
854 }
855
856 if (!result)
857 RaiseErrorInNonTestingMode();
858
859 return true;
860 }
861
862 InterfaceId id = message->interface_id();
863 DCHECK(IsValidInterfaceId(id));
864
865 InterfaceEndpoint* endpoint = FindEndpoint(id);
866 if (!endpoint || endpoint->closed())
867 return true;
868
869 if (!endpoint->client()) {
870 // We need to wait until a client is attached in order to dispatch further
871 // messages.
872 return false;
873 }
874
875 bool can_direct_call;
876 if (message->has_flag(Message::kFlagIsSync)) {
877 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
878 endpoint->task_runner()->BelongsToCurrentThread();
879 } else {
880 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
881 endpoint->task_runner() == current_task_runner;
882 }
883
884 if (!can_direct_call) {
885 MaybePostToProcessTasks(endpoint->task_runner());
886 return false;
887 }
888
889 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
890
891 InterfaceEndpointClient* client = endpoint->client();
892 bool result = false;
893 {
894 // We must unlock before calling into |client| because it may call this
895 // object within HandleIncomingMessage(). Holding the lock will lead to
896 // deadlock.
897 //
898 // It is safe to call into |client| without the lock. Because |client| is
899 // always accessed on the same thread, including DetachEndpointClient().
900 MayAutoUnlock unlocker(&lock_);
901 result = client->HandleIncomingMessage(message);
902 }
903 if (!result)
904 RaiseErrorInNonTestingMode();
905
906 return true;
907 }
908
MaybePostToProcessTasks(base::SingleThreadTaskRunner * task_runner)909 void MultiplexRouter::MaybePostToProcessTasks(
910 base::SingleThreadTaskRunner* task_runner) {
911 AssertLockAcquired();
912 if (posted_to_process_tasks_)
913 return;
914
915 posted_to_process_tasks_ = true;
916 posted_to_task_runner_ = task_runner;
917 task_runner->PostTask(
918 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
919 }
920
LockAndCallProcessTasks()921 void MultiplexRouter::LockAndCallProcessTasks() {
922 // There is no need to hold a ref to this class in this case because this is
923 // always called using base::Bind(), which holds a ref.
924 MayAutoLock locker(&lock_);
925 posted_to_process_tasks_ = false;
926 scoped_refptr<base::SingleThreadTaskRunner> runner(
927 std::move(posted_to_task_runner_));
928 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
929 }
930
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)931 void MultiplexRouter::UpdateEndpointStateMayRemove(
932 InterfaceEndpoint* endpoint,
933 EndpointStateUpdateType type) {
934 if (type == ENDPOINT_CLOSED) {
935 endpoint->set_closed();
936 } else {
937 endpoint->set_peer_closed();
938 // If the interface endpoint is performing a sync watch, this makes sure
939 // it is notified and eventually exits the sync watch.
940 endpoint->SignalSyncMessageEvent();
941 }
942 if (endpoint->closed() && endpoint->peer_closed())
943 endpoints_.erase(endpoint->id());
944 }
945
RaiseErrorInNonTestingMode()946 void MultiplexRouter::RaiseErrorInNonTestingMode() {
947 AssertLockAcquired();
948 if (!testing_mode_)
949 RaiseError();
950 }
951
FindOrInsertEndpoint(InterfaceId id,bool * inserted)952 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
953 InterfaceId id,
954 bool* inserted) {
955 AssertLockAcquired();
956 // Either |inserted| is nullptr or it points to a boolean initialized as
957 // false.
958 DCHECK(!inserted || !*inserted);
959
960 InterfaceEndpoint* endpoint = FindEndpoint(id);
961 if (!endpoint) {
962 endpoint = new InterfaceEndpoint(this, id);
963 endpoints_[id] = endpoint;
964 if (inserted)
965 *inserted = true;
966 }
967
968 return endpoint;
969 }
970
FindEndpoint(InterfaceId id)971 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
972 InterfaceId id) {
973 AssertLockAcquired();
974 auto iter = endpoints_.find(id);
975 return iter != endpoints_.end() ? iter->second.get() : nullptr;
976 }
977
AssertLockAcquired()978 void MultiplexRouter::AssertLockAcquired() {
979 #if DCHECK_IS_ON()
980 if (lock_)
981 lock_->AssertAcquired();
982 #endif
983 }
984
985 } // namespace internal
986 } // namespace mojo
987