• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stdint.h>
8 
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/message_loop/message_loop_current.h"
16 #include "base/run_loop.h"
17 #include "base/synchronization/lock.h"
18 #include "base/threading/thread_local.h"
19 #include "base/trace_event/trace_event.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
23 #include "mojo/public/cpp/system/wait.h"
24 
25 #if defined(ENABLE_IPC_FUZZER)
26 #include "mojo/public/cpp/bindings/message_dumper.h"
27 #endif
28 
29 namespace mojo {
30 
31 namespace {
32 
33 // The NestingObserver for each thread. Note that this is always a
34 // Connector::RunLoopNestingObserver; we use the base type here because that
35 // subclass is private to Connector.
36 base::LazyInstance<base::ThreadLocalPointer<base::RunLoop::NestingObserver>>::
37     Leaky g_tls_nesting_observer = LAZY_INSTANCE_INITIALIZER;
38 
39 // The default outgoing serialization mode for new Connectors.
40 Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
41     Connector::OutgoingSerializationMode::kLazy;
42 
43 // The default incoming serialization mode for new Connectors.
44 Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
45     Connector::IncomingSerializationMode::kDispatchAsIs;
46 
47 }  // namespace
48 
49 // Used to efficiently maintain a doubly-linked list of all Connectors
50 // currently dispatching on any given thread.
51 class Connector::ActiveDispatchTracker {
52  public:
53   explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
54   ~ActiveDispatchTracker();
55 
56   void NotifyBeginNesting();
57 
58  private:
59   const base::WeakPtr<Connector> connector_;
60   RunLoopNestingObserver* const nesting_observer_;
61   ActiveDispatchTracker* outer_tracker_ = nullptr;
62   ActiveDispatchTracker* inner_tracker_ = nullptr;
63 
64   DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker);
65 };
66 
67 // Watches the MessageLoop on the current thread. Notifies the current chain of
68 // ActiveDispatchTrackers when a nested run loop is started.
69 class Connector::RunLoopNestingObserver
70     : public base::RunLoop::NestingObserver,
71       public base::MessageLoopCurrent::DestructionObserver {
72  public:
RunLoopNestingObserver()73   RunLoopNestingObserver() {
74     base::RunLoop::AddNestingObserverOnCurrentThread(this);
75     base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
76   }
77 
~RunLoopNestingObserver()78   ~RunLoopNestingObserver() override {}
79 
80   // base::RunLoop::NestingObserver:
OnBeginNestedRunLoop()81   void OnBeginNestedRunLoop() override {
82     if (top_tracker_)
83       top_tracker_->NotifyBeginNesting();
84   }
85 
86   // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()87   void WillDestroyCurrentMessageLoop() override {
88     base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
89     base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
90     DCHECK_EQ(this, g_tls_nesting_observer.Get().Get());
91     g_tls_nesting_observer.Get().Set(nullptr);
92     delete this;
93   }
94 
GetForThread()95   static RunLoopNestingObserver* GetForThread() {
96     if (!base::MessageLoopCurrent::Get())
97       return nullptr;
98     auto* observer = static_cast<RunLoopNestingObserver*>(
99         g_tls_nesting_observer.Get().Get());
100     if (!observer) {
101       observer = new RunLoopNestingObserver;
102       g_tls_nesting_observer.Get().Set(observer);
103     }
104     return observer;
105   }
106 
107  private:
108   friend class ActiveDispatchTracker;
109 
110   ActiveDispatchTracker* top_tracker_ = nullptr;
111 
112   DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver);
113 };
114 
ActiveDispatchTracker(const base::WeakPtr<Connector> & connector)115 Connector::ActiveDispatchTracker::ActiveDispatchTracker(
116     const base::WeakPtr<Connector>& connector)
117     : connector_(connector), nesting_observer_(connector_->nesting_observer_) {
118   DCHECK(nesting_observer_);
119   if (nesting_observer_->top_tracker_) {
120     outer_tracker_ = nesting_observer_->top_tracker_;
121     outer_tracker_->inner_tracker_ = this;
122   }
123   nesting_observer_->top_tracker_ = this;
124 }
125 
~ActiveDispatchTracker()126 Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
127   if (nesting_observer_->top_tracker_ == this)
128     nesting_observer_->top_tracker_ = outer_tracker_;
129   else if (inner_tracker_)
130     inner_tracker_->outer_tracker_ = outer_tracker_;
131   if (outer_tracker_)
132     outer_tracker_->inner_tracker_ = inner_tracker_;
133 }
134 
NotifyBeginNesting()135 void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
136   if (connector_ && connector_->handle_watcher_)
137     connector_->handle_watcher_->ArmOrNotify();
138   if (outer_tracker_)
139     outer_tracker_->NotifyBeginNesting();
140 }
141 
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SequencedTaskRunner> runner)142 Connector::Connector(ScopedMessagePipeHandle message_pipe,
143                      ConnectorConfig config,
144                      scoped_refptr<base::SequencedTaskRunner> runner)
145     : message_pipe_(std::move(message_pipe)),
146       task_runner_(std::move(runner)),
147       error_(false),
148       outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
149       incoming_serialization_mode_(g_default_incoming_serialization_mode),
150       nesting_observer_(RunLoopNestingObserver::GetForThread()),
151       weak_factory_(this) {
152   if (config == MULTI_THREADED_SEND)
153     lock_.emplace();
154 
155 #if defined(ENABLE_IPC_FUZZER)
156   if (!MessageDumper::GetMessageDumpDirectory().empty())
157     message_dumper_ = std::make_unique<MessageDumper>();
158 #endif
159 
160   weak_self_ = weak_factory_.GetWeakPtr();
161   // Even though we don't have an incoming receiver, we still want to monitor
162   // the message pipe to know if is closed or encounters an error.
163   WaitToReadMore();
164 }
165 
~Connector()166 Connector::~Connector() {
167   {
168     // Allow for quick destruction on any sequence if the pipe is already
169     // closed.
170     base::AutoLock lock(connected_lock_);
171     if (!connected_)
172       return;
173   }
174 
175   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
176   CancelWait();
177 }
178 
SetOutgoingSerializationMode(OutgoingSerializationMode mode)179 void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
180   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
181   outgoing_serialization_mode_ = mode;
182 }
183 
SetIncomingSerializationMode(IncomingSerializationMode mode)184 void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
185   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
186   incoming_serialization_mode_ = mode;
187 }
188 
CloseMessagePipe()189 void Connector::CloseMessagePipe() {
190   // Throw away the returned message pipe.
191   PassMessagePipe();
192 }
193 
PassMessagePipe()194 ScopedMessagePipeHandle Connector::PassMessagePipe() {
195   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
196 
197   CancelWait();
198   internal::MayAutoLock locker(&lock_);
199   ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
200   weak_factory_.InvalidateWeakPtrs();
201   sync_handle_watcher_callback_count_ = 0;
202 
203   base::AutoLock lock(connected_lock_);
204   connected_ = false;
205   return message_pipe;
206 }
207 
RaiseError()208 void Connector::RaiseError() {
209   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
210 
211   HandleError(true, true);
212 }
213 
WaitForIncomingMessage(MojoDeadline deadline)214 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
215   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
216 
217   if (error_)
218     return false;
219 
220   ResumeIncomingMethodCallProcessing();
221 
222   // TODO(rockot): Use a timed Wait here. Nobody uses anything but 0 or
223   // INDEFINITE deadlines at present, so we only support those.
224   DCHECK(deadline == 0 || deadline == MOJO_DEADLINE_INDEFINITE);
225 
226   MojoResult rv = MOJO_RESULT_UNKNOWN;
227   if (deadline == 0 && !message_pipe_->QuerySignalsState().readable())
228     return false;
229 
230   if (deadline == MOJO_DEADLINE_INDEFINITE) {
231     rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
232     if (rv != MOJO_RESULT_OK) {
233       // Users that call WaitForIncomingMessage() should expect their code to be
234       // re-entered, so we call the error handler synchronously.
235       HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
236       return false;
237     }
238   }
239 
240   ignore_result(ReadSingleMessage(&rv));
241   return (rv == MOJO_RESULT_OK);
242 }
243 
PauseIncomingMethodCallProcessing()244 void Connector::PauseIncomingMethodCallProcessing() {
245   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
246 
247   if (paused_)
248     return;
249 
250   paused_ = true;
251   CancelWait();
252 }
253 
ResumeIncomingMethodCallProcessing()254 void Connector::ResumeIncomingMethodCallProcessing() {
255   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
256 
257   if (!paused_)
258     return;
259 
260   paused_ = false;
261   WaitToReadMore();
262 }
263 
PrefersSerializedMessages()264 bool Connector::PrefersSerializedMessages() {
265   if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager)
266     return true;
267   DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_);
268   return peer_remoteness_tracker_ &&
269          peer_remoteness_tracker_->last_known_state().peer_remote();
270 }
271 
Accept(Message * message)272 bool Connector::Accept(Message* message) {
273   if (!lock_)
274     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
275 
276   if (error_)
277     return false;
278 
279   internal::MayAutoLock locker(&lock_);
280 
281   if (!message_pipe_.is_valid() || drop_writes_)
282     return true;
283 
284 #if defined(ENABLE_IPC_FUZZER)
285   if (message_dumper_ && message->is_serialized()) {
286     bool dump_result = message_dumper_->Accept(message);
287     DCHECK(dump_result);
288   }
289 #endif
290 
291   MojoResult rv =
292       WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
293                       MOJO_WRITE_MESSAGE_FLAG_NONE);
294 
295   switch (rv) {
296     case MOJO_RESULT_OK:
297       break;
298     case MOJO_RESULT_FAILED_PRECONDITION:
299       // There's no point in continuing to write to this pipe since the other
300       // end is gone. Avoid writing any future messages. Hide write failures
301       // from the caller since we'd like them to continue consuming any backlog
302       // of incoming messages before regarding the message pipe as closed.
303       drop_writes_ = true;
304       break;
305     case MOJO_RESULT_BUSY:
306       // We'd get a "busy" result if one of the message's handles is:
307       //   - |message_pipe_|'s own handle;
308       //   - simultaneously being used on another sequence; or
309       //   - in a "busy" state that prohibits it from being transferred (e.g.,
310       //     a data pipe handle in the middle of a two-phase read/write,
311       //     regardless of which sequence that two-phase read/write is happening
312       //     on).
313       // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
314       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
315       // rather than hanging.)
316       CHECK(false) << "Race condition or other bug detected";
317       return false;
318     default:
319       // This particular write was rejected, presumably because of bad input.
320       // The pipe is not necessarily in a bad state.
321       return false;
322   }
323   return true;
324 }
325 
AllowWokenUpBySyncWatchOnSameThread()326 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
327   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
328 
329   allow_woken_up_by_others_ = true;
330 
331   EnsureSyncWatcherExists();
332   sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
333 }
334 
SyncWatch(const bool * should_stop)335 bool Connector::SyncWatch(const bool* should_stop) {
336   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
337 
338   if (error_)
339     return false;
340 
341   ResumeIncomingMethodCallProcessing();
342 
343   EnsureSyncWatcherExists();
344   return sync_watcher_->SyncWatch(should_stop);
345 }
346 
SetWatcherHeapProfilerTag(const char * tag)347 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
348   if (tag) {
349     heap_profiler_tag_ = tag;
350     if (handle_watcher_)
351       handle_watcher_->set_heap_profiler_tag(tag);
352   }
353 }
354 
355 // static
OverrideDefaultSerializationBehaviorForTesting(OutgoingSerializationMode outgoing_mode,IncomingSerializationMode incoming_mode)356 void Connector::OverrideDefaultSerializationBehaviorForTesting(
357     OutgoingSerializationMode outgoing_mode,
358     IncomingSerializationMode incoming_mode) {
359   g_default_outgoing_serialization_mode = outgoing_mode;
360   g_default_incoming_serialization_mode = incoming_mode;
361 }
362 
OnWatcherHandleReady(MojoResult result)363 void Connector::OnWatcherHandleReady(MojoResult result) {
364   OnHandleReadyInternal(result);
365 }
366 
OnSyncHandleWatcherHandleReady(MojoResult result)367 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
368   base::WeakPtr<Connector> weak_self(weak_self_);
369 
370   sync_handle_watcher_callback_count_++;
371   OnHandleReadyInternal(result);
372   // At this point, this object might have been deleted.
373   if (weak_self) {
374     DCHECK_LT(0u, sync_handle_watcher_callback_count_);
375     sync_handle_watcher_callback_count_--;
376   }
377 }
378 
OnHandleReadyInternal(MojoResult result)379 void Connector::OnHandleReadyInternal(MojoResult result) {
380   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
381 
382   if (result != MOJO_RESULT_OK) {
383     HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
384     return;
385   }
386 
387   ReadAllAvailableMessages();
388   // At this point, this object might have been deleted. Return.
389 }
390 
WaitToReadMore()391 void Connector::WaitToReadMore() {
392   CHECK(!paused_);
393   DCHECK(!handle_watcher_);
394 
395   handle_watcher_.reset(new SimpleWatcher(
396       FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
397   handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
398   MojoResult rv = handle_watcher_->Watch(
399       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
400       base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
401 
402   if (message_pipe_.is_valid()) {
403     peer_remoteness_tracker_.emplace(message_pipe_.get(),
404                                      MOJO_HANDLE_SIGNAL_PEER_REMOTE);
405   }
406 
407   if (rv != MOJO_RESULT_OK) {
408     // If the watch failed because the handle is invalid or its conditions can
409     // no longer be met, we signal the error asynchronously to avoid reentry.
410     task_runner_->PostTask(
411         FROM_HERE,
412         base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
413   } else {
414     handle_watcher_->ArmOrNotify();
415   }
416 
417   if (allow_woken_up_by_others_) {
418     EnsureSyncWatcherExists();
419     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
420   }
421 }
422 
ReadSingleMessage(MojoResult * read_result)423 bool Connector::ReadSingleMessage(MojoResult* read_result) {
424   CHECK(!paused_);
425 
426   bool receiver_result = false;
427 
428   // Detect if |this| was destroyed or the message pipe was closed/transferred
429   // during message dispatch.
430   base::WeakPtr<Connector> weak_self = weak_self_;
431 
432   Message message;
433   const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
434   *read_result = rv;
435 
436   if (rv == MOJO_RESULT_OK) {
437     base::Optional<ActiveDispatchTracker> dispatch_tracker;
438     if (!is_dispatching_ && nesting_observer_) {
439       is_dispatching_ = true;
440       dispatch_tracker.emplace(weak_self);
441     }
442 
443     if (incoming_serialization_mode_ ==
444         IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
445       message.SerializeIfNecessary();
446     } else {
447       DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
448                 incoming_serialization_mode_);
449     }
450 
451 #if !BUILDFLAG(MOJO_TRACE_ENABLED)
452     // This emits just full class name, and is inferior to mojo tracing.
453     TRACE_EVENT0("mojom", heap_profiler_tag_);
454 #endif
455 
456     receiver_result =
457         incoming_receiver_ && incoming_receiver_->Accept(&message);
458 
459     if (!weak_self)
460       return false;
461 
462     if (dispatch_tracker) {
463       is_dispatching_ = false;
464       dispatch_tracker.reset();
465     }
466   } else if (rv == MOJO_RESULT_SHOULD_WAIT) {
467     return true;
468   } else {
469     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
470     return false;
471   }
472 
473   if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
474     HandleError(true, false);
475     return false;
476   }
477   return true;
478 }
479 
ReadAllAvailableMessages()480 void Connector::ReadAllAvailableMessages() {
481   while (!error_) {
482     base::WeakPtr<Connector> weak_self = weak_self_;
483     MojoResult rv;
484 
485     // May delete |this.|
486     if (!ReadSingleMessage(&rv))
487       return;
488 
489     if (!weak_self || paused_)
490       return;
491 
492     DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
493 
494     if (rv == MOJO_RESULT_SHOULD_WAIT) {
495       // Attempt to re-arm the Watcher.
496       MojoResult ready_result;
497       MojoResult arm_result = handle_watcher_->Arm(&ready_result);
498       if (arm_result == MOJO_RESULT_OK)
499         return;
500 
501       // The watcher is already ready to notify again.
502       DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
503 
504       if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
505         HandleError(false, false);
506         return;
507       }
508 
509       // There's more to read now, so we'll just keep looping.
510       DCHECK_EQ(MOJO_RESULT_OK, ready_result);
511     }
512   }
513 }
514 
CancelWait()515 void Connector::CancelWait() {
516   peer_remoteness_tracker_.reset();
517   handle_watcher_.reset();
518   sync_watcher_.reset();
519 }
520 
HandleError(bool force_pipe_reset,bool force_async_handler)521 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
522   if (error_ || !message_pipe_.is_valid())
523     return;
524 
525   if (paused_) {
526     // Enforce calling the error handler asynchronously if the user has paused
527     // receiving messages. We need to wait until the user starts receiving
528     // messages again.
529     force_async_handler = true;
530   }
531 
532   if (!force_pipe_reset && force_async_handler)
533     force_pipe_reset = true;
534 
535   if (force_pipe_reset) {
536     CancelWait();
537     internal::MayAutoLock locker(&lock_);
538     message_pipe_.reset();
539     MessagePipe dummy_pipe;
540     message_pipe_ = std::move(dummy_pipe.handle0);
541   } else {
542     CancelWait();
543   }
544 
545   if (force_async_handler) {
546     if (!paused_)
547       WaitToReadMore();
548   } else {
549     error_ = true;
550     if (connection_error_handler_)
551       std::move(connection_error_handler_).Run();
552   }
553 }
554 
EnsureSyncWatcherExists()555 void Connector::EnsureSyncWatcherExists() {
556   if (sync_watcher_)
557     return;
558   sync_watcher_.reset(new SyncHandleWatcher(
559       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
560       base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
561                  base::Unretained(this))));
562 }
563 
564 }  // namespace mojo
565