// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/bindings/lib/multiplex_router.h"

#include <stdint.h>

#include <utility>

#include "base/bind.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/threading/thread_task_runner_handle.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"

namespace mojo {
namespace internal {

// InterfaceEndpoint stores the information of an interface endpoint registered
// with the router.
// No one other than the router's |endpoints_| and |tasks_| should hold refs to
// this object.
class MultiplexRouter::InterfaceEndpoint
    : public base::RefCounted<InterfaceEndpoint>,
      public InterfaceEndpointController {
 public:
  InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
      : router_(router),
        id_(id),
        closed_(false),
        peer_closed_(false),
        client_(nullptr),
        event_signalled_(false) {}

  // ---------------------------------------------------------------------------
  // The following public methods are safe to call from any thread without
  // locking.

  InterfaceId id() const { return id_; }

  // ---------------------------------------------------------------------------
  // The following public methods are called under the router's lock.

  bool closed() const { return closed_; }
  void set_closed() {
    router_->lock_.AssertAcquired();
    closed_ = true;
  }

  bool peer_closed() const { return peer_closed_; }
  void set_peer_closed() {
    router_->lock_.AssertAcquired();
    peer_closed_ = true;
  }

  base::SingleThreadTaskRunner* task_runner() const {
    return task_runner_.get();
  }

  InterfaceEndpointClient* client() const { return client_; }

  void AttachClient(InterfaceEndpointClient* client,
                    scoped_refptr<base::SingleThreadTaskRunner> runner) {
    router_->lock_.AssertAcquired();
    DCHECK(!client_);
    DCHECK(!closed_);
    DCHECK(runner->BelongsToCurrentThread());

    task_runner_ = std::move(runner);
    client_ = client;
  }

  // This method must be called on the same thread as the corresponding
  // AttachClient() call.
  void DetachClient() {
    router_->lock_.AssertAcquired();
    DCHECK(client_);
    DCHECK(task_runner_->BelongsToCurrentThread());
    DCHECK(!closed_);

    task_runner_ = nullptr;
    client_ = nullptr;
    sync_watcher_.reset();
  }

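  // Signals that there are sync messages available for this endpoint by
  // writing an empty message to the event message pipe, which wakes up the
  // SyncHandleWatcher (if any) watching |sync_message_event_receiver_|.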
  void SignalSyncMessageEvent() {
    router_->lock_.AssertAcquired();
    if (event_signalled_)
      return;

    EnsureEventMessagePipeExists();
    event_signalled_ = true;
    MojoResult result =
        WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
                        0, MOJO_WRITE_MESSAGE_FLAG_NONE);
    DCHECK_EQ(MOJO_RESULT_OK, result);
  }

  // ---------------------------------------------------------------------------
  // The following public methods (i.e., InterfaceEndpointController
  // implementation) are called by the client on the same thread as the
  // AttachClient() call. They are called outside of the router's lock.

  bool SendMessage(Message* message) override {
    DCHECK(task_runner_->BelongsToCurrentThread());
    message->set_interface_id(id_);
    return router_->connector_.Accept(message);
  }

  void AllowWokenUpBySyncWatchOnSameThread() override {
    DCHECK(task_runner_->BelongsToCurrentThread());

    EnsureSyncWatcherExists();
    sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
  }

  bool SyncWatch(const bool* should_stop) override {
    DCHECK(task_runner_->BelongsToCurrentThread());

    EnsureSyncWatcherExists();
    return sync_watcher_->SyncWatch(should_stop);
  }

 private:
  friend class base::RefCounted<InterfaceEndpoint>;

  ~InterfaceEndpoint() override {
    router_->lock_.AssertAcquired();

    DCHECK(!client_);
    DCHECK(closed_);
    DCHECK(peer_closed_);
    DCHECK(!sync_watcher_);
  }

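  // Invoked by |sync_watcher_| when |sync_message_event_receiver_| becomes
  // readable, i.e., after SignalSyncMessageEvent() has been called. Dispatches
  // the first queued sync message targeting this endpoint.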
  void OnHandleReady(MojoResult result) {
    DCHECK(task_runner_->BelongsToCurrentThread());
    scoped_refptr<InterfaceEndpoint> self_protector(this);
    scoped_refptr<MultiplexRouter> router_protector(router_);

    // Because we never close |sync_message_event_{sender,receiver}_| before
    // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
    DCHECK_EQ(MOJO_RESULT_OK, result);
    bool reset_sync_watcher = false;
    {
      base::AutoLock locker(router_->lock_);

      bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);

      if (!more_to_process)
        ResetSyncMessageSignal();

      // If there are currently no queued sync messages and the peer has
      // closed, no more sync messages will arrive; in that case the sync
      // watcher can be reset.
      reset_sync_watcher = !more_to_process && peer_closed_;
    }
    if (reset_sync_watcher) {
      // If a SyncWatch() call (or multiple ones) of this interface endpoint is
      // on the call stack, resetting the sync watcher will allow it to exit
      // when the call stack unwinds to that frame.
      sync_watcher_.reset();
    }
  }

  void EnsureSyncWatcherExists() {
    DCHECK(task_runner_->BelongsToCurrentThread());
    if (sync_watcher_)
      return;

    {
      base::AutoLock locker(router_->lock_);
      EnsureEventMessagePipeExists();

      auto iter = router_->sync_message_tasks_.find(id_);
      if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
        SignalSyncMessageEvent();
    }

    sync_watcher_.reset(new SyncHandleWatcher(
        sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
        base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
  }

  void EnsureEventMessagePipeExists() {
    router_->lock_.AssertAcquired();

    if (sync_message_event_receiver_.is_valid())
      return;

    MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
                                          &sync_message_event_receiver_);
    DCHECK_EQ(MOJO_RESULT_OK, result);
  }

  void ResetSyncMessageSignal() {
    router_->lock_.AssertAcquired();

    if (!event_signalled_)
      return;

    DCHECK(sync_message_event_receiver_.is_valid());
    MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
                                       nullptr, nullptr, nullptr, nullptr,
                                       MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
    DCHECK_EQ(MOJO_RESULT_OK, result);
    event_signalled_ = false;
  }

  // ---------------------------------------------------------------------------
  // The following members are safe to access from any thread.

  MultiplexRouter* const router_;
  const InterfaceId id_;

  // ---------------------------------------------------------------------------
  // The following members are accessed under the router's lock.

  // Whether the endpoint has been closed.
  bool closed_;
  // Whether the peer endpoint has been closed.
  bool peer_closed_;

  // The task runner on which |client_|'s methods can be called.
  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
  // Not owned. It is null if no client is attached to this endpoint.
  InterfaceEndpointClient* client_;

  // A message pipe used as an event to signal that sync messages are
  // available. The message pipe handles are initialized under the router's
  // lock and remain unchanged afterwards. They may be accessed outside of the
  // router's lock later.
  ScopedMessagePipeHandle sync_message_event_sender_;
  ScopedMessagePipeHandle sync_message_event_receiver_;
  bool event_signalled_;

  // ---------------------------------------------------------------------------
  // The following members are only valid while a client is attached. They are
  // used exclusively on the client's thread. They may be accessed outside of
  // the router's lock.

  std::unique_ptr<SyncHandleWatcher> sync_watcher_;

  DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
};

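// Task represents one unit of work queued by the router: either an incoming
// message waiting to be dispatched to its target endpoint, or a pending error
// notification for an endpoint whose peer has closed.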
struct MultiplexRouter::Task {
 public:
  // Doesn't take ownership of |message| but takes its contents.
  static std::unique_ptr<Task> CreateMessageTask(Message* message) {
    Task* task = new Task(MESSAGE);
    task->message.reset(new Message);
    message->MoveTo(task->message.get());
    return base::WrapUnique(task);
  }
  static std::unique_ptr<Task> CreateNotifyErrorTask(
      InterfaceEndpoint* endpoint) {
    Task* task = new Task(NOTIFY_ERROR);
    task->endpoint_to_notify = endpoint;
    return base::WrapUnique(task);
  }

  ~Task() {}

  bool IsMessageTask() const { return type == MESSAGE; }
  bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }

  std::unique_ptr<Message> message;
  scoped_refptr<InterfaceEndpoint> endpoint_to_notify;

  enum Type { MESSAGE, NOTIFY_ERROR };
  Type type;

 private:
  explicit Task(Type in_type) : type(in_type) {}
};

MultiplexRouter::MultiplexRouter(
    bool set_interface_id_namespace_bit,
    ScopedMessagePipeHandle message_pipe,
    scoped_refptr<base::SingleThreadTaskRunner> runner)
    : AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()),
      set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
      header_validator_(this),
      connector_(std::move(message_pipe),
                 Connector::MULTI_THREADED_SEND,
                 std::move(runner)),
      control_message_handler_(this),
      control_message_proxy_(&connector_),
      next_interface_id_value_(1),
      posted_to_process_tasks_(false),
      encountered_error_(false),
      testing_mode_(false) {
  // Always participate in sync handle watching: even if this router doesn't
  // expect sync requests during sync handle watching, it may still need to
  // dispatch messages to associated endpoints on a different thread.
  connector_.AllowWokenUpBySyncWatchOnSameThread();
  connector_.set_incoming_receiver(&header_validator_);
  connector_.set_connection_error_handler(
      base::Bind(&MultiplexRouter::OnPipeConnectionError,
                 base::Unretained(this)));
}

MultiplexRouter::~MultiplexRouter() {
  base::AutoLock locker(lock_);

  sync_message_tasks_.clear();
  tasks_.clear();

  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    InterfaceEndpoint* endpoint = iter->second.get();
    // Increment the iterator before calling UpdateEndpointStateMayRemove()
    // because it may remove the corresponding value from the map.
    ++iter;

    DCHECK(endpoint->closed());
    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  }

  DCHECK(endpoints_.empty());
}

void MultiplexRouter::SetMasterInterfaceName(const std::string& name) {
  DCHECK(thread_checker_.CalledOnValidThread());
  header_validator_.SetDescription(name + " [master] MessageHeaderValidator");
  control_message_handler_.SetDescription(
      name + " [master] PipeControlMessageHandler");
}

void MultiplexRouter::CreateEndpointHandlePair(
    ScopedInterfaceEndpointHandle* local_endpoint,
    ScopedInterfaceEndpointHandle* remote_endpoint) {
  base::AutoLock locker(lock_);
  uint32_t id = 0;
  do {
    if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
      next_interface_id_value_ = 1;
    id = next_interface_id_value_++;
    if (set_interface_id_namespace_bit_)
      id |= kInterfaceIdNamespaceMask;
  } while (ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
  endpoints_[id] = endpoint;
  if (encountered_error_)
    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);

  *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
  *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
}

ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
    InterfaceId id) {
  if (!IsValidInterfaceId(id))
    return ScopedInterfaceEndpointHandle();

  base::AutoLock locker(lock_);
  bool inserted = false;
  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
  if (inserted) {
    if (encountered_error_)
      UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  } else {
    // If the endpoint already exists, it is because we have received a
    // notification that the peer endpoint has closed.
    CHECK(!endpoint->closed());
    CHECK(endpoint->peer_closed());
  }
  return CreateScopedInterfaceEndpointHandle(id, true);
}

void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
  if (!IsValidInterfaceId(id))
    return;

  base::AutoLock locker(lock_);

  if (!is_local) {
    DCHECK(ContainsKey(endpoints_, id));
    DCHECK(!IsMasterInterfaceId(id));

    // We will receive a NotifyPeerEndpointClosed message from the other side.
    control_message_proxy_.NotifyEndpointClosedBeforeSent(id);

    return;
  }

  DCHECK(ContainsKey(endpoints_, id));
  InterfaceEndpoint* endpoint = endpoints_[id].get();
  DCHECK(!endpoint->client());
  DCHECK(!endpoint->closed());
  UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);

  if (!IsMasterInterfaceId(id))
    control_message_proxy_.NotifyPeerEndpointClosed(id);

  ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
}

InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
    const ScopedInterfaceEndpointHandle& handle,
    InterfaceEndpointClient* client,
    scoped_refptr<base::SingleThreadTaskRunner> runner) {
  const InterfaceId id = handle.id();

  DCHECK(IsValidInterfaceId(id));
  DCHECK(client);

  base::AutoLock locker(lock_);
  DCHECK(ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = endpoints_[id].get();
  endpoint->AttachClient(client, std::move(runner));

  if (endpoint->peer_closed())
    tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
  ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);

  return endpoint;
}

void MultiplexRouter::DetachEndpointClient(
    const ScopedInterfaceEndpointHandle& handle) {
  const InterfaceId id = handle.id();

  DCHECK(IsValidInterfaceId(id));

  base::AutoLock locker(lock_);
  DCHECK(ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = endpoints_[id].get();
  endpoint->DetachClient();
}

void MultiplexRouter::RaiseError() {
  if (task_runner_->BelongsToCurrentThread()) {
    connector_.RaiseError();
  } else {
    task_runner_->PostTask(FROM_HERE,
                           base::Bind(&MultiplexRouter::RaiseError, this));
  }
}

void MultiplexRouter::CloseMessagePipe() {
  DCHECK(thread_checker_.CalledOnValidThread());
  connector_.CloseMessagePipe();
  // CloseMessagePipe() above won't trigger the connection error handler.
  // Explicitly call OnPipeConnectionError() so that associated endpoints will
  // get notified.
  OnPipeConnectionError();
}

bool MultiplexRouter::HasAssociatedEndpoints() const {
  DCHECK(thread_checker_.CalledOnValidThread());
  base::AutoLock locker(lock_);

  if (endpoints_.size() > 1)
    return true;
  if (endpoints_.size() == 0)
    return false;

  return !ContainsKey(endpoints_, kMasterInterfaceId);
}

void MultiplexRouter::EnableTestingMode() {
  DCHECK(thread_checker_.CalledOnValidThread());
  base::AutoLock locker(lock_);

  testing_mode_ = true;
  connector_.set_enforce_errors_from_incoming_receiver(false);
}

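// Incoming messages read from the pipe arrive here (via |header_validator_|),
// on the thread that owns |connector_|.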
bool MultiplexRouter::Accept(Message* message) {
  DCHECK(thread_checker_.CalledOnValidThread());

  scoped_refptr<MultiplexRouter> protector(this);
  base::AutoLock locker(lock_);

  ClientCallBehavior client_call_behavior =
      connector_.during_sync_handle_watcher_callback()
          ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
          : ALLOW_DIRECT_CLIENT_CALLS;

  bool processed =
      tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
                                               connector_.task_runner());

  if (!processed) {
    // Either the task queue is not empty or we cannot process the message
    // directly. In both cases, there is no need to call ProcessTasks().
    tasks_.push_back(Task::CreateMessageTask(message));
    Task* task = tasks_.back().get();

    if (task->message->has_flag(Message::kFlagIsSync)) {
      InterfaceId id = task->message->interface_id();
      sync_message_tasks_[id].push_back(task);
      auto iter = endpoints_.find(id);
      if (iter != endpoints_.end())
        iter->second->SignalSyncMessageEvent();
    }
  } else if (!tasks_.empty()) {
    // Processing the message may result in new tasks (for error notification)
    // being added to the queue. In this case, we have to attempt to process
    // the tasks.
    ProcessTasks(client_call_behavior, connector_.task_runner());
  }

  // Always return true. If we see errors during message processing, we will
  // explicitly call Connector::RaiseError() to disconnect the message pipe.
  return true;
}

bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
  lock_.AssertAcquired();

  if (IsMasterInterfaceId(id))
    return false;

  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);

  // It is possible that this endpoint has been set as peer closed. That is
  // because when the message pipe is closed, all the endpoints are updated
  // with PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the
  // queue, as long as there are refs keeping the router alive. If there is a
  // PeerAssociatedEndpointClosedEvent control message in the queue, we will
  // get here and see that the endpoint has been marked as peer closed.
  if (!endpoint->peer_closed()) {
    if (endpoint->client())
      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  }

  // No need to trigger a ProcessTasks() because it is already on the stack.

  return true;
}

bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
  lock_.AssertAcquired();

  if (IsMasterInterfaceId(id))
    return false;

  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
  DCHECK(!endpoint->closed());
  UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);

  control_message_proxy_.NotifyPeerEndpointClosed(id);

  return true;
}

void MultiplexRouter::OnPipeConnectionError() {
  DCHECK(thread_checker_.CalledOnValidThread());

  scoped_refptr<MultiplexRouter> protector(this);
  base::AutoLock locker(lock_);

  encountered_error_ = true;

  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    InterfaceEndpoint* endpoint = iter->second.get();
    // Increment the iterator before calling UpdateEndpointStateMayRemove()
    // because it may remove the corresponding value from the map.
    ++iter;

    if (endpoint->client())
      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));

    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  }

  ProcessTasks(connector_.during_sync_handle_watcher_callback()
                   ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
                   : ALLOW_DIRECT_CLIENT_CALLS,
               connector_.task_runner());
}

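// Drains the task queue in order. If a task cannot be processed on the current
// thread (or direct client calls are disallowed), it is pushed back to the
// front of the queue and the loop stops; a ProcessTasks() call is then posted
// to the appropriate task runner by the Process*() helpers.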
void MultiplexRouter::ProcessTasks(
    ClientCallBehavior client_call_behavior,
    base::SingleThreadTaskRunner* current_task_runner) {
  lock_.AssertAcquired();

  if (posted_to_process_tasks_)
    return;

  while (!tasks_.empty()) {
    std::unique_ptr<Task> task(std::move(tasks_.front()));
    tasks_.pop_front();

    InterfaceId id = kInvalidInterfaceId;
    bool sync_message = task->IsMessageTask() && task->message &&
                        task->message->has_flag(Message::kFlagIsSync);
    if (sync_message) {
      id = task->message->interface_id();
      auto& sync_message_queue = sync_message_tasks_[id];
      DCHECK_EQ(task.get(), sync_message_queue.front());
      sync_message_queue.pop_front();
    }

    bool processed =
        task->IsNotifyErrorTask()
            ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
                                     current_task_runner)
            : ProcessIncomingMessage(task->message.get(), client_call_behavior,
                                     current_task_runner);

    if (!processed) {
      if (sync_message) {
        auto& sync_message_queue = sync_message_tasks_[id];
        sync_message_queue.push_front(task.get());
      }
      tasks_.push_front(std::move(task));
      break;
    } else {
      if (sync_message) {
        auto iter = sync_message_tasks_.find(id);
        if (iter != sync_message_tasks_.end() && iter->second.empty())
          sync_message_tasks_.erase(iter);
      }
    }
  }
}

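// Dispatches the first queued sync message targeting |id|, if any. Returns
// true if more sync messages remain queued for |id| afterwards, false
// otherwise.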
bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
  lock_.AssertAcquired();

  auto iter = sync_message_tasks_.find(id);
  if (iter == sync_message_tasks_.end())
    return false;

  MultiplexRouter::Task* task = iter->second.front();
  iter->second.pop_front();

  DCHECK(task->IsMessageTask());
  std::unique_ptr<Message> message(std::move(task->message));

  // Note: after this call, |task| and |iter| may be invalidated.
  bool processed = ProcessIncomingMessage(
      message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
  DCHECK(processed);

  iter = sync_message_tasks_.find(id);
  if (iter == sync_message_tasks_.end())
    return false;

  if (iter->second.empty()) {
    sync_message_tasks_.erase(iter);
    return false;
  }

  return true;
}

bool MultiplexRouter::ProcessNotifyErrorTask(
    Task* task,
    ClientCallBehavior client_call_behavior,
    base::SingleThreadTaskRunner* current_task_runner) {
  DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
  lock_.AssertAcquired();
  InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
  if (!endpoint->client())
    return true;

  if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
      endpoint->task_runner() != current_task_runner) {
    MaybePostToProcessTasks(endpoint->task_runner());
    return false;
  }

  DCHECK(endpoint->task_runner()->BelongsToCurrentThread());

  InterfaceEndpointClient* client = endpoint->client();
  {
    // We must unlock before calling into |client| because it may call back
    // into this object within NotifyError(). Holding the lock would lead to
    // deadlock.
    //
    // It is safe to call into |client| without the lock because |client| is
    // always accessed on the same thread, including in DetachEndpointClient().
    base::AutoUnlock unlocker(lock_);
    client->NotifyError();
  }
  return true;
}

bool MultiplexRouter::ProcessIncomingMessage(
    Message* message,
    ClientCallBehavior client_call_behavior,
    base::SingleThreadTaskRunner* current_task_runner) {
  DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
  lock_.AssertAcquired();

  if (!message) {
    // This is a sync message and has been processed during sync handle
    // watching.
    return true;
  }

  if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    if (!control_message_handler_.Accept(message))
      RaiseErrorInNonTestingMode();
    return true;
  }

  InterfaceId id = message->interface_id();
  DCHECK(IsValidInterfaceId(id));

  bool inserted = false;
  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
  if (inserted) {
    // Currently, it is legitimate to receive messages for an endpoint
    // that is not registered. For example, the endpoint is transferred in
    // a message that is discarded. Once we add support to specify all
    // enclosing endpoints in the message header, we should be able to remove
    // this.
    UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);

    // It is also possible that this newly-inserted endpoint is the master
    // endpoint. When the master InterfacePtr/Binding goes away, the message
    // pipe is closed and we explicitly trigger a pipe connection error. The
    // error updates all the endpoints, including the master endpoint, with
    // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
    // registration. We continue to process remaining tasks in the queue, as
    // long as there are refs keeping the router alive. If there are remaining
    // messages for the master endpoint, we will get here.
    if (!IsMasterInterfaceId(id))
      control_message_proxy_.NotifyPeerEndpointClosed(id);
    return true;
  }

  if (endpoint->closed())
    return true;

  if (!endpoint->client()) {
    // We need to wait until a client is attached in order to dispatch further
    // messages.
    return false;
  }

  bool can_direct_call;
  if (message->has_flag(Message::kFlagIsSync)) {
    can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
                      endpoint->task_runner()->BelongsToCurrentThread();
  } else {
    can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
                      endpoint->task_runner() == current_task_runner;
  }

  if (!can_direct_call) {
    MaybePostToProcessTasks(endpoint->task_runner());
    return false;
  }

  DCHECK(endpoint->task_runner()->BelongsToCurrentThread());

  InterfaceEndpointClient* client = endpoint->client();
  bool result = false;
  {
    // We must unlock before calling into |client| because it may call back
    // into this object within HandleIncomingMessage(). Holding the lock would
    // lead to deadlock.
    //
    // It is safe to call into |client| without the lock because |client| is
    // always accessed on the same thread, including in DetachEndpointClient().
    base::AutoUnlock unlocker(lock_);
    result = client->HandleIncomingMessage(message);
  }
  if (!result)
    RaiseErrorInNonTestingMode();

  return true;
}

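// Posts LockAndCallProcessTasks() to |task_runner| unless such a call has
// already been posted, so that queued tasks get processed on the thread they
// belong to.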
void MultiplexRouter::MaybePostToProcessTasks(
    base::SingleThreadTaskRunner* task_runner) {
  lock_.AssertAcquired();
  if (posted_to_process_tasks_)
    return;

  posted_to_process_tasks_ = true;
  posted_to_task_runner_ = task_runner;
  task_runner->PostTask(
      FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
}

void MultiplexRouter::LockAndCallProcessTasks() {
  // There is no need to hold a ref to this class in this case because this is
  // always called using base::Bind(), which holds a ref.
  base::AutoLock locker(lock_);
  posted_to_process_tasks_ = false;
  scoped_refptr<base::SingleThreadTaskRunner> runner(
      std::move(posted_to_task_runner_));
  ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
}

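// Marks |endpoint| (or its peer) as closed and erases it from |endpoints_|
// once both sides are closed; |endpoint| may be destroyed as a result.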
void MultiplexRouter::UpdateEndpointStateMayRemove(
    InterfaceEndpoint* endpoint,
    EndpointStateUpdateType type) {
  switch (type) {
    case ENDPOINT_CLOSED:
      endpoint->set_closed();
      break;
    case PEER_ENDPOINT_CLOSED:
      endpoint->set_peer_closed();
      // If the interface endpoint is performing a sync watch, this makes sure
      // it is notified and eventually exits the sync watch.
      endpoint->SignalSyncMessageEvent();
      break;
  }
  if (endpoint->closed() && endpoint->peer_closed())
    endpoints_.erase(endpoint->id());
}

void MultiplexRouter::RaiseErrorInNonTestingMode() {
  lock_.AssertAcquired();
  if (!testing_mode_)
    RaiseError();
}

MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
    InterfaceId id,
    bool* inserted) {
  lock_.AssertAcquired();
  // Either |inserted| is nullptr or it points to a boolean initialized as
  // false.
  DCHECK(!inserted || !*inserted);

  auto iter = endpoints_.find(id);
  InterfaceEndpoint* endpoint;
  if (iter == endpoints_.end()) {
    endpoint = new InterfaceEndpoint(this, id);
    endpoints_[id] = endpoint;
    if (inserted)
      *inserted = true;
  } else {
    endpoint = iter->second.get();
  }

  return endpoint;
}

}  // namespace internal
}  // namespace mojo