1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stdint.h>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
16
17 namespace mojo {
18
19 namespace {
20
21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
22 // the constructor is null.
23 class MayAutoLock {
24 public:
MayAutoLock(base::Lock * lock)25 explicit MayAutoLock(base::Lock* lock) : lock_(lock) {
26 if (lock_)
27 lock_->Acquire();
28 }
29
~MayAutoLock()30 ~MayAutoLock() {
31 if (lock_) {
32 lock_->AssertAcquired();
33 lock_->Release();
34 }
35 }
36
37 private:
38 base::Lock* lock_;
39 DISALLOW_COPY_AND_ASSIGN(MayAutoLock);
40 };
41
42 } // namespace
43
44 // ----------------------------------------------------------------------------
45
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SingleThreadTaskRunner> runner)46 Connector::Connector(ScopedMessagePipeHandle message_pipe,
47 ConnectorConfig config,
48 scoped_refptr<base::SingleThreadTaskRunner> runner)
49 : message_pipe_(std::move(message_pipe)),
50 incoming_receiver_(nullptr),
51 task_runner_(std::move(runner)),
52 handle_watcher_(task_runner_),
53 error_(false),
54 drop_writes_(false),
55 enforce_errors_from_incoming_receiver_(true),
56 paused_(false),
57 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
58 allow_woken_up_by_others_(false),
59 sync_handle_watcher_callback_count_(0),
60 weak_factory_(this) {
61 weak_self_ = weak_factory_.GetWeakPtr();
62 // Even though we don't have an incoming receiver, we still want to monitor
63 // the message pipe to know if is closed or encounters an error.
64 WaitToReadMore();
65 }
66
~Connector()67 Connector::~Connector() {
68 DCHECK(thread_checker_.CalledOnValidThread());
69
70 CancelWait();
71 }
72
CloseMessagePipe()73 void Connector::CloseMessagePipe() {
74 DCHECK(thread_checker_.CalledOnValidThread());
75
76 CancelWait();
77 MayAutoLock locker(lock_.get());
78 message_pipe_.reset();
79 }
80
PassMessagePipe()81 ScopedMessagePipeHandle Connector::PassMessagePipe() {
82 DCHECK(thread_checker_.CalledOnValidThread());
83
84 CancelWait();
85 MayAutoLock locker(lock_.get());
86 return std::move(message_pipe_);
87 }
88
RaiseError()89 void Connector::RaiseError() {
90 DCHECK(thread_checker_.CalledOnValidThread());
91
92 HandleError(true, true);
93 }
94
WaitForIncomingMessage(MojoDeadline deadline)95 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
96 DCHECK(thread_checker_.CalledOnValidThread());
97
98 if (error_)
99 return false;
100
101 ResumeIncomingMethodCallProcessing();
102
103 MojoResult rv =
104 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
105 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
106 return false;
107 if (rv != MOJO_RESULT_OK) {
108 // Users that call WaitForIncomingMessage() should expect their code to be
109 // re-entered, so we call the error handler synchronously.
110 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
111 return false;
112 }
113 ignore_result(ReadSingleMessage(&rv));
114 return (rv == MOJO_RESULT_OK);
115 }
116
PauseIncomingMethodCallProcessing()117 void Connector::PauseIncomingMethodCallProcessing() {
118 DCHECK(thread_checker_.CalledOnValidThread());
119
120 if (paused_)
121 return;
122
123 paused_ = true;
124 CancelWait();
125 }
126
ResumeIncomingMethodCallProcessing()127 void Connector::ResumeIncomingMethodCallProcessing() {
128 DCHECK(thread_checker_.CalledOnValidThread());
129
130 if (!paused_)
131 return;
132
133 paused_ = false;
134 WaitToReadMore();
135 }
136
Accept(Message * message)137 bool Connector::Accept(Message* message) {
138 DCHECK(lock_ || thread_checker_.CalledOnValidThread());
139
140 // It shouldn't hurt even if |error_| may be changed by a different thread at
141 // the same time. The outcome is that we may write into |message_pipe_| after
142 // encountering an error, which should be fine.
143 if (error_)
144 return false;
145
146 MayAutoLock locker(lock_.get());
147
148 if (!message_pipe_.is_valid() || drop_writes_)
149 return true;
150
151 MojoResult rv =
152 WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
153 MOJO_WRITE_MESSAGE_FLAG_NONE);
154
155 switch (rv) {
156 case MOJO_RESULT_OK:
157 break;
158 case MOJO_RESULT_FAILED_PRECONDITION:
159 // There's no point in continuing to write to this pipe since the other
160 // end is gone. Avoid writing any future messages. Hide write failures
161 // from the caller since we'd like them to continue consuming any backlog
162 // of incoming messages before regarding the message pipe as closed.
163 drop_writes_ = true;
164 break;
165 case MOJO_RESULT_BUSY:
166 // We'd get a "busy" result if one of the message's handles is:
167 // - |message_pipe_|'s own handle;
168 // - simultaneously being used on another thread; or
169 // - in a "busy" state that prohibits it from being transferred (e.g.,
170 // a data pipe handle in the middle of a two-phase read/write,
171 // regardless of which thread that two-phase read/write is happening
172 // on).
173 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
174 // crbug.com/389666, etc. are resolved, this will make tests fail quickly
175 // rather than hanging.)
176 CHECK(false) << "Race condition or other bug detected";
177 return false;
178 default:
179 // This particular write was rejected, presumably because of bad input.
180 // The pipe is not necessarily in a bad state.
181 return false;
182 }
183 return true;
184 }
185
AllowWokenUpBySyncWatchOnSameThread()186 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
187 DCHECK(thread_checker_.CalledOnValidThread());
188
189 allow_woken_up_by_others_ = true;
190
191 EnsureSyncWatcherExists();
192 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
193 }
194
SyncWatch(const bool * should_stop)195 bool Connector::SyncWatch(const bool* should_stop) {
196 DCHECK(thread_checker_.CalledOnValidThread());
197
198 if (error_)
199 return false;
200
201 ResumeIncomingMethodCallProcessing();
202
203 EnsureSyncWatcherExists();
204 return sync_watcher_->SyncWatch(should_stop);
205 }
206
OnWatcherHandleReady(MojoResult result)207 void Connector::OnWatcherHandleReady(MojoResult result) {
208 OnHandleReadyInternal(result);
209 }
210
OnSyncHandleWatcherHandleReady(MojoResult result)211 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
212 base::WeakPtr<Connector> weak_self(weak_self_);
213
214 sync_handle_watcher_callback_count_++;
215 OnHandleReadyInternal(result);
216 // At this point, this object might have been deleted.
217 if (weak_self)
218 sync_handle_watcher_callback_count_--;
219 }
220
OnHandleReadyInternal(MojoResult result)221 void Connector::OnHandleReadyInternal(MojoResult result) {
222 DCHECK(thread_checker_.CalledOnValidThread());
223
224 if (result != MOJO_RESULT_OK) {
225 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
226 return;
227 }
228 ReadAllAvailableMessages();
229 // At this point, this object might have been deleted. Return.
230 }
231
WaitToReadMore()232 void Connector::WaitToReadMore() {
233 CHECK(!paused_);
234 DCHECK(!handle_watcher_.IsWatching());
235
236 MojoResult rv = handle_watcher_.Start(
237 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
238 base::Bind(&Connector::OnWatcherHandleReady,
239 base::Unretained(this)));
240
241 if (rv != MOJO_RESULT_OK) {
242 // If the watch failed because the handle is invalid or its conditions can
243 // no longer be met, we signal the error asynchronously to avoid reentry.
244 task_runner_->PostTask(
245 FROM_HERE,
246 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
247 }
248
249 if (allow_woken_up_by_others_) {
250 EnsureSyncWatcherExists();
251 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
252 }
253 }
254
ReadSingleMessage(MojoResult * read_result)255 bool Connector::ReadSingleMessage(MojoResult* read_result) {
256 CHECK(!paused_);
257
258 bool receiver_result = false;
259
260 // Detect if |this| was destroyed during message dispatch. Allow for the
261 // possibility of re-entering ReadMore() through message dispatch.
262 base::WeakPtr<Connector> weak_self = weak_self_;
263
264 Message message;
265 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
266 *read_result = rv;
267
268 if (rv == MOJO_RESULT_OK) {
269 receiver_result =
270 incoming_receiver_ && incoming_receiver_->Accept(&message);
271 }
272
273 if (!weak_self)
274 return false;
275
276 if (rv == MOJO_RESULT_SHOULD_WAIT)
277 return true;
278
279 if (rv != MOJO_RESULT_OK) {
280 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
281 return false;
282 }
283
284 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
285 HandleError(true, false);
286 return false;
287 }
288 return true;
289 }
290
ReadAllAvailableMessages()291 void Connector::ReadAllAvailableMessages() {
292 while (!error_) {
293 MojoResult rv;
294
295 // Return immediately if |this| was destroyed. Do not touch any members!
296 if (!ReadSingleMessage(&rv))
297 return;
298
299 if (paused_)
300 return;
301
302 if (rv == MOJO_RESULT_SHOULD_WAIT)
303 break;
304 }
305 }
306
CancelWait()307 void Connector::CancelWait() {
308 handle_watcher_.Cancel();
309 sync_watcher_.reset();
310 }
311
HandleError(bool force_pipe_reset,bool force_async_handler)312 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
313 if (error_ || !message_pipe_.is_valid())
314 return;
315
316 if (paused_) {
317 // Enforce calling the error handler asynchronously if the user has paused
318 // receiving messages. We need to wait until the user starts receiving
319 // messages again.
320 force_async_handler = true;
321 }
322
323 if (!force_pipe_reset && force_async_handler)
324 force_pipe_reset = true;
325
326 if (force_pipe_reset) {
327 CancelWait();
328 MayAutoLock locker(lock_.get());
329 message_pipe_.reset();
330 MessagePipe dummy_pipe;
331 message_pipe_ = std::move(dummy_pipe.handle0);
332 } else {
333 CancelWait();
334 }
335
336 if (force_async_handler) {
337 if (!paused_)
338 WaitToReadMore();
339 } else {
340 error_ = true;
341 if (!connection_error_handler_.is_null())
342 connection_error_handler_.Run();
343 }
344 }
345
EnsureSyncWatcherExists()346 void Connector::EnsureSyncWatcherExists() {
347 if (sync_watcher_)
348 return;
349 sync_watcher_.reset(new SyncHandleWatcher(
350 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
351 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
352 base::Unretained(this))));
353 }
354
355 } // namespace mojo
356