1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stdint.h>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
17
18 namespace mojo {
19
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SingleThreadTaskRunner> runner)20 Connector::Connector(ScopedMessagePipeHandle message_pipe,
21 ConnectorConfig config,
22 scoped_refptr<base::SingleThreadTaskRunner> runner)
23 : message_pipe_(std::move(message_pipe)),
24 task_runner_(std::move(runner)),
25 weak_factory_(this) {
26 if (config == MULTI_THREADED_SEND)
27 lock_.emplace();
28
29 weak_self_ = weak_factory_.GetWeakPtr();
30 // Even though we don't have an incoming receiver, we still want to monitor
31 // the message pipe to know if is closed or encounters an error.
32 WaitToReadMore();
33 }
34
~Connector()35 Connector::~Connector() {
36 {
37 // Allow for quick destruction on any thread if the pipe is already closed.
38 base::AutoLock lock(connected_lock_);
39 if (!connected_)
40 return;
41 }
42
43 DCHECK(thread_checker_.CalledOnValidThread());
44 CancelWait();
45 }
46
CloseMessagePipe()47 void Connector::CloseMessagePipe() {
48 // Throw away the returned message pipe.
49 PassMessagePipe();
50 }
51
PassMessagePipe()52 ScopedMessagePipeHandle Connector::PassMessagePipe() {
53 DCHECK(thread_checker_.CalledOnValidThread());
54
55 CancelWait();
56 internal::MayAutoLock locker(&lock_);
57 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
58 weak_factory_.InvalidateWeakPtrs();
59 sync_handle_watcher_callback_count_ = 0;
60
61 base::AutoLock lock(connected_lock_);
62 connected_ = false;
63 return message_pipe;
64 }
65
RaiseError()66 void Connector::RaiseError() {
67 DCHECK(thread_checker_.CalledOnValidThread());
68
69 HandleError(true, true);
70 }
71
WaitForIncomingMessage(MojoDeadline deadline)72 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
73 DCHECK(thread_checker_.CalledOnValidThread());
74
75 if (error_)
76 return false;
77
78 ResumeIncomingMethodCallProcessing();
79
80 MojoResult rv =
81 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
82 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
83 return false;
84 if (rv != MOJO_RESULT_OK) {
85 // Users that call WaitForIncomingMessage() should expect their code to be
86 // re-entered, so we call the error handler synchronously.
87 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
88 return false;
89 }
90 ignore_result(ReadSingleMessage(&rv));
91 return (rv == MOJO_RESULT_OK);
92 }
93
PauseIncomingMethodCallProcessing()94 void Connector::PauseIncomingMethodCallProcessing() {
95 DCHECK(thread_checker_.CalledOnValidThread());
96
97 if (paused_)
98 return;
99
100 paused_ = true;
101 CancelWait();
102 }
103
ResumeIncomingMethodCallProcessing()104 void Connector::ResumeIncomingMethodCallProcessing() {
105 DCHECK(thread_checker_.CalledOnValidThread());
106
107 if (!paused_)
108 return;
109
110 paused_ = false;
111 WaitToReadMore();
112 }
113
Accept(Message * message)114 bool Connector::Accept(Message* message) {
115 DCHECK(lock_ || thread_checker_.CalledOnValidThread());
116
117 // It shouldn't hurt even if |error_| may be changed by a different thread at
118 // the same time. The outcome is that we may write into |message_pipe_| after
119 // encountering an error, which should be fine.
120 if (error_)
121 return false;
122
123 internal::MayAutoLock locker(&lock_);
124
125 if (!message_pipe_.is_valid() || drop_writes_)
126 return true;
127
128 MojoResult rv =
129 WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
130 MOJO_WRITE_MESSAGE_FLAG_NONE);
131
132 switch (rv) {
133 case MOJO_RESULT_OK:
134 break;
135 case MOJO_RESULT_FAILED_PRECONDITION:
136 // There's no point in continuing to write to this pipe since the other
137 // end is gone. Avoid writing any future messages. Hide write failures
138 // from the caller since we'd like them to continue consuming any backlog
139 // of incoming messages before regarding the message pipe as closed.
140 drop_writes_ = true;
141 break;
142 case MOJO_RESULT_BUSY:
143 // We'd get a "busy" result if one of the message's handles is:
144 // - |message_pipe_|'s own handle;
145 // - simultaneously being used on another thread; or
146 // - in a "busy" state that prohibits it from being transferred (e.g.,
147 // a data pipe handle in the middle of a two-phase read/write,
148 // regardless of which thread that two-phase read/write is happening
149 // on).
150 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
151 // crbug.com/389666, etc. are resolved, this will make tests fail quickly
152 // rather than hanging.)
153 CHECK(false) << "Race condition or other bug detected";
154 return false;
155 default:
156 // This particular write was rejected, presumably because of bad input.
157 // The pipe is not necessarily in a bad state.
158 return false;
159 }
160 return true;
161 }
162
AllowWokenUpBySyncWatchOnSameThread()163 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
164 DCHECK(thread_checker_.CalledOnValidThread());
165
166 allow_woken_up_by_others_ = true;
167
168 EnsureSyncWatcherExists();
169 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
170 }
171
SyncWatch(const bool * should_stop)172 bool Connector::SyncWatch(const bool* should_stop) {
173 DCHECK(thread_checker_.CalledOnValidThread());
174
175 if (error_)
176 return false;
177
178 ResumeIncomingMethodCallProcessing();
179
180 EnsureSyncWatcherExists();
181 return sync_watcher_->SyncWatch(should_stop);
182 }
183
SetWatcherHeapProfilerTag(const char * tag)184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
185 heap_profiler_tag_ = tag;
186 if (handle_watcher_) {
187 handle_watcher_->set_heap_profiler_tag(tag);
188 }
189 }
190
OnWatcherHandleReady(MojoResult result)191 void Connector::OnWatcherHandleReady(MojoResult result) {
192 OnHandleReadyInternal(result);
193 }
194
OnSyncHandleWatcherHandleReady(MojoResult result)195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
196 base::WeakPtr<Connector> weak_self(weak_self_);
197
198 sync_handle_watcher_callback_count_++;
199 OnHandleReadyInternal(result);
200 // At this point, this object might have been deleted.
201 if (weak_self) {
202 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
203 sync_handle_watcher_callback_count_--;
204 }
205 }
206
OnHandleReadyInternal(MojoResult result)207 void Connector::OnHandleReadyInternal(MojoResult result) {
208 DCHECK(thread_checker_.CalledOnValidThread());
209
210 if (result != MOJO_RESULT_OK) {
211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
212 return;
213 }
214 ReadAllAvailableMessages();
215 // At this point, this object might have been deleted. Return.
216 }
217
WaitToReadMore()218 void Connector::WaitToReadMore() {
219 CHECK(!paused_);
220 DCHECK(!handle_watcher_);
221
222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
223 if (heap_profiler_tag_)
224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
225 MojoResult rv = handle_watcher_->Start(
226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
228
229 if (rv != MOJO_RESULT_OK) {
230 // If the watch failed because the handle is invalid or its conditions can
231 // no longer be met, we signal the error asynchronously to avoid reentry.
232 task_runner_->PostTask(
233 FROM_HERE,
234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
235 }
236
237 if (allow_woken_up_by_others_) {
238 EnsureSyncWatcherExists();
239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
240 }
241 }
242
ReadSingleMessage(MojoResult * read_result)243 bool Connector::ReadSingleMessage(MojoResult* read_result) {
244 CHECK(!paused_);
245
246 bool receiver_result = false;
247
248 // Detect if |this| was destroyed or the message pipe was closed/transferred
249 // during message dispatch.
250 base::WeakPtr<Connector> weak_self = weak_self_;
251
252 Message message;
253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
254 *read_result = rv;
255
256 if (rv == MOJO_RESULT_OK) {
257 receiver_result =
258 incoming_receiver_ && incoming_receiver_->Accept(&message);
259 }
260
261 if (!weak_self)
262 return false;
263
264 if (rv == MOJO_RESULT_SHOULD_WAIT)
265 return true;
266
267 if (rv != MOJO_RESULT_OK) {
268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
269 return false;
270 }
271
272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
273 HandleError(true, false);
274 return false;
275 }
276 return true;
277 }
278
ReadAllAvailableMessages()279 void Connector::ReadAllAvailableMessages() {
280 while (!error_) {
281 MojoResult rv;
282
283 if (!ReadSingleMessage(&rv)) {
284 // Return immediately without touching any members. |this| may have been
285 // destroyed.
286 return;
287 }
288
289 if (paused_)
290 return;
291
292 if (rv == MOJO_RESULT_SHOULD_WAIT)
293 break;
294 }
295 }
296
CancelWait()297 void Connector::CancelWait() {
298 handle_watcher_.reset();
299 sync_watcher_.reset();
300 }
301
HandleError(bool force_pipe_reset,bool force_async_handler)302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
303 if (error_ || !message_pipe_.is_valid())
304 return;
305
306 if (paused_) {
307 // Enforce calling the error handler asynchronously if the user has paused
308 // receiving messages. We need to wait until the user starts receiving
309 // messages again.
310 force_async_handler = true;
311 }
312
313 if (!force_pipe_reset && force_async_handler)
314 force_pipe_reset = true;
315
316 if (force_pipe_reset) {
317 CancelWait();
318 internal::MayAutoLock locker(&lock_);
319 message_pipe_.reset();
320 MessagePipe dummy_pipe;
321 message_pipe_ = std::move(dummy_pipe.handle0);
322 } else {
323 CancelWait();
324 }
325
326 if (force_async_handler) {
327 if (!paused_)
328 WaitToReadMore();
329 } else {
330 error_ = true;
331 if (!connection_error_handler_.is_null())
332 connection_error_handler_.Run();
333 }
334 }
335
EnsureSyncWatcherExists()336 void Connector::EnsureSyncWatcherExists() {
337 if (sync_watcher_)
338 return;
339 sync_watcher_.reset(new SyncHandleWatcher(
340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
342 base::Unretained(this))));
343 }
344
345 } // namespace mojo
346