1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stdint.h>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/message_loop/message_loop_current.h"
16 #include "base/run_loop.h"
17 #include "base/synchronization/lock.h"
18 #include "base/threading/thread_local.h"
19 #include "base/trace_event/trace_event.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
23 #include "mojo/public/cpp/system/wait.h"
24
25 #if defined(ENABLE_IPC_FUZZER)
26 #include "mojo/public/cpp/bindings/message_dumper.h"
27 #endif
28
29 namespace mojo {
30
31 namespace {
32
33 // The NestingObserver for each thread. Note that this is always a
34 // Connector::RunLoopNestingObserver; we use the base type here because that
35 // subclass is private to Connector.
36 base::LazyInstance<base::ThreadLocalPointer<base::RunLoop::NestingObserver>>::
37 Leaky g_tls_nesting_observer = LAZY_INSTANCE_INITIALIZER;
38
39 // The default outgoing serialization mode for new Connectors.
40 Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
41 Connector::OutgoingSerializationMode::kLazy;
42
43 // The default incoming serialization mode for new Connectors.
44 Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
45 Connector::IncomingSerializationMode::kDispatchAsIs;
46
47 } // namespace
48
49 // Used to efficiently maintain a doubly-linked list of all Connectors
50 // currently dispatching on any given thread.
51 class Connector::ActiveDispatchTracker {
52 public:
53 explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
54 ~ActiveDispatchTracker();
55
56 void NotifyBeginNesting();
57
58 private:
59 const base::WeakPtr<Connector> connector_;
60 RunLoopNestingObserver* const nesting_observer_;
61 ActiveDispatchTracker* outer_tracker_ = nullptr;
62 ActiveDispatchTracker* inner_tracker_ = nullptr;
63
64 DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker);
65 };
66
67 // Watches the MessageLoop on the current thread. Notifies the current chain of
68 // ActiveDispatchTrackers when a nested run loop is started.
69 class Connector::RunLoopNestingObserver
70 : public base::RunLoop::NestingObserver,
71 public base::MessageLoopCurrent::DestructionObserver {
72 public:
RunLoopNestingObserver()73 RunLoopNestingObserver() {
74 base::RunLoop::AddNestingObserverOnCurrentThread(this);
75 base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
76 }
77
~RunLoopNestingObserver()78 ~RunLoopNestingObserver() override {}
79
80 // base::RunLoop::NestingObserver:
OnBeginNestedRunLoop()81 void OnBeginNestedRunLoop() override {
82 if (top_tracker_)
83 top_tracker_->NotifyBeginNesting();
84 }
85
86 // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()87 void WillDestroyCurrentMessageLoop() override {
88 base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
89 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
90 DCHECK_EQ(this, g_tls_nesting_observer.Get().Get());
91 g_tls_nesting_observer.Get().Set(nullptr);
92 delete this;
93 }
94
GetForThread()95 static RunLoopNestingObserver* GetForThread() {
96 if (!base::MessageLoopCurrent::Get())
97 return nullptr;
98 auto* observer = static_cast<RunLoopNestingObserver*>(
99 g_tls_nesting_observer.Get().Get());
100 if (!observer) {
101 observer = new RunLoopNestingObserver;
102 g_tls_nesting_observer.Get().Set(observer);
103 }
104 return observer;
105 }
106
107 private:
108 friend class ActiveDispatchTracker;
109
110 ActiveDispatchTracker* top_tracker_ = nullptr;
111
112 DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver);
113 };
114
ActiveDispatchTracker(const base::WeakPtr<Connector> & connector)115 Connector::ActiveDispatchTracker::ActiveDispatchTracker(
116 const base::WeakPtr<Connector>& connector)
117 : connector_(connector), nesting_observer_(connector_->nesting_observer_) {
118 DCHECK(nesting_observer_);
119 if (nesting_observer_->top_tracker_) {
120 outer_tracker_ = nesting_observer_->top_tracker_;
121 outer_tracker_->inner_tracker_ = this;
122 }
123 nesting_observer_->top_tracker_ = this;
124 }
125
~ActiveDispatchTracker()126 Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
127 if (nesting_observer_->top_tracker_ == this)
128 nesting_observer_->top_tracker_ = outer_tracker_;
129 else if (inner_tracker_)
130 inner_tracker_->outer_tracker_ = outer_tracker_;
131 if (outer_tracker_)
132 outer_tracker_->inner_tracker_ = inner_tracker_;
133 }
134
NotifyBeginNesting()135 void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
136 if (connector_ && connector_->handle_watcher_)
137 connector_->handle_watcher_->ArmOrNotify();
138 if (outer_tracker_)
139 outer_tracker_->NotifyBeginNesting();
140 }
141
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SequencedTaskRunner> runner)142 Connector::Connector(ScopedMessagePipeHandle message_pipe,
143 ConnectorConfig config,
144 scoped_refptr<base::SequencedTaskRunner> runner)
145 : message_pipe_(std::move(message_pipe)),
146 task_runner_(std::move(runner)),
147 error_(false),
148 outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
149 incoming_serialization_mode_(g_default_incoming_serialization_mode),
150 nesting_observer_(RunLoopNestingObserver::GetForThread()),
151 weak_factory_(this) {
152 if (config == MULTI_THREADED_SEND)
153 lock_.emplace();
154
155 #if defined(ENABLE_IPC_FUZZER)
156 if (!MessageDumper::GetMessageDumpDirectory().empty())
157 message_dumper_ = std::make_unique<MessageDumper>();
158 #endif
159
160 weak_self_ = weak_factory_.GetWeakPtr();
161 // Even though we don't have an incoming receiver, we still want to monitor
162 // the message pipe to know if is closed or encounters an error.
163 WaitToReadMore();
164 }
165
~Connector()166 Connector::~Connector() {
167 {
168 // Allow for quick destruction on any sequence if the pipe is already
169 // closed.
170 base::AutoLock lock(connected_lock_);
171 if (!connected_)
172 return;
173 }
174
175 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
176 CancelWait();
177 }
178
SetOutgoingSerializationMode(OutgoingSerializationMode mode)179 void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
180 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
181 outgoing_serialization_mode_ = mode;
182 }
183
SetIncomingSerializationMode(IncomingSerializationMode mode)184 void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
185 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
186 incoming_serialization_mode_ = mode;
187 }
188
CloseMessagePipe()189 void Connector::CloseMessagePipe() {
190 // Throw away the returned message pipe.
191 PassMessagePipe();
192 }
193
PassMessagePipe()194 ScopedMessagePipeHandle Connector::PassMessagePipe() {
195 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
196
197 CancelWait();
198 internal::MayAutoLock locker(&lock_);
199 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
200 weak_factory_.InvalidateWeakPtrs();
201 sync_handle_watcher_callback_count_ = 0;
202
203 base::AutoLock lock(connected_lock_);
204 connected_ = false;
205 return message_pipe;
206 }
207
RaiseError()208 void Connector::RaiseError() {
209 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
210
211 HandleError(true, true);
212 }
213
WaitForIncomingMessage(MojoDeadline deadline)214 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
215 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
216
217 if (error_)
218 return false;
219
220 ResumeIncomingMethodCallProcessing();
221
222 // TODO(rockot): Use a timed Wait here. Nobody uses anything but 0 or
223 // INDEFINITE deadlines at present, so we only support those.
224 DCHECK(deadline == 0 || deadline == MOJO_DEADLINE_INDEFINITE);
225
226 MojoResult rv = MOJO_RESULT_UNKNOWN;
227 if (deadline == 0 && !message_pipe_->QuerySignalsState().readable())
228 return false;
229
230 if (deadline == MOJO_DEADLINE_INDEFINITE) {
231 rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
232 if (rv != MOJO_RESULT_OK) {
233 // Users that call WaitForIncomingMessage() should expect their code to be
234 // re-entered, so we call the error handler synchronously.
235 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
236 return false;
237 }
238 }
239
240 ignore_result(ReadSingleMessage(&rv));
241 return (rv == MOJO_RESULT_OK);
242 }
243
PauseIncomingMethodCallProcessing()244 void Connector::PauseIncomingMethodCallProcessing() {
245 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
246
247 if (paused_)
248 return;
249
250 paused_ = true;
251 CancelWait();
252 }
253
ResumeIncomingMethodCallProcessing()254 void Connector::ResumeIncomingMethodCallProcessing() {
255 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
256
257 if (!paused_)
258 return;
259
260 paused_ = false;
261 WaitToReadMore();
262 }
263
PrefersSerializedMessages()264 bool Connector::PrefersSerializedMessages() {
265 if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager)
266 return true;
267 DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_);
268 return peer_remoteness_tracker_ &&
269 peer_remoteness_tracker_->last_known_state().peer_remote();
270 }
271
Accept(Message * message)272 bool Connector::Accept(Message* message) {
273 if (!lock_)
274 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
275
276 if (error_)
277 return false;
278
279 internal::MayAutoLock locker(&lock_);
280
281 if (!message_pipe_.is_valid() || drop_writes_)
282 return true;
283
284 #if defined(ENABLE_IPC_FUZZER)
285 if (message_dumper_ && message->is_serialized()) {
286 bool dump_result = message_dumper_->Accept(message);
287 DCHECK(dump_result);
288 }
289 #endif
290
291 MojoResult rv =
292 WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
293 MOJO_WRITE_MESSAGE_FLAG_NONE);
294
295 switch (rv) {
296 case MOJO_RESULT_OK:
297 break;
298 case MOJO_RESULT_FAILED_PRECONDITION:
299 // There's no point in continuing to write to this pipe since the other
300 // end is gone. Avoid writing any future messages. Hide write failures
301 // from the caller since we'd like them to continue consuming any backlog
302 // of incoming messages before regarding the message pipe as closed.
303 drop_writes_ = true;
304 break;
305 case MOJO_RESULT_BUSY:
306 // We'd get a "busy" result if one of the message's handles is:
307 // - |message_pipe_|'s own handle;
308 // - simultaneously being used on another sequence; or
309 // - in a "busy" state that prohibits it from being transferred (e.g.,
310 // a data pipe handle in the middle of a two-phase read/write,
311 // regardless of which sequence that two-phase read/write is happening
312 // on).
313 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
314 // crbug.com/389666, etc. are resolved, this will make tests fail quickly
315 // rather than hanging.)
316 CHECK(false) << "Race condition or other bug detected";
317 return false;
318 default:
319 // This particular write was rejected, presumably because of bad input.
320 // The pipe is not necessarily in a bad state.
321 return false;
322 }
323 return true;
324 }
325
AllowWokenUpBySyncWatchOnSameThread()326 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
327 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
328
329 allow_woken_up_by_others_ = true;
330
331 EnsureSyncWatcherExists();
332 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
333 }
334
SyncWatch(const bool * should_stop)335 bool Connector::SyncWatch(const bool* should_stop) {
336 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
337
338 if (error_)
339 return false;
340
341 ResumeIncomingMethodCallProcessing();
342
343 EnsureSyncWatcherExists();
344 return sync_watcher_->SyncWatch(should_stop);
345 }
346
SetWatcherHeapProfilerTag(const char * tag)347 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
348 if (tag) {
349 heap_profiler_tag_ = tag;
350 if (handle_watcher_)
351 handle_watcher_->set_heap_profiler_tag(tag);
352 }
353 }
354
355 // static
OverrideDefaultSerializationBehaviorForTesting(OutgoingSerializationMode outgoing_mode,IncomingSerializationMode incoming_mode)356 void Connector::OverrideDefaultSerializationBehaviorForTesting(
357 OutgoingSerializationMode outgoing_mode,
358 IncomingSerializationMode incoming_mode) {
359 g_default_outgoing_serialization_mode = outgoing_mode;
360 g_default_incoming_serialization_mode = incoming_mode;
361 }
362
OnWatcherHandleReady(MojoResult result)363 void Connector::OnWatcherHandleReady(MojoResult result) {
364 OnHandleReadyInternal(result);
365 }
366
OnSyncHandleWatcherHandleReady(MojoResult result)367 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
368 base::WeakPtr<Connector> weak_self(weak_self_);
369
370 sync_handle_watcher_callback_count_++;
371 OnHandleReadyInternal(result);
372 // At this point, this object might have been deleted.
373 if (weak_self) {
374 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
375 sync_handle_watcher_callback_count_--;
376 }
377 }
378
OnHandleReadyInternal(MojoResult result)379 void Connector::OnHandleReadyInternal(MojoResult result) {
380 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
381
382 if (result != MOJO_RESULT_OK) {
383 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
384 return;
385 }
386
387 ReadAllAvailableMessages();
388 // At this point, this object might have been deleted. Return.
389 }
390
WaitToReadMore()391 void Connector::WaitToReadMore() {
392 CHECK(!paused_);
393 DCHECK(!handle_watcher_);
394
395 handle_watcher_.reset(new SimpleWatcher(
396 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
397 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
398 MojoResult rv = handle_watcher_->Watch(
399 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
400 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
401
402 if (message_pipe_.is_valid()) {
403 peer_remoteness_tracker_.emplace(message_pipe_.get(),
404 MOJO_HANDLE_SIGNAL_PEER_REMOTE);
405 }
406
407 if (rv != MOJO_RESULT_OK) {
408 // If the watch failed because the handle is invalid or its conditions can
409 // no longer be met, we signal the error asynchronously to avoid reentry.
410 task_runner_->PostTask(
411 FROM_HERE,
412 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
413 } else {
414 handle_watcher_->ArmOrNotify();
415 }
416
417 if (allow_woken_up_by_others_) {
418 EnsureSyncWatcherExists();
419 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
420 }
421 }
422
ReadSingleMessage(MojoResult * read_result)423 bool Connector::ReadSingleMessage(MojoResult* read_result) {
424 CHECK(!paused_);
425
426 bool receiver_result = false;
427
428 // Detect if |this| was destroyed or the message pipe was closed/transferred
429 // during message dispatch.
430 base::WeakPtr<Connector> weak_self = weak_self_;
431
432 Message message;
433 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
434 *read_result = rv;
435
436 if (rv == MOJO_RESULT_OK) {
437 base::Optional<ActiveDispatchTracker> dispatch_tracker;
438 if (!is_dispatching_ && nesting_observer_) {
439 is_dispatching_ = true;
440 dispatch_tracker.emplace(weak_self);
441 }
442
443 if (incoming_serialization_mode_ ==
444 IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
445 message.SerializeIfNecessary();
446 } else {
447 DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
448 incoming_serialization_mode_);
449 }
450
451 #if !BUILDFLAG(MOJO_TRACE_ENABLED)
452 // This emits just full class name, and is inferior to mojo tracing.
453 TRACE_EVENT0("mojom", heap_profiler_tag_);
454 #endif
455
456 receiver_result =
457 incoming_receiver_ && incoming_receiver_->Accept(&message);
458
459 if (!weak_self)
460 return false;
461
462 if (dispatch_tracker) {
463 is_dispatching_ = false;
464 dispatch_tracker.reset();
465 }
466 } else if (rv == MOJO_RESULT_SHOULD_WAIT) {
467 return true;
468 } else {
469 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
470 return false;
471 }
472
473 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
474 HandleError(true, false);
475 return false;
476 }
477 return true;
478 }
479
ReadAllAvailableMessages()480 void Connector::ReadAllAvailableMessages() {
481 while (!error_) {
482 base::WeakPtr<Connector> weak_self = weak_self_;
483 MojoResult rv;
484
485 // May delete |this.|
486 if (!ReadSingleMessage(&rv))
487 return;
488
489 if (!weak_self || paused_)
490 return;
491
492 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
493
494 if (rv == MOJO_RESULT_SHOULD_WAIT) {
495 // Attempt to re-arm the Watcher.
496 MojoResult ready_result;
497 MojoResult arm_result = handle_watcher_->Arm(&ready_result);
498 if (arm_result == MOJO_RESULT_OK)
499 return;
500
501 // The watcher is already ready to notify again.
502 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
503
504 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
505 HandleError(false, false);
506 return;
507 }
508
509 // There's more to read now, so we'll just keep looping.
510 DCHECK_EQ(MOJO_RESULT_OK, ready_result);
511 }
512 }
513 }
514
CancelWait()515 void Connector::CancelWait() {
516 peer_remoteness_tracker_.reset();
517 handle_watcher_.reset();
518 sync_watcher_.reset();
519 }
520
HandleError(bool force_pipe_reset,bool force_async_handler)521 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
522 if (error_ || !message_pipe_.is_valid())
523 return;
524
525 if (paused_) {
526 // Enforce calling the error handler asynchronously if the user has paused
527 // receiving messages. We need to wait until the user starts receiving
528 // messages again.
529 force_async_handler = true;
530 }
531
532 if (!force_pipe_reset && force_async_handler)
533 force_pipe_reset = true;
534
535 if (force_pipe_reset) {
536 CancelWait();
537 internal::MayAutoLock locker(&lock_);
538 message_pipe_.reset();
539 MessagePipe dummy_pipe;
540 message_pipe_ = std::move(dummy_pipe.handle0);
541 } else {
542 CancelWait();
543 }
544
545 if (force_async_handler) {
546 if (!paused_)
547 WaitToReadMore();
548 } else {
549 error_ = true;
550 if (connection_error_handler_)
551 std::move(connection_error_handler_).Run();
552 }
553 }
554
EnsureSyncWatcherExists()555 void Connector::EnsureSyncWatcherExists() {
556 if (sync_watcher_)
557 return;
558 sync_watcher_.reset(new SyncHandleWatcher(
559 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
560 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
561 base::Unretained(this))));
562 }
563
564 } // namespace mojo
565