• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 
7 #include <stdint.h>
8 
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
22 
23 namespace mojo {
24 namespace internal {
25 
26 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object.
30 class MultiplexRouter::InterfaceEndpoint
31     : public base::RefCounted<InterfaceEndpoint>,
32       public InterfaceEndpointController {
33  public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)34   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35       : router_(router),
36         id_(id),
37         closed_(false),
38         peer_closed_(false),
39         handle_created_(false),
40         client_(nullptr),
41         event_signalled_(false) {}
42 
43   // ---------------------------------------------------------------------------
44   // The following public methods are safe to call from any threads without
45   // locking.
46 
id() const47   InterfaceId id() const { return id_; }
48 
49   // ---------------------------------------------------------------------------
50   // The following public methods are called under the router's lock.
51 
closed() const52   bool closed() const { return closed_; }
set_closed()53   void set_closed() {
54     router_->AssertLockAcquired();
55     closed_ = true;
56   }
57 
peer_closed() const58   bool peer_closed() const { return peer_closed_; }
set_peer_closed()59   void set_peer_closed() {
60     router_->AssertLockAcquired();
61     peer_closed_ = true;
62   }
63 
handle_created() const64   bool handle_created() const { return handle_created_; }
set_handle_created()65   void set_handle_created() {
66     router_->AssertLockAcquired();
67     handle_created_ = true;
68   }
69 
disconnect_reason() const70   const base::Optional<DisconnectReason>& disconnect_reason() const {
71     return disconnect_reason_;
72   }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)73   void set_disconnect_reason(
74       const base::Optional<DisconnectReason>& disconnect_reason) {
75     router_->AssertLockAcquired();
76     disconnect_reason_ = disconnect_reason;
77   }
78 
task_runner() const79   base::SingleThreadTaskRunner* task_runner() const {
80     return task_runner_.get();
81   }
82 
client() const83   InterfaceEndpointClient* client() const { return client_; }
84 
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)85   void AttachClient(InterfaceEndpointClient* client,
86                     scoped_refptr<base::SingleThreadTaskRunner> runner) {
87     router_->AssertLockAcquired();
88     DCHECK(!client_);
89     DCHECK(!closed_);
90     DCHECK(runner->BelongsToCurrentThread());
91 
92     task_runner_ = std::move(runner);
93     client_ = client;
94   }
95 
96   // This method must be called on the same thread as the corresponding
97   // AttachClient() call.
DetachClient()98   void DetachClient() {
99     router_->AssertLockAcquired();
100     DCHECK(client_);
101     DCHECK(task_runner_->BelongsToCurrentThread());
102     DCHECK(!closed_);
103 
104     task_runner_ = nullptr;
105     client_ = nullptr;
106     sync_watcher_.reset();
107   }
108 
SignalSyncMessageEvent()109   void SignalSyncMessageEvent() {
110     router_->AssertLockAcquired();
111     if (event_signalled_)
112       return;
113 
114     event_signalled_ = true;
115     if (!sync_message_event_sender_.is_valid())
116       return;
117 
118     MojoResult result =
119         WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
120                         0, MOJO_WRITE_MESSAGE_FLAG_NONE);
121     DCHECK_EQ(MOJO_RESULT_OK, result);
122   }
123 
ResetSyncMessageSignal()124   void ResetSyncMessageSignal() {
125     router_->AssertLockAcquired();
126 
127     if (!event_signalled_)
128       return;
129 
130     event_signalled_ = false;
131     if (!sync_message_event_receiver_.is_valid())
132       return;
133 
134     MojoResult result =
135         ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
136                        nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
137     DCHECK_EQ(MOJO_RESULT_OK, result);
138   }
139 
140   // ---------------------------------------------------------------------------
141   // The following public methods (i.e., InterfaceEndpointController
142   // implementation) are called by the client on the same thread as the
143   // AttachClient() call. They are called outside of the router's lock.
144 
SendMessage(Message * message)145   bool SendMessage(Message* message) override {
146     DCHECK(task_runner_->BelongsToCurrentThread());
147     message->set_interface_id(id_);
148     return router_->connector_.Accept(message);
149   }
150 
AllowWokenUpBySyncWatchOnSameThread()151   void AllowWokenUpBySyncWatchOnSameThread() override {
152     DCHECK(task_runner_->BelongsToCurrentThread());
153 
154     EnsureSyncWatcherExists();
155     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
156   }
157 
SyncWatch(const bool * should_stop)158   bool SyncWatch(const bool* should_stop) override {
159     DCHECK(task_runner_->BelongsToCurrentThread());
160 
161     EnsureSyncWatcherExists();
162     return sync_watcher_->SyncWatch(should_stop);
163   }
164 
165  private:
166   friend class base::RefCounted<InterfaceEndpoint>;
167 
~InterfaceEndpoint()168   ~InterfaceEndpoint() override {
169     router_->AssertLockAcquired();
170 
171     DCHECK(!client_);
172     DCHECK(closed_);
173     DCHECK(peer_closed_);
174     DCHECK(!sync_watcher_);
175   }
176 
OnHandleReady(MojoResult result)177   void OnHandleReady(MojoResult result) {
178     DCHECK(task_runner_->BelongsToCurrentThread());
179     scoped_refptr<MultiplexRouter> router_protector(router_);
180 
181     // Because we never close |sync_message_event_{sender,receiver}_| before
182     // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
183     DCHECK_EQ(MOJO_RESULT_OK, result);
184 
185     MayAutoLock locker(&router_->lock_);
186     scoped_refptr<InterfaceEndpoint> self_protector(this);
187 
188     bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
189 
190     if (!more_to_process)
191       ResetSyncMessageSignal();
192 
193     // Currently there are no queued sync messages and the peer has closed so
194     // there won't be incoming sync messages in the future.
195     if (!more_to_process && peer_closed_) {
196       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
197       // on the call stack, resetting the sync watcher will allow it to exit
198       // when the call stack unwinds to that frame.
199       sync_watcher_.reset();
200     }
201   }
202 
EnsureSyncWatcherExists()203   void EnsureSyncWatcherExists() {
204     DCHECK(task_runner_->BelongsToCurrentThread());
205     if (sync_watcher_)
206       return;
207 
208     {
209       MayAutoLock locker(&router_->lock_);
210 
211       if (!sync_message_event_sender_.is_valid()) {
212         MojoResult result =
213             CreateMessagePipe(nullptr, &sync_message_event_sender_,
214                               &sync_message_event_receiver_);
215         DCHECK_EQ(MOJO_RESULT_OK, result);
216 
217         if (event_signalled_) {
218           // Reset the flag so that SignalSyncMessageEvent() will actually
219           // signal using the newly-created message pipe.
220           event_signalled_ = false;
221           SignalSyncMessageEvent();
222         }
223       }
224     }
225 
226     sync_watcher_.reset(new SyncHandleWatcher(
227         sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
228         base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
229   }
230 
231   // ---------------------------------------------------------------------------
232   // The following members are safe to access from any threads.
233 
234   MultiplexRouter* const router_;
235   const InterfaceId id_;
236 
237   // ---------------------------------------------------------------------------
238   // The following members are accessed under the router's lock.
239 
240   // Whether the endpoint has been closed.
241   bool closed_;
242   // Whether the peer endpoint has been closed.
243   bool peer_closed_;
244 
245   // Whether there is already a ScopedInterfaceEndpointHandle created for this
246   // endpoint.
247   bool handle_created_;
248 
249   base::Optional<DisconnectReason> disconnect_reason_;
250 
251   // The task runner on which |client_|'s methods can be called.
252   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
253   // Not owned. It is null if no client is attached to this endpoint.
254   InterfaceEndpointClient* client_;
255 
256   // A message pipe used as an event to signal that sync messages are available.
257   // The message pipe handles are initialized under the router's lock and remain
258   // unchanged afterwards. They may be accessed outside of the router's lock
259   // later.
260   ScopedMessagePipeHandle sync_message_event_sender_;
261   ScopedMessagePipeHandle sync_message_event_receiver_;
262   bool event_signalled_;
263 
264   // ---------------------------------------------------------------------------
265   // The following members are only valid while a client is attached. They are
266   // used exclusively on the client's thread. They may be accessed outside of
267   // the router's lock.
268 
269   std::unique_ptr<SyncHandleWatcher> sync_watcher_;
270 
271   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
272 };
273 
274 // MessageWrapper objects are always destroyed under the router's lock. On
275 // destruction, if the message it wrappers contains
276 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
277 // router's lock), the wrapper unlocks to clean them up.
278 class MultiplexRouter::MessageWrapper {
279  public:
280   MessageWrapper() = default;
281 
MessageWrapper(MultiplexRouter * router,Message message)282   MessageWrapper(MultiplexRouter* router, Message message)
283       : router_(router), value_(std::move(message)) {}
284 
MessageWrapper(MessageWrapper && other)285   MessageWrapper(MessageWrapper&& other)
286       : router_(other.router_), value_(std::move(other.value_)) {}
287 
~MessageWrapper()288   ~MessageWrapper() {
289     if (value_.associated_endpoint_handles()->empty())
290       return;
291 
292     router_->AssertLockAcquired();
293     {
294       MayAutoUnlock unlocker(&router_->lock_);
295       value_.mutable_associated_endpoint_handles()->clear();
296     }
297   }
298 
operator =(MessageWrapper && other)299   MessageWrapper& operator=(MessageWrapper&& other) {
300     router_ = other.router_;
301     value_ = std::move(other.value_);
302     return *this;
303   }
304 
value()305   Message& value() { return value_; }
306 
307  private:
308   MultiplexRouter* router_ = nullptr;
309   Message value_;
310 
311   DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
312 };
313 
314 struct MultiplexRouter::Task {
315  public:
316   // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task317   static std::unique_ptr<Task> CreateMessageTask(
318       MessageWrapper message_wrapper) {
319     Task* task = new Task(MESSAGE);
320     task->message_wrapper = std::move(message_wrapper);
321     return base::WrapUnique(task);
322   }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task323   static std::unique_ptr<Task> CreateNotifyErrorTask(
324       InterfaceEndpoint* endpoint) {
325     Task* task = new Task(NOTIFY_ERROR);
326     task->endpoint_to_notify = endpoint;
327     return base::WrapUnique(task);
328   }
329 
~Taskmojo::internal::MultiplexRouter::Task330   ~Task() {}
331 
IsMessageTaskmojo::internal::MultiplexRouter::Task332   bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task333   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
334 
335   MessageWrapper message_wrapper;
336   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
337 
338   enum Type { MESSAGE, NOTIFY_ERROR };
339   Type type;
340 
341  private:
Taskmojo::internal::MultiplexRouter::Task342   explicit Task(Type in_type) : type(in_type) {}
343 
344   DISALLOW_COPY_AND_ASSIGN(Task);
345 };
346 
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namesapce_bit,scoped_refptr<base::SingleThreadTaskRunner> runner)347 MultiplexRouter::MultiplexRouter(
348     ScopedMessagePipeHandle message_pipe,
349     Config config,
350     bool set_interface_id_namesapce_bit,
351     scoped_refptr<base::SingleThreadTaskRunner> runner)
352     : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
353       task_runner_(runner),
354       header_validator_(nullptr),
355       filters_(this),
356       connector_(std::move(message_pipe),
357                  config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
358                                            : Connector::SINGLE_THREADED_SEND,
359                  std::move(runner)),
360       control_message_handler_(this),
361       control_message_proxy_(&connector_),
362       next_interface_id_value_(1),
363       posted_to_process_tasks_(false),
364       encountered_error_(false),
365       paused_(false),
366       testing_mode_(false) {
367   DCHECK(task_runner_->BelongsToCurrentThread());
368 
369   if (config == MULTI_INTERFACE)
370     lock_.emplace();
371 
372   if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
373       config == MULTI_INTERFACE) {
374     // Always participate in sync handle watching in multi-interface mode,
375     // because even if it doesn't expect sync requests during sync handle
376     // watching, it may still need to dispatch messages to associated endpoints
377     // on a different thread.
378     connector_.AllowWokenUpBySyncWatchOnSameThread();
379   }
380   connector_.set_incoming_receiver(&filters_);
381   connector_.set_connection_error_handler(
382       base::Bind(&MultiplexRouter::OnPipeConnectionError,
383                  base::Unretained(this)));
384 
385   std::unique_ptr<MessageHeaderValidator> header_validator =
386       base::MakeUnique<MessageHeaderValidator>();
387   header_validator_ = header_validator.get();
388   filters_.Append(std::move(header_validator));
389 }
390 
~MultiplexRouter()391 MultiplexRouter::~MultiplexRouter() {
392   MayAutoLock locker(&lock_);
393 
394   sync_message_tasks_.clear();
395   tasks_.clear();
396 
397   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
398     InterfaceEndpoint* endpoint = iter->second.get();
399     // Increment the iterator before calling UpdateEndpointStateMayRemove()
400     // because it may remove the corresponding value from the map.
401     ++iter;
402 
403     if (!endpoint->closed()) {
404       // This happens when a NotifyPeerEndpointClosed message been received, but
405       // the interface ID hasn't been used to create local endpoint handle.
406       DCHECK(!endpoint->client());
407       DCHECK(endpoint->peer_closed());
408       UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
409     } else {
410       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
411     }
412   }
413 
414   DCHECK(endpoints_.empty());
415 }
416 
SetMasterInterfaceName(const char * name)417 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
418   DCHECK(thread_checker_.CalledOnValidThread());
419   header_validator_->SetDescription(
420       std::string(name) + " [master] MessageHeaderValidator");
421   control_message_handler_.SetDescription(
422       std::string(name) + " [master] PipeControlMessageHandler");
423   connector_.SetWatcherHeapProfilerTag(name);
424 }
425 
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)426 InterfaceId MultiplexRouter::AssociateInterface(
427     ScopedInterfaceEndpointHandle handle_to_send) {
428   if (!handle_to_send.pending_association())
429     return kInvalidInterfaceId;
430 
431   uint32_t id = 0;
432   {
433     MayAutoLock locker(&lock_);
434     do {
435       if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
436         next_interface_id_value_ = 1;
437       id = next_interface_id_value_++;
438       if (set_interface_id_namespace_bit_)
439         id |= kInterfaceIdNamespaceMask;
440     } while (base::ContainsKey(endpoints_, id));
441 
442     InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
443     endpoints_[id] = endpoint;
444     if (encountered_error_)
445       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
446     endpoint->set_handle_created();
447   }
448 
449   if (!NotifyAssociation(&handle_to_send, id)) {
450     // The peer handle of |handle_to_send|, which is supposed to join this
451     // associated group, has been closed.
452     {
453       MayAutoLock locker(&lock_);
454       InterfaceEndpoint* endpoint = FindEndpoint(id);
455       if (endpoint)
456         UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
457     }
458 
459     control_message_proxy_.NotifyPeerEndpointClosed(
460         id, handle_to_send.disconnect_reason());
461   }
462   return id;
463 }
464 
CreateLocalEndpointHandle(InterfaceId id)465 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
466     InterfaceId id) {
467   if (!IsValidInterfaceId(id))
468     return ScopedInterfaceEndpointHandle();
469 
470   MayAutoLock locker(&lock_);
471   bool inserted = false;
472   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
473   if (inserted) {
474     DCHECK(!endpoint->handle_created());
475 
476     if (encountered_error_)
477       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
478   } else {
479     // If the endpoint already exist, it is because we have received a
480     // notification that the peer endpoint has closed.
481     CHECK(!endpoint->closed());
482     CHECK(endpoint->peer_closed());
483 
484     if (endpoint->handle_created())
485       return ScopedInterfaceEndpointHandle();
486   }
487 
488   endpoint->set_handle_created();
489   return CreateScopedInterfaceEndpointHandle(id);
490 }
491 
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)492 void MultiplexRouter::CloseEndpointHandle(
493     InterfaceId id,
494     const base::Optional<DisconnectReason>& reason) {
495   if (!IsValidInterfaceId(id))
496     return;
497 
498   MayAutoLock locker(&lock_);
499   DCHECK(base::ContainsKey(endpoints_, id));
500   InterfaceEndpoint* endpoint = endpoints_[id].get();
501   DCHECK(!endpoint->client());
502   DCHECK(!endpoint->closed());
503   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
504 
505   if (!IsMasterInterfaceId(id) || reason) {
506     MayAutoUnlock unlocker(&lock_);
507     control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
508   }
509 
510   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
511 }
512 
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)513 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
514     const ScopedInterfaceEndpointHandle& handle,
515     InterfaceEndpointClient* client,
516     scoped_refptr<base::SingleThreadTaskRunner> runner) {
517   const InterfaceId id = handle.id();
518 
519   DCHECK(IsValidInterfaceId(id));
520   DCHECK(client);
521 
522   MayAutoLock locker(&lock_);
523   DCHECK(base::ContainsKey(endpoints_, id));
524 
525   InterfaceEndpoint* endpoint = endpoints_[id].get();
526   endpoint->AttachClient(client, std::move(runner));
527 
528   if (endpoint->peer_closed())
529     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
530   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
531 
532   return endpoint;
533 }
534 
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)535 void MultiplexRouter::DetachEndpointClient(
536     const ScopedInterfaceEndpointHandle& handle) {
537   const InterfaceId id = handle.id();
538 
539   DCHECK(IsValidInterfaceId(id));
540 
541   MayAutoLock locker(&lock_);
542   DCHECK(base::ContainsKey(endpoints_, id));
543 
544   InterfaceEndpoint* endpoint = endpoints_[id].get();
545   endpoint->DetachClient();
546 }
547 
RaiseError()548 void MultiplexRouter::RaiseError() {
549   if (task_runner_->BelongsToCurrentThread()) {
550     connector_.RaiseError();
551   } else {
552     task_runner_->PostTask(FROM_HERE,
553                            base::Bind(&MultiplexRouter::RaiseError, this));
554   }
555 }
556 
CloseMessagePipe()557 void MultiplexRouter::CloseMessagePipe() {
558   DCHECK(thread_checker_.CalledOnValidThread());
559   connector_.CloseMessagePipe();
560   // CloseMessagePipe() above won't trigger connection error handler.
561   // Explicitly call OnPipeConnectionError() so that associated endpoints will
562   // get notified.
563   OnPipeConnectionError();
564 }
565 
PauseIncomingMethodCallProcessing()566 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
567   DCHECK(thread_checker_.CalledOnValidThread());
568   connector_.PauseIncomingMethodCallProcessing();
569 
570   MayAutoLock locker(&lock_);
571   paused_ = true;
572 
573   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
574     iter->second->ResetSyncMessageSignal();
575 }
576 
ResumeIncomingMethodCallProcessing()577 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
578   DCHECK(thread_checker_.CalledOnValidThread());
579   connector_.ResumeIncomingMethodCallProcessing();
580 
581   MayAutoLock locker(&lock_);
582   paused_ = false;
583 
584   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
585     auto sync_iter = sync_message_tasks_.find(iter->first);
586     if (iter->second->peer_closed() ||
587         (sync_iter != sync_message_tasks_.end() &&
588          !sync_iter->second.empty())) {
589       iter->second->SignalSyncMessageEvent();
590     }
591   }
592 
593   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
594 }
595 
HasAssociatedEndpoints() const596 bool MultiplexRouter::HasAssociatedEndpoints() const {
597   DCHECK(thread_checker_.CalledOnValidThread());
598   MayAutoLock locker(&lock_);
599 
600   if (endpoints_.size() > 1)
601     return true;
602   if (endpoints_.size() == 0)
603     return false;
604 
605   return !base::ContainsKey(endpoints_, kMasterInterfaceId);
606 }
607 
EnableTestingMode()608 void MultiplexRouter::EnableTestingMode() {
609   DCHECK(thread_checker_.CalledOnValidThread());
610   MayAutoLock locker(&lock_);
611 
612   testing_mode_ = true;
613   connector_.set_enforce_errors_from_incoming_receiver(false);
614 }
615 
Accept(Message * message)616 bool MultiplexRouter::Accept(Message* message) {
617   DCHECK(thread_checker_.CalledOnValidThread());
618 
619   if (!message->DeserializeAssociatedEndpointHandles(this))
620     return false;
621 
622   scoped_refptr<MultiplexRouter> protector(this);
623   MayAutoLock locker(&lock_);
624 
625   DCHECK(!paused_);
626 
627   ClientCallBehavior client_call_behavior =
628       connector_.during_sync_handle_watcher_callback()
629           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
630           : ALLOW_DIRECT_CLIENT_CALLS;
631 
632   bool processed =
633       tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
634                                                connector_.task_runner());
635 
636   if (!processed) {
637     // Either the task queue is not empty or we cannot process the message
638     // directly. In both cases, there is no need to call ProcessTasks().
639     tasks_.push_back(
640         Task::CreateMessageTask(MessageWrapper(this, std::move(*message))));
641     Task* task = tasks_.back().get();
642 
643     if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
644       InterfaceId id = task->message_wrapper.value().interface_id();
645       sync_message_tasks_[id].push_back(task);
646       InterfaceEndpoint* endpoint = FindEndpoint(id);
647       if (endpoint)
648         endpoint->SignalSyncMessageEvent();
649     }
650   } else if (!tasks_.empty()) {
651     // Processing the message may result in new tasks (for error notification)
652     // being added to the queue. In this case, we have to attempt to process the
653     // tasks.
654     ProcessTasks(client_call_behavior, connector_.task_runner());
655   }
656 
657   // Always return true. If we see errors during message processing, we will
658   // explicitly call Connector::RaiseError() to disconnect the message pipe.
659   return true;
660 }
661 
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)662 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
663     InterfaceId id,
664     const base::Optional<DisconnectReason>& reason) {
665   DCHECK(!IsMasterInterfaceId(id) || reason);
666 
667   MayAutoLock locker(&lock_);
668   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
669 
670   if (reason)
671     endpoint->set_disconnect_reason(reason);
672 
673   // It is possible that this endpoint has been set as peer closed. That is
674   // because when the message pipe is closed, all the endpoints are updated with
675   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
676   // as long as there are refs keeping the router alive. If there is a
677   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
678   // here and see that the endpoint has been marked as peer closed.
679   if (!endpoint->peer_closed()) {
680     if (endpoint->client())
681       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
682     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
683   }
684 
685   // No need to trigger a ProcessTasks() because it is already on the stack.
686 
687   return true;
688 }
689 
OnPipeConnectionError()690 void MultiplexRouter::OnPipeConnectionError() {
691   DCHECK(thread_checker_.CalledOnValidThread());
692 
693   scoped_refptr<MultiplexRouter> protector(this);
694   MayAutoLock locker(&lock_);
695 
696   encountered_error_ = true;
697 
698   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
699     InterfaceEndpoint* endpoint = iter->second.get();
700     // Increment the iterator before calling UpdateEndpointStateMayRemove()
701     // because it may remove the corresponding value from the map.
702     ++iter;
703 
704     if (endpoint->client())
705       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
706 
707     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
708   }
709 
710   ProcessTasks(connector_.during_sync_handle_watcher_callback()
711                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
712                    : ALLOW_DIRECT_CLIENT_CALLS,
713                connector_.task_runner());
714 }
715 
ProcessTasks(ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)716 void MultiplexRouter::ProcessTasks(
717     ClientCallBehavior client_call_behavior,
718     base::SingleThreadTaskRunner* current_task_runner) {
719   AssertLockAcquired();
720 
721   if (posted_to_process_tasks_)
722     return;
723 
724   while (!tasks_.empty() && !paused_) {
725     std::unique_ptr<Task> task(std::move(tasks_.front()));
726     tasks_.pop_front();
727 
728     InterfaceId id = kInvalidInterfaceId;
729     bool sync_message =
730         task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
731         task->message_wrapper.value().has_flag(Message::kFlagIsSync);
732     if (sync_message) {
733       id = task->message_wrapper.value().interface_id();
734       auto& sync_message_queue = sync_message_tasks_[id];
735       DCHECK_EQ(task.get(), sync_message_queue.front());
736       sync_message_queue.pop_front();
737     }
738 
739     bool processed =
740         task->IsNotifyErrorTask()
741             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
742                                      current_task_runner)
743             : ProcessIncomingMessage(&task->message_wrapper.value(),
744                                      client_call_behavior, current_task_runner);
745 
746     if (!processed) {
747       if (sync_message) {
748         auto& sync_message_queue = sync_message_tasks_[id];
749         sync_message_queue.push_front(task.get());
750       }
751       tasks_.push_front(std::move(task));
752       break;
753     } else {
754       if (sync_message) {
755         auto iter = sync_message_tasks_.find(id);
756         if (iter != sync_message_tasks_.end() && iter->second.empty())
757           sync_message_tasks_.erase(iter);
758       }
759     }
760   }
761 }
762 
ProcessFirstSyncMessageForEndpoint(InterfaceId id)763 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
764   AssertLockAcquired();
765 
766   auto iter = sync_message_tasks_.find(id);
767   if (iter == sync_message_tasks_.end())
768     return false;
769 
770   if (paused_)
771     return true;
772 
773   MultiplexRouter::Task* task = iter->second.front();
774   iter->second.pop_front();
775 
776   DCHECK(task->IsMessageTask());
777   MessageWrapper message_wrapper = std::move(task->message_wrapper);
778 
779   // Note: after this call, |task| and |iter| may be invalidated.
780   bool processed = ProcessIncomingMessage(
781       &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
782       nullptr);
783   DCHECK(processed);
784 
785   iter = sync_message_tasks_.find(id);
786   if (iter == sync_message_tasks_.end())
787     return false;
788 
789   if (iter->second.empty()) {
790     sync_message_tasks_.erase(iter);
791     return false;
792   }
793 
794   return true;
795 }
796 
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)797 bool MultiplexRouter::ProcessNotifyErrorTask(
798     Task* task,
799     ClientCallBehavior client_call_behavior,
800     base::SingleThreadTaskRunner* current_task_runner) {
801   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
802   DCHECK(!paused_);
803 
804   AssertLockAcquired();
805   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
806   if (!endpoint->client())
807     return true;
808 
809   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
810       endpoint->task_runner() != current_task_runner) {
811     MaybePostToProcessTasks(endpoint->task_runner());
812     return false;
813   }
814 
815   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
816 
817   InterfaceEndpointClient* client = endpoint->client();
818   base::Optional<DisconnectReason> disconnect_reason(
819       endpoint->disconnect_reason());
820 
821   {
822     // We must unlock before calling into |client| because it may call this
823     // object within NotifyError(). Holding the lock will lead to deadlock.
824     //
825     // It is safe to call into |client| without the lock. Because |client| is
826     // always accessed on the same thread, including DetachEndpointClient().
827     MayAutoUnlock unlocker(&lock_);
828     client->NotifyError(disconnect_reason);
829   }
830   return true;
831 }
832 
ProcessIncomingMessage(Message * message,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)833 bool MultiplexRouter::ProcessIncomingMessage(
834     Message* message,
835     ClientCallBehavior client_call_behavior,
836     base::SingleThreadTaskRunner* current_task_runner) {
837   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
838   DCHECK(!paused_);
839   DCHECK(message);
840   AssertLockAcquired();
841 
842   if (message->IsNull()) {
843     // This is a sync message and has been processed during sync handle
844     // watching.
845     return true;
846   }
847 
848   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
849     bool result = false;
850 
851     {
852       MayAutoUnlock unlocker(&lock_);
853       result = control_message_handler_.Accept(message);
854     }
855 
856     if (!result)
857       RaiseErrorInNonTestingMode();
858 
859     return true;
860   }
861 
862   InterfaceId id = message->interface_id();
863   DCHECK(IsValidInterfaceId(id));
864 
865   InterfaceEndpoint* endpoint = FindEndpoint(id);
866   if (!endpoint || endpoint->closed())
867     return true;
868 
869   if (!endpoint->client()) {
870     // We need to wait until a client is attached in order to dispatch further
871     // messages.
872     return false;
873   }
874 
875   bool can_direct_call;
876   if (message->has_flag(Message::kFlagIsSync)) {
877     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
878                       endpoint->task_runner()->BelongsToCurrentThread();
879   } else {
880     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
881                       endpoint->task_runner() == current_task_runner;
882   }
883 
884   if (!can_direct_call) {
885     MaybePostToProcessTasks(endpoint->task_runner());
886     return false;
887   }
888 
889   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
890 
891   InterfaceEndpointClient* client = endpoint->client();
892   bool result = false;
893   {
894     // We must unlock before calling into |client| because it may call this
895     // object within HandleIncomingMessage(). Holding the lock will lead to
896     // deadlock.
897     //
898     // It is safe to call into |client| without the lock. Because |client| is
899     // always accessed on the same thread, including DetachEndpointClient().
900     MayAutoUnlock unlocker(&lock_);
901     result = client->HandleIncomingMessage(message);
902   }
903   if (!result)
904     RaiseErrorInNonTestingMode();
905 
906   return true;
907 }
908 
MaybePostToProcessTasks(base::SingleThreadTaskRunner * task_runner)909 void MultiplexRouter::MaybePostToProcessTasks(
910     base::SingleThreadTaskRunner* task_runner) {
911   AssertLockAcquired();
912   if (posted_to_process_tasks_)
913     return;
914 
915   posted_to_process_tasks_ = true;
916   posted_to_task_runner_ = task_runner;
917   task_runner->PostTask(
918       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
919 }
920 
LockAndCallProcessTasks()921 void MultiplexRouter::LockAndCallProcessTasks() {
922   // There is no need to hold a ref to this class in this case because this is
923   // always called using base::Bind(), which holds a ref.
924   MayAutoLock locker(&lock_);
925   posted_to_process_tasks_ = false;
926   scoped_refptr<base::SingleThreadTaskRunner> runner(
927       std::move(posted_to_task_runner_));
928   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
929 }
930 
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)931 void MultiplexRouter::UpdateEndpointStateMayRemove(
932     InterfaceEndpoint* endpoint,
933     EndpointStateUpdateType type) {
934   if (type == ENDPOINT_CLOSED) {
935     endpoint->set_closed();
936   } else {
937     endpoint->set_peer_closed();
938     // If the interface endpoint is performing a sync watch, this makes sure
939     // it is notified and eventually exits the sync watch.
940     endpoint->SignalSyncMessageEvent();
941   }
942   if (endpoint->closed() && endpoint->peer_closed())
943     endpoints_.erase(endpoint->id());
944 }
945 
RaiseErrorInNonTestingMode()946 void MultiplexRouter::RaiseErrorInNonTestingMode() {
947   AssertLockAcquired();
948   if (!testing_mode_)
949     RaiseError();
950 }
951 
FindOrInsertEndpoint(InterfaceId id,bool * inserted)952 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
953     InterfaceId id,
954     bool* inserted) {
955   AssertLockAcquired();
956   // Either |inserted| is nullptr or it points to a boolean initialized as
957   // false.
958   DCHECK(!inserted || !*inserted);
959 
960   InterfaceEndpoint* endpoint = FindEndpoint(id);
961   if (!endpoint) {
962     endpoint = new InterfaceEndpoint(this, id);
963     endpoints_[id] = endpoint;
964     if (inserted)
965       *inserted = true;
966   }
967 
968   return endpoint;
969 }
970 
FindEndpoint(InterfaceId id)971 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
972     InterfaceId id) {
973   AssertLockAcquired();
974   auto iter = endpoints_.find(id);
975   return iter != endpoints_.end() ? iter->second.get() : nullptr;
976 }
977 
AssertLockAcquired()978 void MultiplexRouter::AssertLockAcquired() {
979 #if DCHECK_IS_ON()
980   if (lock_)
981     lock_->AssertAcquired();
982 #endif
983 }
984 
985 }  // namespace internal
986 }  // namespace mojo
987