1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/data_pipe.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10 #include <limits>
11
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
18
19 namespace mojo {
20 namespace system {
21
22 // static
23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
26 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};
27
28 // static
ValidateCreateOptions(UserPointer<const MojoCreateDataPipeOptions> in_options,MojoCreateDataPipeOptions * out_options)29 MojoResult DataPipe::ValidateCreateOptions(
30 UserPointer<const MojoCreateDataPipeOptions> in_options,
31 MojoCreateDataPipeOptions* out_options) {
32 const MojoCreateDataPipeOptionsFlags kKnownFlags =
33 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
34
35 *out_options = kDefaultCreateOptions;
36 if (in_options.IsNull())
37 return MOJO_RESULT_OK;
38
39 UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
40 if (!reader.is_valid())
41 return MOJO_RESULT_INVALID_ARGUMENT;
42
43 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
44 return MOJO_RESULT_OK;
45 if ((reader.options().flags & ~kKnownFlags))
46 return MOJO_RESULT_UNIMPLEMENTED;
47 out_options->flags = reader.options().flags;
48
49 // Checks for fields beyond |flags|:
50
51 if (!OPTIONS_STRUCT_HAS_MEMBER(
52 MojoCreateDataPipeOptions, element_num_bytes, reader))
53 return MOJO_RESULT_OK;
54 if (reader.options().element_num_bytes == 0)
55 return MOJO_RESULT_INVALID_ARGUMENT;
56 out_options->element_num_bytes = reader.options().element_num_bytes;
57
58 if (!OPTIONS_STRUCT_HAS_MEMBER(
59 MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
60 reader.options().capacity_num_bytes == 0) {
61 // Round the default capacity down to a multiple of the element size (but at
62 // least one element).
63 out_options->capacity_num_bytes =
64 std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
65 (kDefaultDataPipeCapacityBytes %
66 out_options->element_num_bytes)),
67 out_options->element_num_bytes);
68 return MOJO_RESULT_OK;
69 }
70 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
71 return MOJO_RESULT_INVALID_ARGUMENT;
72 if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
73 return MOJO_RESULT_RESOURCE_EXHAUSTED;
74 out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
75
76 return MOJO_RESULT_OK;
77 }
78
ProducerCancelAllWaiters()79 void DataPipe::ProducerCancelAllWaiters() {
80 base::AutoLock locker(lock_);
81 DCHECK(has_local_producer_no_lock());
82 producer_waiter_list_->CancelAllWaiters();
83 }
84
ProducerClose()85 void DataPipe::ProducerClose() {
86 base::AutoLock locker(lock_);
87 DCHECK(producer_open_);
88 producer_open_ = false;
89 DCHECK(has_local_producer_no_lock());
90 producer_waiter_list_.reset();
91 // Not a bug, except possibly in "user" code.
92 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
93 << "Producer closed with active two-phase write";
94 producer_two_phase_max_num_bytes_written_ = 0;
95 ProducerCloseImplNoLock();
96 AwakeConsumerWaitersForStateChangeNoLock(
97 ConsumerGetHandleSignalsStateImplNoLock());
98 }
99
ProducerWriteData(UserPointer<const void> elements,UserPointer<uint32_t> num_bytes,bool all_or_none)100 MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
101 UserPointer<uint32_t> num_bytes,
102 bool all_or_none) {
103 base::AutoLock locker(lock_);
104 DCHECK(has_local_producer_no_lock());
105
106 if (producer_in_two_phase_write_no_lock())
107 return MOJO_RESULT_BUSY;
108 if (!consumer_open_no_lock())
109 return MOJO_RESULT_FAILED_PRECONDITION;
110
111 // Returning "busy" takes priority over "invalid argument".
112 uint32_t max_num_bytes_to_write = num_bytes.Get();
113 if (max_num_bytes_to_write % element_num_bytes_ != 0)
114 return MOJO_RESULT_INVALID_ARGUMENT;
115
116 if (max_num_bytes_to_write == 0)
117 return MOJO_RESULT_OK; // Nothing to do.
118
119 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
120
121 HandleSignalsState old_consumer_state =
122 ConsumerGetHandleSignalsStateImplNoLock();
123 MojoResult rv = ProducerWriteDataImplNoLock(
124 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
125 HandleSignalsState new_consumer_state =
126 ConsumerGetHandleSignalsStateImplNoLock();
127 if (!new_consumer_state.equals(old_consumer_state))
128 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
129 return rv;
130 }
131
ProducerBeginWriteData(UserPointer<void * > buffer,UserPointer<uint32_t> buffer_num_bytes,bool all_or_none)132 MojoResult DataPipe::ProducerBeginWriteData(
133 UserPointer<void*> buffer,
134 UserPointer<uint32_t> buffer_num_bytes,
135 bool all_or_none) {
136 base::AutoLock locker(lock_);
137 DCHECK(has_local_producer_no_lock());
138
139 if (producer_in_two_phase_write_no_lock())
140 return MOJO_RESULT_BUSY;
141 if (!consumer_open_no_lock())
142 return MOJO_RESULT_FAILED_PRECONDITION;
143
144 uint32_t min_num_bytes_to_write = 0;
145 if (all_or_none) {
146 min_num_bytes_to_write = buffer_num_bytes.Get();
147 if (min_num_bytes_to_write % element_num_bytes_ != 0)
148 return MOJO_RESULT_INVALID_ARGUMENT;
149 }
150
151 MojoResult rv = ProducerBeginWriteDataImplNoLock(
152 buffer, buffer_num_bytes, min_num_bytes_to_write);
153 if (rv != MOJO_RESULT_OK)
154 return rv;
155 // Note: No need to awake producer waiters, even though we're going from
156 // writable to non-writable (since you can't wait on non-writability).
157 // Similarly, though this may have discarded data (in "may discard" mode),
158 // making it non-readable, there's still no need to awake consumer waiters.
159 DCHECK(producer_in_two_phase_write_no_lock());
160 return MOJO_RESULT_OK;
161 }
162
ProducerEndWriteData(uint32_t num_bytes_written)163 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
164 base::AutoLock locker(lock_);
165 DCHECK(has_local_producer_no_lock());
166
167 if (!producer_in_two_phase_write_no_lock())
168 return MOJO_RESULT_FAILED_PRECONDITION;
169 // Note: Allow successful completion of the two-phase write even if the
170 // consumer has been closed.
171
172 HandleSignalsState old_consumer_state =
173 ConsumerGetHandleSignalsStateImplNoLock();
174 MojoResult rv;
175 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
176 num_bytes_written % element_num_bytes_ != 0) {
177 rv = MOJO_RESULT_INVALID_ARGUMENT;
178 producer_two_phase_max_num_bytes_written_ = 0;
179 } else {
180 rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
181 }
182 // Two-phase write ended even on failure.
183 DCHECK(!producer_in_two_phase_write_no_lock());
184 // If we're now writable, we *became* writable (since we weren't writable
185 // during the two-phase write), so awake producer waiters.
186 HandleSignalsState new_producer_state =
187 ProducerGetHandleSignalsStateImplNoLock();
188 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
189 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
190 HandleSignalsState new_consumer_state =
191 ConsumerGetHandleSignalsStateImplNoLock();
192 if (!new_consumer_state.equals(old_consumer_state))
193 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
194 return rv;
195 }
196
ProducerGetHandleSignalsState()197 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
198 base::AutoLock locker(lock_);
199 DCHECK(has_local_producer_no_lock());
200 return ProducerGetHandleSignalsStateImplNoLock();
201 }
202
ProducerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)203 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
204 MojoHandleSignals signals,
205 uint32_t context,
206 HandleSignalsState* signals_state) {
207 base::AutoLock locker(lock_);
208 DCHECK(has_local_producer_no_lock());
209
210 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
211 if (producer_state.satisfies(signals)) {
212 if (signals_state)
213 *signals_state = producer_state;
214 return MOJO_RESULT_ALREADY_EXISTS;
215 }
216 if (!producer_state.can_satisfy(signals)) {
217 if (signals_state)
218 *signals_state = producer_state;
219 return MOJO_RESULT_FAILED_PRECONDITION;
220 }
221
222 producer_waiter_list_->AddWaiter(waiter, signals, context);
223 return MOJO_RESULT_OK;
224 }
225
ProducerRemoveWaiter(Waiter * waiter,HandleSignalsState * signals_state)226 void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
227 HandleSignalsState* signals_state) {
228 base::AutoLock locker(lock_);
229 DCHECK(has_local_producer_no_lock());
230 producer_waiter_list_->RemoveWaiter(waiter);
231 if (signals_state)
232 *signals_state = ProducerGetHandleSignalsStateImplNoLock();
233 }
234
ProducerIsBusy() const235 bool DataPipe::ProducerIsBusy() const {
236 base::AutoLock locker(lock_);
237 return producer_in_two_phase_write_no_lock();
238 }
239
ConsumerCancelAllWaiters()240 void DataPipe::ConsumerCancelAllWaiters() {
241 base::AutoLock locker(lock_);
242 DCHECK(has_local_consumer_no_lock());
243 consumer_waiter_list_->CancelAllWaiters();
244 }
245
ConsumerClose()246 void DataPipe::ConsumerClose() {
247 base::AutoLock locker(lock_);
248 DCHECK(consumer_open_);
249 consumer_open_ = false;
250 DCHECK(has_local_consumer_no_lock());
251 consumer_waiter_list_.reset();
252 // Not a bug, except possibly in "user" code.
253 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
254 << "Consumer closed with active two-phase read";
255 consumer_two_phase_max_num_bytes_read_ = 0;
256 ConsumerCloseImplNoLock();
257 AwakeProducerWaitersForStateChangeNoLock(
258 ProducerGetHandleSignalsStateImplNoLock());
259 }
260
ConsumerReadData(UserPointer<void> elements,UserPointer<uint32_t> num_bytes,bool all_or_none)261 MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
262 UserPointer<uint32_t> num_bytes,
263 bool all_or_none) {
264 base::AutoLock locker(lock_);
265 DCHECK(has_local_consumer_no_lock());
266
267 if (consumer_in_two_phase_read_no_lock())
268 return MOJO_RESULT_BUSY;
269
270 uint32_t max_num_bytes_to_read = num_bytes.Get();
271 if (max_num_bytes_to_read % element_num_bytes_ != 0)
272 return MOJO_RESULT_INVALID_ARGUMENT;
273
274 if (max_num_bytes_to_read == 0)
275 return MOJO_RESULT_OK; // Nothing to do.
276
277 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
278
279 HandleSignalsState old_producer_state =
280 ProducerGetHandleSignalsStateImplNoLock();
281 MojoResult rv = ConsumerReadDataImplNoLock(
282 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
283 HandleSignalsState new_producer_state =
284 ProducerGetHandleSignalsStateImplNoLock();
285 if (!new_producer_state.equals(old_producer_state))
286 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
287 return rv;
288 }
289
ConsumerDiscardData(UserPointer<uint32_t> num_bytes,bool all_or_none)290 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
291 bool all_or_none) {
292 base::AutoLock locker(lock_);
293 DCHECK(has_local_consumer_no_lock());
294
295 if (consumer_in_two_phase_read_no_lock())
296 return MOJO_RESULT_BUSY;
297
298 uint32_t max_num_bytes_to_discard = num_bytes.Get();
299 if (max_num_bytes_to_discard % element_num_bytes_ != 0)
300 return MOJO_RESULT_INVALID_ARGUMENT;
301
302 if (max_num_bytes_to_discard == 0)
303 return MOJO_RESULT_OK; // Nothing to do.
304
305 uint32_t min_num_bytes_to_discard =
306 all_or_none ? max_num_bytes_to_discard : 0;
307
308 HandleSignalsState old_producer_state =
309 ProducerGetHandleSignalsStateImplNoLock();
310 MojoResult rv = ConsumerDiscardDataImplNoLock(
311 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
312 HandleSignalsState new_producer_state =
313 ProducerGetHandleSignalsStateImplNoLock();
314 if (!new_producer_state.equals(old_producer_state))
315 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
316 return rv;
317 }
318
ConsumerQueryData(UserPointer<uint32_t> num_bytes)319 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
320 base::AutoLock locker(lock_);
321 DCHECK(has_local_consumer_no_lock());
322
323 if (consumer_in_two_phase_read_no_lock())
324 return MOJO_RESULT_BUSY;
325
326 // Note: Don't need to validate |*num_bytes| for query.
327 return ConsumerQueryDataImplNoLock(num_bytes);
328 }
329
ConsumerBeginReadData(UserPointer<const void * > buffer,UserPointer<uint32_t> buffer_num_bytes,bool all_or_none)330 MojoResult DataPipe::ConsumerBeginReadData(
331 UserPointer<const void*> buffer,
332 UserPointer<uint32_t> buffer_num_bytes,
333 bool all_or_none) {
334 base::AutoLock locker(lock_);
335 DCHECK(has_local_consumer_no_lock());
336
337 if (consumer_in_two_phase_read_no_lock())
338 return MOJO_RESULT_BUSY;
339
340 uint32_t min_num_bytes_to_read = 0;
341 if (all_or_none) {
342 min_num_bytes_to_read = buffer_num_bytes.Get();
343 if (min_num_bytes_to_read % element_num_bytes_ != 0)
344 return MOJO_RESULT_INVALID_ARGUMENT;
345 }
346
347 MojoResult rv = ConsumerBeginReadDataImplNoLock(
348 buffer, buffer_num_bytes, min_num_bytes_to_read);
349 if (rv != MOJO_RESULT_OK)
350 return rv;
351 DCHECK(consumer_in_two_phase_read_no_lock());
352 return MOJO_RESULT_OK;
353 }
354
ConsumerEndReadData(uint32_t num_bytes_read)355 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
356 base::AutoLock locker(lock_);
357 DCHECK(has_local_consumer_no_lock());
358
359 if (!consumer_in_two_phase_read_no_lock())
360 return MOJO_RESULT_FAILED_PRECONDITION;
361
362 HandleSignalsState old_producer_state =
363 ProducerGetHandleSignalsStateImplNoLock();
364 MojoResult rv;
365 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
366 num_bytes_read % element_num_bytes_ != 0) {
367 rv = MOJO_RESULT_INVALID_ARGUMENT;
368 consumer_two_phase_max_num_bytes_read_ = 0;
369 } else {
370 rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
371 }
372 // Two-phase read ended even on failure.
373 DCHECK(!consumer_in_two_phase_read_no_lock());
374 // If we're now readable, we *became* readable (since we weren't readable
375 // during the two-phase read), so awake consumer waiters.
376 HandleSignalsState new_consumer_state =
377 ConsumerGetHandleSignalsStateImplNoLock();
378 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
379 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
380 HandleSignalsState new_producer_state =
381 ProducerGetHandleSignalsStateImplNoLock();
382 if (!new_producer_state.equals(old_producer_state))
383 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
384 return rv;
385 }
386
ConsumerGetHandleSignalsState()387 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
388 base::AutoLock locker(lock_);
389 DCHECK(has_local_consumer_no_lock());
390 return ConsumerGetHandleSignalsStateImplNoLock();
391 }
392
ConsumerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)393 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
394 MojoHandleSignals signals,
395 uint32_t context,
396 HandleSignalsState* signals_state) {
397 base::AutoLock locker(lock_);
398 DCHECK(has_local_consumer_no_lock());
399
400 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
401 if (consumer_state.satisfies(signals)) {
402 if (signals_state)
403 *signals_state = consumer_state;
404 return MOJO_RESULT_ALREADY_EXISTS;
405 }
406 if (!consumer_state.can_satisfy(signals)) {
407 if (signals_state)
408 *signals_state = consumer_state;
409 return MOJO_RESULT_FAILED_PRECONDITION;
410 }
411
412 consumer_waiter_list_->AddWaiter(waiter, signals, context);
413 return MOJO_RESULT_OK;
414 }
415
ConsumerRemoveWaiter(Waiter * waiter,HandleSignalsState * signals_state)416 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
417 HandleSignalsState* signals_state) {
418 base::AutoLock locker(lock_);
419 DCHECK(has_local_consumer_no_lock());
420 consumer_waiter_list_->RemoveWaiter(waiter);
421 if (signals_state)
422 *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
423 }
424
ConsumerIsBusy() const425 bool DataPipe::ConsumerIsBusy() const {
426 base::AutoLock locker(lock_);
427 return consumer_in_two_phase_read_no_lock();
428 }
429
DataPipe(bool has_local_producer,bool has_local_consumer,const MojoCreateDataPipeOptions & validated_options)430 DataPipe::DataPipe(bool has_local_producer,
431 bool has_local_consumer,
432 const MojoCreateDataPipeOptions& validated_options)
433 : may_discard_((validated_options.flags &
434 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
435 element_num_bytes_(validated_options.element_num_bytes),
436 capacity_num_bytes_(validated_options.capacity_num_bytes),
437 producer_open_(true),
438 consumer_open_(true),
439 producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr),
440 consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr),
441 producer_two_phase_max_num_bytes_written_(0),
442 consumer_two_phase_max_num_bytes_read_(0) {
443 // Check that the passed in options actually are validated.
444 MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
445 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
446 MOJO_RESULT_OK);
447 }
448
~DataPipe()449 DataPipe::~DataPipe() {
450 DCHECK(!producer_open_);
451 DCHECK(!consumer_open_);
452 DCHECK(!producer_waiter_list_);
453 DCHECK(!consumer_waiter_list_);
454 }
455
AwakeProducerWaitersForStateChangeNoLock(const HandleSignalsState & new_producer_state)456 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
457 const HandleSignalsState& new_producer_state) {
458 lock_.AssertAcquired();
459 if (!has_local_producer_no_lock())
460 return;
461 producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
462 }
463
AwakeConsumerWaitersForStateChangeNoLock(const HandleSignalsState & new_consumer_state)464 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
465 const HandleSignalsState& new_consumer_state) {
466 lock_.AssertAcquired();
467 if (!has_local_consumer_no_lock())
468 return;
469 consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
470 }
471
472 } // namespace system
473 } // namespace mojo
474