1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include <stdint.h>
8
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
22
23 namespace mojo {
24 namespace internal {
25
26 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object.
30 class MultiplexRouter::InterfaceEndpoint
31 : public base::RefCountedThreadSafe<InterfaceEndpoint>,
32 public InterfaceEndpointController {
33 public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35 : router_(router),
36 id_(id),
37 closed_(false),
38 peer_closed_(false),
39 handle_created_(false),
40 client_(nullptr) {}
41
42 // ---------------------------------------------------------------------------
43 // The following public methods are safe to call from any sequence without
44 // locking.
45
id() const46 InterfaceId id() const { return id_; }
47
48 // ---------------------------------------------------------------------------
49 // The following public methods are called under the router's lock.
50
closed() const51 bool closed() const { return closed_; }
set_closed()52 void set_closed() {
53 router_->AssertLockAcquired();
54 closed_ = true;
55 }
56
peer_closed() const57 bool peer_closed() const { return peer_closed_; }
set_peer_closed()58 void set_peer_closed() {
59 router_->AssertLockAcquired();
60 peer_closed_ = true;
61 }
62
handle_created() const63 bool handle_created() const { return handle_created_; }
set_handle_created()64 void set_handle_created() {
65 router_->AssertLockAcquired();
66 handle_created_ = true;
67 }
68
disconnect_reason() const69 const base::Optional<DisconnectReason>& disconnect_reason() const {
70 return disconnect_reason_;
71 }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)72 void set_disconnect_reason(
73 const base::Optional<DisconnectReason>& disconnect_reason) {
74 router_->AssertLockAcquired();
75 disconnect_reason_ = disconnect_reason;
76 }
77
task_runner() const78 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
79
client() const80 InterfaceEndpointClient* client() const { return client_; }
81
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)82 void AttachClient(InterfaceEndpointClient* client,
83 scoped_refptr<base::SequencedTaskRunner> runner) {
84 router_->AssertLockAcquired();
85 DCHECK(!client_);
86 DCHECK(!closed_);
87 DCHECK(runner->RunsTasksInCurrentSequence());
88
89 task_runner_ = std::move(runner);
90 client_ = client;
91 }
92
93 // This method must be called on the same sequence as the corresponding
94 // AttachClient() call.
DetachClient()95 void DetachClient() {
96 router_->AssertLockAcquired();
97 DCHECK(client_);
98 DCHECK(task_runner_->RunsTasksInCurrentSequence());
99 DCHECK(!closed_);
100
101 task_runner_ = nullptr;
102 client_ = nullptr;
103 sync_watcher_.reset();
104 }
105
SignalSyncMessageEvent()106 void SignalSyncMessageEvent() {
107 router_->AssertLockAcquired();
108 if (sync_message_event_signaled_)
109 return;
110 sync_message_event_signaled_ = true;
111 if (sync_watcher_)
112 sync_watcher_->SignalEvent();
113 }
114
ResetSyncMessageSignal()115 void ResetSyncMessageSignal() {
116 router_->AssertLockAcquired();
117 if (!sync_message_event_signaled_)
118 return;
119 sync_message_event_signaled_ = false;
120 if (sync_watcher_)
121 sync_watcher_->ResetEvent();
122 }
123
124 // ---------------------------------------------------------------------------
125 // The following public methods (i.e., InterfaceEndpointController
126 // implementation) are called by the client on the same sequence as the
127 // AttachClient() call. They are called outside of the router's lock.
128
SendMessage(Message * message)129 bool SendMessage(Message* message) override {
130 DCHECK(task_runner_->RunsTasksInCurrentSequence());
131 message->set_interface_id(id_);
132 return router_->connector_.Accept(message);
133 }
134
AllowWokenUpBySyncWatchOnSameThread()135 void AllowWokenUpBySyncWatchOnSameThread() override {
136 DCHECK(task_runner_->RunsTasksInCurrentSequence());
137
138 EnsureSyncWatcherExists();
139 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
140 }
141
SyncWatch(const bool * should_stop)142 bool SyncWatch(const bool* should_stop) override {
143 DCHECK(task_runner_->RunsTasksInCurrentSequence());
144
145 EnsureSyncWatcherExists();
146 return sync_watcher_->SyncWatch(should_stop);
147 }
148
149 private:
150 friend class base::RefCountedThreadSafe<InterfaceEndpoint>;
151
~InterfaceEndpoint()152 ~InterfaceEndpoint() override {
153 router_->AssertLockAcquired();
154
155 DCHECK(!client_);
156 }
157
OnSyncEventSignaled()158 void OnSyncEventSignaled() {
159 DCHECK(task_runner_->RunsTasksInCurrentSequence());
160 scoped_refptr<MultiplexRouter> router_protector(router_);
161
162 MayAutoLock locker(&router_->lock_);
163 scoped_refptr<InterfaceEndpoint> self_protector(this);
164
165 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
166
167 if (!more_to_process)
168 ResetSyncMessageSignal();
169
170 // Currently there are no queued sync messages and the peer has closed so
171 // there won't be incoming sync messages in the future.
172 if (!more_to_process && peer_closed_) {
173 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
174 // on the call stack, resetting the sync watcher will allow it to exit
175 // when the call stack unwinds to that frame.
176 sync_watcher_.reset();
177 }
178 }
179
EnsureSyncWatcherExists()180 void EnsureSyncWatcherExists() {
181 DCHECK(task_runner_->RunsTasksInCurrentSequence());
182 if (sync_watcher_)
183 return;
184
185 MayAutoLock locker(&router_->lock_);
186 sync_watcher_ =
187 std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating(
188 &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this)));
189 if (sync_message_event_signaled_)
190 sync_watcher_->SignalEvent();
191 }
192
193 // ---------------------------------------------------------------------------
194 // The following members are safe to access from any sequence.
195
196 MultiplexRouter* const router_;
197 const InterfaceId id_;
198
199 // ---------------------------------------------------------------------------
200 // The following members are accessed under the router's lock.
201
202 // Whether the endpoint has been closed.
203 bool closed_;
204 // Whether the peer endpoint has been closed.
205 bool peer_closed_;
206
207 // Whether there is already a ScopedInterfaceEndpointHandle created for this
208 // endpoint.
209 bool handle_created_;
210
211 base::Optional<DisconnectReason> disconnect_reason_;
212
213 // The task runner on which |client_|'s methods can be called.
214 scoped_refptr<base::SequencedTaskRunner> task_runner_;
215 // Not owned. It is null if no client is attached to this endpoint.
216 InterfaceEndpointClient* client_;
217
218 // Indicates whether the sync watcher should be signaled for this endpoint.
219 bool sync_message_event_signaled_ = false;
220
221 // Guarded by the router's lock. Used to synchronously wait on replies.
222 std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_;
223
224 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
225 };
226
227 // MessageWrapper objects are always destroyed under the router's lock. On
228 // destruction, if the message it wrappers contains interface IDs, the wrapper
229 // closes the corresponding endpoints.
230 class MultiplexRouter::MessageWrapper {
231 public:
232 MessageWrapper() = default;
233
MessageWrapper(MultiplexRouter * router,Message message)234 MessageWrapper(MultiplexRouter* router, Message message)
235 : router_(router), value_(std::move(message)) {}
236
MessageWrapper(MessageWrapper && other)237 MessageWrapper(MessageWrapper&& other)
238 : router_(other.router_), value_(std::move(other.value_)) {}
239
~MessageWrapper()240 ~MessageWrapper() {
241 if (!router_ || value_.IsNull())
242 return;
243
244 router_->AssertLockAcquired();
245 // Don't try to close the endpoints if at this point the router is already
246 // half-destructed.
247 if (!router_->being_destructed_)
248 router_->CloseEndpointsForMessage(value_);
249 }
250
operator =(MessageWrapper && other)251 MessageWrapper& operator=(MessageWrapper&& other) {
252 router_ = other.router_;
253 value_ = std::move(other.value_);
254 return *this;
255 }
256
value() const257 const Message& value() const { return value_; }
258
259 // Must be called outside of the router's lock.
260 // Returns a null message if it fails to deseralize the associated endpoint
261 // handles.
DeserializeEndpointHandlesAndTake()262 Message DeserializeEndpointHandlesAndTake() {
263 if (!value_.DeserializeAssociatedEndpointHandles(router_)) {
264 // The previous call may have deserialized part of the associated
265 // interface endpoint handles. They must be destroyed outside of the
266 // router's lock, so we cannot wait until destruction of MessageWrapper.
267 value_.Reset();
268 return Message();
269 }
270 return std::move(value_);
271 }
272
273 private:
274 MultiplexRouter* router_ = nullptr;
275 Message value_;
276
277 DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
278 };
279
280 struct MultiplexRouter::Task {
281 public:
282 // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task283 static std::unique_ptr<Task> CreateMessageTask(
284 MessageWrapper message_wrapper) {
285 Task* task = new Task(MESSAGE);
286 task->message_wrapper = std::move(message_wrapper);
287 return base::WrapUnique(task);
288 }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task289 static std::unique_ptr<Task> CreateNotifyErrorTask(
290 InterfaceEndpoint* endpoint) {
291 Task* task = new Task(NOTIFY_ERROR);
292 task->endpoint_to_notify = endpoint;
293 return base::WrapUnique(task);
294 }
295
~Taskmojo::internal::MultiplexRouter::Task296 ~Task() {}
297
IsMessageTaskmojo::internal::MultiplexRouter::Task298 bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task299 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
300
301 MessageWrapper message_wrapper;
302 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
303
304 enum Type { MESSAGE, NOTIFY_ERROR };
305 Type type;
306
307 private:
Taskmojo::internal::MultiplexRouter::Task308 explicit Task(Type in_type) : type(in_type) {}
309
310 DISALLOW_COPY_AND_ASSIGN(Task);
311 };
312
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namesapce_bit,scoped_refptr<base::SequencedTaskRunner> runner)313 MultiplexRouter::MultiplexRouter(
314 ScopedMessagePipeHandle message_pipe,
315 Config config,
316 bool set_interface_id_namesapce_bit,
317 scoped_refptr<base::SequencedTaskRunner> runner)
318 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
319 task_runner_(runner),
320 filters_(this),
321 connector_(std::move(message_pipe),
322 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
323 : Connector::SINGLE_THREADED_SEND,
324 std::move(runner)),
325 control_message_handler_(this),
326 control_message_proxy_(&connector_) {
327 DCHECK(task_runner_->RunsTasksInCurrentSequence());
328
329 if (config == MULTI_INTERFACE)
330 lock_.emplace();
331
332 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
333 config == MULTI_INTERFACE) {
334 // Always participate in sync handle watching in multi-interface mode,
335 // because even if it doesn't expect sync requests during sync handle
336 // watching, it may still need to dispatch messages to associated endpoints
337 // on a different sequence.
338 connector_.AllowWokenUpBySyncWatchOnSameThread();
339 }
340 connector_.set_incoming_receiver(&filters_);
341 connector_.set_connection_error_handler(base::Bind(
342 &MultiplexRouter::OnPipeConnectionError, base::Unretained(this)));
343
344 std::unique_ptr<MessageHeaderValidator> header_validator =
345 std::make_unique<MessageHeaderValidator>();
346 header_validator_ = header_validator.get();
347 filters_.Append(std::move(header_validator));
348 }
349
~MultiplexRouter()350 MultiplexRouter::~MultiplexRouter() {
351 MayAutoLock locker(&lock_);
352
353 being_destructed_ = true;
354
355 sync_message_tasks_.clear();
356 tasks_.clear();
357 endpoints_.clear();
358 }
359
AddIncomingMessageFilter(std::unique_ptr<MessageReceiver> filter)360 void MultiplexRouter::AddIncomingMessageFilter(
361 std::unique_ptr<MessageReceiver> filter) {
362 filters_.Append(std::move(filter));
363 }
364
SetMasterInterfaceName(const char * name)365 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
366 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
367 header_validator_->SetDescription(std::string(name) +
368 " [master] MessageHeaderValidator");
369 control_message_handler_.SetDescription(
370 std::string(name) + " [master] PipeControlMessageHandler");
371 connector_.SetWatcherHeapProfilerTag(name);
372 }
373
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)374 InterfaceId MultiplexRouter::AssociateInterface(
375 ScopedInterfaceEndpointHandle handle_to_send) {
376 if (!handle_to_send.pending_association())
377 return kInvalidInterfaceId;
378
379 uint32_t id = 0;
380 {
381 MayAutoLock locker(&lock_);
382 do {
383 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
384 next_interface_id_value_ = 1;
385 id = next_interface_id_value_++;
386 if (set_interface_id_namespace_bit_)
387 id |= kInterfaceIdNamespaceMask;
388 } while (base::ContainsKey(endpoints_, id));
389
390 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
391 endpoints_[id] = endpoint;
392 if (encountered_error_)
393 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
394 endpoint->set_handle_created();
395 }
396
397 if (!NotifyAssociation(&handle_to_send, id)) {
398 // The peer handle of |handle_to_send|, which is supposed to join this
399 // associated group, has been closed.
400 {
401 MayAutoLock locker(&lock_);
402 InterfaceEndpoint* endpoint = FindEndpoint(id);
403 if (endpoint)
404 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
405 }
406
407 control_message_proxy_.NotifyPeerEndpointClosed(
408 id, handle_to_send.disconnect_reason());
409 }
410 return id;
411 }
412
CreateLocalEndpointHandle(InterfaceId id)413 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
414 InterfaceId id) {
415 if (!IsValidInterfaceId(id))
416 return ScopedInterfaceEndpointHandle();
417
418 MayAutoLock locker(&lock_);
419 bool inserted = false;
420 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
421 if (inserted) {
422 if (encountered_error_)
423 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
424 } else {
425 if (endpoint->handle_created() || endpoint->closed())
426 return ScopedInterfaceEndpointHandle();
427 }
428
429 endpoint->set_handle_created();
430 return CreateScopedInterfaceEndpointHandle(id);
431 }
432
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)433 void MultiplexRouter::CloseEndpointHandle(
434 InterfaceId id,
435 const base::Optional<DisconnectReason>& reason) {
436 if (!IsValidInterfaceId(id))
437 return;
438
439 MayAutoLock locker(&lock_);
440 DCHECK(base::ContainsKey(endpoints_, id));
441 InterfaceEndpoint* endpoint = endpoints_[id].get();
442 DCHECK(!endpoint->client());
443 DCHECK(!endpoint->closed());
444 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
445
446 if (!IsMasterInterfaceId(id) || reason) {
447 MayAutoUnlock unlocker(&lock_);
448 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
449 }
450
451 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
452 }
453
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)454 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
455 const ScopedInterfaceEndpointHandle& handle,
456 InterfaceEndpointClient* client,
457 scoped_refptr<base::SequencedTaskRunner> runner) {
458 const InterfaceId id = handle.id();
459
460 DCHECK(IsValidInterfaceId(id));
461 DCHECK(client);
462
463 MayAutoLock locker(&lock_);
464 DCHECK(base::ContainsKey(endpoints_, id));
465
466 InterfaceEndpoint* endpoint = endpoints_[id].get();
467 endpoint->AttachClient(client, std::move(runner));
468
469 if (endpoint->peer_closed())
470 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
471 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
472
473 return endpoint;
474 }
475
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)476 void MultiplexRouter::DetachEndpointClient(
477 const ScopedInterfaceEndpointHandle& handle) {
478 const InterfaceId id = handle.id();
479
480 DCHECK(IsValidInterfaceId(id));
481
482 MayAutoLock locker(&lock_);
483 DCHECK(base::ContainsKey(endpoints_, id));
484
485 InterfaceEndpoint* endpoint = endpoints_[id].get();
486 endpoint->DetachClient();
487 }
488
RaiseError()489 void MultiplexRouter::RaiseError() {
490 if (task_runner_->RunsTasksInCurrentSequence()) {
491 connector_.RaiseError();
492 } else {
493 task_runner_->PostTask(FROM_HERE,
494 base::Bind(&MultiplexRouter::RaiseError, this));
495 }
496 }
497
PrefersSerializedMessages()498 bool MultiplexRouter::PrefersSerializedMessages() {
499 MayAutoLock locker(&lock_);
500 return connector_.PrefersSerializedMessages();
501 }
502
CloseMessagePipe()503 void MultiplexRouter::CloseMessagePipe() {
504 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
505 connector_.CloseMessagePipe();
506 // CloseMessagePipe() above won't trigger connection error handler.
507 // Explicitly call OnPipeConnectionError() so that associated endpoints will
508 // get notified.
509 OnPipeConnectionError();
510 }
511
PauseIncomingMethodCallProcessing()512 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
513 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
514 connector_.PauseIncomingMethodCallProcessing();
515
516 MayAutoLock locker(&lock_);
517 paused_ = true;
518
519 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
520 iter->second->ResetSyncMessageSignal();
521 }
522
ResumeIncomingMethodCallProcessing()523 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
524 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
525 connector_.ResumeIncomingMethodCallProcessing();
526
527 MayAutoLock locker(&lock_);
528 paused_ = false;
529
530 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
531 auto sync_iter = sync_message_tasks_.find(iter->first);
532 if (iter->second->peer_closed() ||
533 (sync_iter != sync_message_tasks_.end() &&
534 !sync_iter->second.empty())) {
535 iter->second->SignalSyncMessageEvent();
536 }
537 }
538
539 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
540 }
541
HasAssociatedEndpoints() const542 bool MultiplexRouter::HasAssociatedEndpoints() const {
543 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
544 MayAutoLock locker(&lock_);
545
546 if (endpoints_.size() > 1)
547 return true;
548 if (endpoints_.size() == 0)
549 return false;
550
551 return !base::ContainsKey(endpoints_, kMasterInterfaceId);
552 }
553
EnableTestingMode()554 void MultiplexRouter::EnableTestingMode() {
555 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
556 MayAutoLock locker(&lock_);
557
558 testing_mode_ = true;
559 connector_.set_enforce_errors_from_incoming_receiver(false);
560 }
561
Accept(Message * message)562 bool MultiplexRouter::Accept(Message* message) {
563 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
564
565 // Insert endpoints for the payload interface IDs as soon as the message
566 // arrives, instead of waiting till the message is dispatched. Consider the
567 // following sequence:
568 // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not
569 // dispatched because a sync call is blocking the thread.
570 // 2) Sync message msg2 arrives targeting interface ID x.
571 //
572 // If we don't insert endpoint for interface ID x, when trying to dispatch
573 // msg2 we don't know whether it is an unexpected message or it is just
574 // because the message containing x hasn't been dispatched.
575 if (!InsertEndpointsForMessage(*message))
576 return false;
577
578 scoped_refptr<MultiplexRouter> protector(this);
579 MayAutoLock locker(&lock_);
580
581 DCHECK(!paused_);
582
583 ClientCallBehavior client_call_behavior =
584 connector_.during_sync_handle_watcher_callback()
585 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
586 : ALLOW_DIRECT_CLIENT_CALLS;
587
588 MessageWrapper message_wrapper(this, std::move(*message));
589 bool processed = tasks_.empty() && ProcessIncomingMessage(
590 &message_wrapper, client_call_behavior,
591 connector_.task_runner());
592
593 if (!processed) {
594 // Either the task queue is not empty or we cannot process the message
595 // directly. In both cases, there is no need to call ProcessTasks().
596 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
597 Task* task = tasks_.back().get();
598
599 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
600 InterfaceId id = task->message_wrapper.value().interface_id();
601 sync_message_tasks_[id].push_back(task);
602 InterfaceEndpoint* endpoint = FindEndpoint(id);
603 if (endpoint)
604 endpoint->SignalSyncMessageEvent();
605 }
606 } else if (!tasks_.empty()) {
607 // Processing the message may result in new tasks (for error notification)
608 // being added to the queue. In this case, we have to attempt to process the
609 // tasks.
610 ProcessTasks(client_call_behavior, connector_.task_runner());
611 }
612
613 // Always return true. If we see errors during message processing, we will
614 // explicitly call Connector::RaiseError() to disconnect the message pipe.
615 return true;
616 }
617
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)618 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
619 InterfaceId id,
620 const base::Optional<DisconnectReason>& reason) {
621 MayAutoLock locker(&lock_);
622 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
623
624 if (reason)
625 endpoint->set_disconnect_reason(reason);
626
627 // It is possible that this endpoint has been set as peer closed. That is
628 // because when the message pipe is closed, all the endpoints are updated with
629 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
630 // as long as there are refs keeping the router alive. If there is a
631 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
632 // here and see that the endpoint has been marked as peer closed.
633 if (!endpoint->peer_closed()) {
634 if (endpoint->client())
635 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
636 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
637 }
638
639 // No need to trigger a ProcessTasks() because it is already on the stack.
640
641 return true;
642 }
643
OnPipeConnectionError()644 void MultiplexRouter::OnPipeConnectionError() {
645 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
646
647 scoped_refptr<MultiplexRouter> protector(this);
648 MayAutoLock locker(&lock_);
649
650 encountered_error_ = true;
651
652 // Calling UpdateEndpointStateMayRemove() may remove the corresponding value
653 // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore,
654 // copy the endpoint pointers to a vector and iterate over it instead.
655 std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector;
656 endpoint_vector.reserve(endpoints_.size());
657 for (const auto& pair : endpoints_)
658 endpoint_vector.push_back(pair.second);
659
660 for (const auto& endpoint : endpoint_vector) {
661 if (endpoint->client())
662 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get()));
663
664 UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED);
665 }
666
667 ProcessTasks(connector_.during_sync_handle_watcher_callback()
668 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
669 : ALLOW_DIRECT_CLIENT_CALLS,
670 connector_.task_runner());
671 }
672
ProcessTasks(ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)673 void MultiplexRouter::ProcessTasks(
674 ClientCallBehavior client_call_behavior,
675 base::SequencedTaskRunner* current_task_runner) {
676 AssertLockAcquired();
677
678 if (posted_to_process_tasks_)
679 return;
680
681 while (!tasks_.empty() && !paused_) {
682 std::unique_ptr<Task> task(std::move(tasks_.front()));
683 tasks_.pop_front();
684
685 InterfaceId id = kInvalidInterfaceId;
686 bool sync_message =
687 task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
688 task->message_wrapper.value().has_flag(Message::kFlagIsSync);
689 if (sync_message) {
690 id = task->message_wrapper.value().interface_id();
691 auto& sync_message_queue = sync_message_tasks_[id];
692 DCHECK_EQ(task.get(), sync_message_queue.front());
693 sync_message_queue.pop_front();
694 }
695
696 bool processed =
697 task->IsNotifyErrorTask()
698 ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
699 current_task_runner)
700 : ProcessIncomingMessage(&task->message_wrapper,
701 client_call_behavior, current_task_runner);
702
703 if (!processed) {
704 if (sync_message) {
705 auto& sync_message_queue = sync_message_tasks_[id];
706 sync_message_queue.push_front(task.get());
707 }
708 tasks_.push_front(std::move(task));
709 break;
710 } else {
711 if (sync_message) {
712 auto iter = sync_message_tasks_.find(id);
713 if (iter != sync_message_tasks_.end() && iter->second.empty())
714 sync_message_tasks_.erase(iter);
715 }
716 }
717 }
718 }
719
ProcessFirstSyncMessageForEndpoint(InterfaceId id)720 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
721 AssertLockAcquired();
722
723 auto iter = sync_message_tasks_.find(id);
724 if (iter == sync_message_tasks_.end())
725 return false;
726
727 if (paused_)
728 return true;
729
730 MultiplexRouter::Task* task = iter->second.front();
731 iter->second.pop_front();
732
733 DCHECK(task->IsMessageTask());
734 MessageWrapper message_wrapper = std::move(task->message_wrapper);
735
736 // Note: after this call, |task| and |iter| may be invalidated.
737 bool processed = ProcessIncomingMessage(
738 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
739 DCHECK(processed);
740
741 iter = sync_message_tasks_.find(id);
742 if (iter == sync_message_tasks_.end())
743 return false;
744
745 if (iter->second.empty()) {
746 sync_message_tasks_.erase(iter);
747 return false;
748 }
749
750 return true;
751 }
752
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)753 bool MultiplexRouter::ProcessNotifyErrorTask(
754 Task* task,
755 ClientCallBehavior client_call_behavior,
756 base::SequencedTaskRunner* current_task_runner) {
757 DCHECK(!current_task_runner ||
758 current_task_runner->RunsTasksInCurrentSequence());
759 DCHECK(!paused_);
760
761 AssertLockAcquired();
762 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
763 if (!endpoint->client())
764 return true;
765
766 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
767 endpoint->task_runner() != current_task_runner) {
768 MaybePostToProcessTasks(endpoint->task_runner());
769 return false;
770 }
771
772 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
773
774 InterfaceEndpointClient* client = endpoint->client();
775 base::Optional<DisconnectReason> disconnect_reason(
776 endpoint->disconnect_reason());
777
778 {
779 // We must unlock before calling into |client| because it may call this
780 // object within NotifyError(). Holding the lock will lead to deadlock.
781 //
782 // It is safe to call into |client| without the lock. Because |client| is
783 // always accessed on the same sequence, including DetachEndpointClient().
784 MayAutoUnlock unlocker(&lock_);
785 client->NotifyError(disconnect_reason);
786 }
787 return true;
788 }
789
ProcessIncomingMessage(MessageWrapper * message_wrapper,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)790 bool MultiplexRouter::ProcessIncomingMessage(
791 MessageWrapper* message_wrapper,
792 ClientCallBehavior client_call_behavior,
793 base::SequencedTaskRunner* current_task_runner) {
794 DCHECK(!current_task_runner ||
795 current_task_runner->RunsTasksInCurrentSequence());
796 DCHECK(!paused_);
797 DCHECK(message_wrapper);
798 AssertLockAcquired();
799
800 const Message* message = &message_wrapper->value();
801 if (message->IsNull()) {
802 // This is a sync message and has been processed during sync handle
803 // watching.
804 return true;
805 }
806
807 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
808 bool result = false;
809
810 {
811 MayAutoUnlock unlocker(&lock_);
812 Message tmp_message =
813 message_wrapper->DeserializeEndpointHandlesAndTake();
814 result = !tmp_message.IsNull() &&
815 control_message_handler_.Accept(&tmp_message);
816 }
817
818 if (!result)
819 RaiseErrorInNonTestingMode();
820
821 return true;
822 }
823
824 InterfaceId id = message->interface_id();
825 DCHECK(IsValidInterfaceId(id));
826
827 InterfaceEndpoint* endpoint = FindEndpoint(id);
828 if (!endpoint || endpoint->closed())
829 return true;
830
831 if (!endpoint->client()) {
832 // We need to wait until a client is attached in order to dispatch further
833 // messages.
834 return false;
835 }
836
837 bool can_direct_call;
838 if (message->has_flag(Message::kFlagIsSync)) {
839 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
840 endpoint->task_runner()->RunsTasksInCurrentSequence();
841 } else {
842 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
843 endpoint->task_runner() == current_task_runner;
844 }
845
846 if (!can_direct_call) {
847 MaybePostToProcessTasks(endpoint->task_runner());
848 return false;
849 }
850
851 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
852
853 InterfaceEndpointClient* client = endpoint->client();
854 bool result = false;
855 {
856 // We must unlock before calling into |client| because it may call this
857 // object within HandleIncomingMessage(). Holding the lock will lead to
858 // deadlock.
859 //
860 // It is safe to call into |client| without the lock. Because |client| is
861 // always accessed on the same sequence, including DetachEndpointClient().
862 MayAutoUnlock unlocker(&lock_);
863 Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake();
864 result =
865 !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message);
866 }
867 if (!result)
868 RaiseErrorInNonTestingMode();
869
870 return true;
871 }
872
MaybePostToProcessTasks(base::SequencedTaskRunner * task_runner)873 void MultiplexRouter::MaybePostToProcessTasks(
874 base::SequencedTaskRunner* task_runner) {
875 AssertLockAcquired();
876 if (posted_to_process_tasks_)
877 return;
878
879 posted_to_process_tasks_ = true;
880 posted_to_task_runner_ = task_runner;
881 task_runner->PostTask(
882 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
883 }
884
LockAndCallProcessTasks()885 void MultiplexRouter::LockAndCallProcessTasks() {
886 // There is no need to hold a ref to this class in this case because this is
887 // always called using base::Bind(), which holds a ref.
888 MayAutoLock locker(&lock_);
889 posted_to_process_tasks_ = false;
890 scoped_refptr<base::SequencedTaskRunner> runner(
891 std::move(posted_to_task_runner_));
892 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
893 }
894
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)895 void MultiplexRouter::UpdateEndpointStateMayRemove(
896 InterfaceEndpoint* endpoint,
897 EndpointStateUpdateType type) {
898 if (type == ENDPOINT_CLOSED) {
899 endpoint->set_closed();
900 } else {
901 endpoint->set_peer_closed();
902 // If the interface endpoint is performing a sync watch, this makes sure
903 // it is notified and eventually exits the sync watch.
904 endpoint->SignalSyncMessageEvent();
905 }
906 if (endpoint->closed() && endpoint->peer_closed())
907 endpoints_.erase(endpoint->id());
908 }
909
RaiseErrorInNonTestingMode()910 void MultiplexRouter::RaiseErrorInNonTestingMode() {
911 AssertLockAcquired();
912 if (!testing_mode_)
913 RaiseError();
914 }
915
FindOrInsertEndpoint(InterfaceId id,bool * inserted)916 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
917 InterfaceId id,
918 bool* inserted) {
919 AssertLockAcquired();
920 // Either |inserted| is nullptr or it points to a boolean initialized as
921 // false.
922 DCHECK(!inserted || !*inserted);
923
924 InterfaceEndpoint* endpoint = FindEndpoint(id);
925 if (!endpoint) {
926 endpoint = new InterfaceEndpoint(this, id);
927 endpoints_[id] = endpoint;
928 if (inserted)
929 *inserted = true;
930 }
931
932 return endpoint;
933 }
934
FindEndpoint(InterfaceId id)935 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
936 InterfaceId id) {
937 AssertLockAcquired();
938 auto iter = endpoints_.find(id);
939 return iter != endpoints_.end() ? iter->second.get() : nullptr;
940 }
941
AssertLockAcquired()942 void MultiplexRouter::AssertLockAcquired() {
943 #if DCHECK_IS_ON()
944 if (lock_)
945 lock_->AssertAcquired();
946 #endif
947 }
948
InsertEndpointsForMessage(const Message & message)949 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) {
950 if (!message.is_serialized())
951 return true;
952
953 uint32_t num_ids = message.payload_num_interface_ids();
954 if (num_ids == 0)
955 return true;
956
957 const uint32_t* ids = message.payload_interface_ids();
958
959 MayAutoLock locker(&lock_);
960 for (uint32_t i = 0; i < num_ids; ++i) {
961 // Message header validation already ensures that the IDs are valid and not
962 // the master ID.
963 // The IDs are from the remote side and therefore their namespace bit is
964 // supposed to be different than the value that this router would use.
965 if (set_interface_id_namespace_bit_ ==
966 HasInterfaceIdNamespaceBitSet(ids[i])) {
967 return false;
968 }
969
970 // It is possible that the endpoint already exists even when the remote side
971 // is well-behaved: it might have notified us that the peer endpoint has
972 // closed.
973 bool inserted = false;
974 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted);
975 if (endpoint->closed() || endpoint->handle_created())
976 return false;
977 }
978
979 return true;
980 }
981
CloseEndpointsForMessage(const Message & message)982 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) {
983 AssertLockAcquired();
984
985 if (!message.is_serialized())
986 return;
987
988 uint32_t num_ids = message.payload_num_interface_ids();
989 if (num_ids == 0)
990 return;
991
992 const uint32_t* ids = message.payload_interface_ids();
993 for (uint32_t i = 0; i < num_ids; ++i) {
994 InterfaceEndpoint* endpoint = FindEndpoint(ids[i]);
995 // If the remote side maliciously sends the same interface ID in another
996 // message which has been dispatched, we could get here with no endpoint
997 // for the ID, a closed endpoint, or an endpoint with handle created.
998 if (!endpoint || endpoint->closed() || endpoint->handle_created()) {
999 RaiseErrorInNonTestingMode();
1000 continue;
1001 }
1002
1003 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
1004 MayAutoUnlock unlocker(&lock_);
1005 control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt);
1006 }
1007
1008 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
1009 }
1010
1011 } // namespace internal
1012 } // namespace mojo
1013