• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/data_pipe.h"
6 
7 #include <string.h>
8 
9 #include <algorithm>
10 #include <limits>
11 
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
18 
19 namespace mojo {
20 namespace system {
21 
22 // static
23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25     MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
26     static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};
27 
28 // static
ValidateCreateOptions(UserPointer<const MojoCreateDataPipeOptions> in_options,MojoCreateDataPipeOptions * out_options)29 MojoResult DataPipe::ValidateCreateOptions(
30     UserPointer<const MojoCreateDataPipeOptions> in_options,
31     MojoCreateDataPipeOptions* out_options) {
32   const MojoCreateDataPipeOptionsFlags kKnownFlags =
33       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
34 
35   *out_options = kDefaultCreateOptions;
36   if (in_options.IsNull())
37     return MOJO_RESULT_OK;
38 
39   UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
40   if (!reader.is_valid())
41     return MOJO_RESULT_INVALID_ARGUMENT;
42 
43   if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
44     return MOJO_RESULT_OK;
45   if ((reader.options().flags & ~kKnownFlags))
46     return MOJO_RESULT_UNIMPLEMENTED;
47   out_options->flags = reader.options().flags;
48 
49   // Checks for fields beyond |flags|:
50 
51   if (!OPTIONS_STRUCT_HAS_MEMBER(
52           MojoCreateDataPipeOptions, element_num_bytes, reader))
53     return MOJO_RESULT_OK;
54   if (reader.options().element_num_bytes == 0)
55     return MOJO_RESULT_INVALID_ARGUMENT;
56   out_options->element_num_bytes = reader.options().element_num_bytes;
57 
58   if (!OPTIONS_STRUCT_HAS_MEMBER(
59           MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
60       reader.options().capacity_num_bytes == 0) {
61     // Round the default capacity down to a multiple of the element size (but at
62     // least one element).
63     out_options->capacity_num_bytes =
64         std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
65                                        (kDefaultDataPipeCapacityBytes %
66                                         out_options->element_num_bytes)),
67                  out_options->element_num_bytes);
68     return MOJO_RESULT_OK;
69   }
70   if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
71     return MOJO_RESULT_INVALID_ARGUMENT;
72   if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
73     return MOJO_RESULT_RESOURCE_EXHAUSTED;
74   out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
75 
76   return MOJO_RESULT_OK;
77 }
78 
ProducerCancelAllWaiters()79 void DataPipe::ProducerCancelAllWaiters() {
80   base::AutoLock locker(lock_);
81   DCHECK(has_local_producer_no_lock());
82   producer_waiter_list_->CancelAllWaiters();
83 }
84 
ProducerClose()85 void DataPipe::ProducerClose() {
86   base::AutoLock locker(lock_);
87   DCHECK(producer_open_);
88   producer_open_ = false;
89   DCHECK(has_local_producer_no_lock());
90   producer_waiter_list_.reset();
91   // Not a bug, except possibly in "user" code.
92   DVLOG_IF(2, producer_in_two_phase_write_no_lock())
93       << "Producer closed with active two-phase write";
94   producer_two_phase_max_num_bytes_written_ = 0;
95   ProducerCloseImplNoLock();
96   AwakeConsumerWaitersForStateChangeNoLock(
97       ConsumerGetHandleSignalsStateImplNoLock());
98 }
99 
ProducerWriteData(UserPointer<const void> elements,UserPointer<uint32_t> num_bytes,bool all_or_none)100 MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
101                                        UserPointer<uint32_t> num_bytes,
102                                        bool all_or_none) {
103   base::AutoLock locker(lock_);
104   DCHECK(has_local_producer_no_lock());
105 
106   if (producer_in_two_phase_write_no_lock())
107     return MOJO_RESULT_BUSY;
108   if (!consumer_open_no_lock())
109     return MOJO_RESULT_FAILED_PRECONDITION;
110 
111   // Returning "busy" takes priority over "invalid argument".
112   uint32_t max_num_bytes_to_write = num_bytes.Get();
113   if (max_num_bytes_to_write % element_num_bytes_ != 0)
114     return MOJO_RESULT_INVALID_ARGUMENT;
115 
116   if (max_num_bytes_to_write == 0)
117     return MOJO_RESULT_OK;  // Nothing to do.
118 
119   uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
120 
121   HandleSignalsState old_consumer_state =
122       ConsumerGetHandleSignalsStateImplNoLock();
123   MojoResult rv = ProducerWriteDataImplNoLock(
124       elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
125   HandleSignalsState new_consumer_state =
126       ConsumerGetHandleSignalsStateImplNoLock();
127   if (!new_consumer_state.equals(old_consumer_state))
128     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
129   return rv;
130 }
131 
ProducerBeginWriteData(UserPointer<void * > buffer,UserPointer<uint32_t> buffer_num_bytes,bool all_or_none)132 MojoResult DataPipe::ProducerBeginWriteData(
133     UserPointer<void*> buffer,
134     UserPointer<uint32_t> buffer_num_bytes,
135     bool all_or_none) {
136   base::AutoLock locker(lock_);
137   DCHECK(has_local_producer_no_lock());
138 
139   if (producer_in_two_phase_write_no_lock())
140     return MOJO_RESULT_BUSY;
141   if (!consumer_open_no_lock())
142     return MOJO_RESULT_FAILED_PRECONDITION;
143 
144   uint32_t min_num_bytes_to_write = 0;
145   if (all_or_none) {
146     min_num_bytes_to_write = buffer_num_bytes.Get();
147     if (min_num_bytes_to_write % element_num_bytes_ != 0)
148       return MOJO_RESULT_INVALID_ARGUMENT;
149   }
150 
151   MojoResult rv = ProducerBeginWriteDataImplNoLock(
152       buffer, buffer_num_bytes, min_num_bytes_to_write);
153   if (rv != MOJO_RESULT_OK)
154     return rv;
155   // Note: No need to awake producer waiters, even though we're going from
156   // writable to non-writable (since you can't wait on non-writability).
157   // Similarly, though this may have discarded data (in "may discard" mode),
158   // making it non-readable, there's still no need to awake consumer waiters.
159   DCHECK(producer_in_two_phase_write_no_lock());
160   return MOJO_RESULT_OK;
161 }
162 
ProducerEndWriteData(uint32_t num_bytes_written)163 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
164   base::AutoLock locker(lock_);
165   DCHECK(has_local_producer_no_lock());
166 
167   if (!producer_in_two_phase_write_no_lock())
168     return MOJO_RESULT_FAILED_PRECONDITION;
169   // Note: Allow successful completion of the two-phase write even if the
170   // consumer has been closed.
171 
172   HandleSignalsState old_consumer_state =
173       ConsumerGetHandleSignalsStateImplNoLock();
174   MojoResult rv;
175   if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
176       num_bytes_written % element_num_bytes_ != 0) {
177     rv = MOJO_RESULT_INVALID_ARGUMENT;
178     producer_two_phase_max_num_bytes_written_ = 0;
179   } else {
180     rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
181   }
182   // Two-phase write ended even on failure.
183   DCHECK(!producer_in_two_phase_write_no_lock());
184   // If we're now writable, we *became* writable (since we weren't writable
185   // during the two-phase write), so awake producer waiters.
186   HandleSignalsState new_producer_state =
187       ProducerGetHandleSignalsStateImplNoLock();
188   if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
189     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
190   HandleSignalsState new_consumer_state =
191       ConsumerGetHandleSignalsStateImplNoLock();
192   if (!new_consumer_state.equals(old_consumer_state))
193     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
194   return rv;
195 }
196 
ProducerGetHandleSignalsState()197 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
198   base::AutoLock locker(lock_);
199   DCHECK(has_local_producer_no_lock());
200   return ProducerGetHandleSignalsStateImplNoLock();
201 }
202 
ProducerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)203 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
204                                        MojoHandleSignals signals,
205                                        uint32_t context,
206                                        HandleSignalsState* signals_state) {
207   base::AutoLock locker(lock_);
208   DCHECK(has_local_producer_no_lock());
209 
210   HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
211   if (producer_state.satisfies(signals)) {
212     if (signals_state)
213       *signals_state = producer_state;
214     return MOJO_RESULT_ALREADY_EXISTS;
215   }
216   if (!producer_state.can_satisfy(signals)) {
217     if (signals_state)
218       *signals_state = producer_state;
219     return MOJO_RESULT_FAILED_PRECONDITION;
220   }
221 
222   producer_waiter_list_->AddWaiter(waiter, signals, context);
223   return MOJO_RESULT_OK;
224 }
225 
ProducerRemoveWaiter(Waiter * waiter,HandleSignalsState * signals_state)226 void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
227                                     HandleSignalsState* signals_state) {
228   base::AutoLock locker(lock_);
229   DCHECK(has_local_producer_no_lock());
230   producer_waiter_list_->RemoveWaiter(waiter);
231   if (signals_state)
232     *signals_state = ProducerGetHandleSignalsStateImplNoLock();
233 }
234 
ProducerIsBusy() const235 bool DataPipe::ProducerIsBusy() const {
236   base::AutoLock locker(lock_);
237   return producer_in_two_phase_write_no_lock();
238 }
239 
ConsumerCancelAllWaiters()240 void DataPipe::ConsumerCancelAllWaiters() {
241   base::AutoLock locker(lock_);
242   DCHECK(has_local_consumer_no_lock());
243   consumer_waiter_list_->CancelAllWaiters();
244 }
245 
ConsumerClose()246 void DataPipe::ConsumerClose() {
247   base::AutoLock locker(lock_);
248   DCHECK(consumer_open_);
249   consumer_open_ = false;
250   DCHECK(has_local_consumer_no_lock());
251   consumer_waiter_list_.reset();
252   // Not a bug, except possibly in "user" code.
253   DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
254       << "Consumer closed with active two-phase read";
255   consumer_two_phase_max_num_bytes_read_ = 0;
256   ConsumerCloseImplNoLock();
257   AwakeProducerWaitersForStateChangeNoLock(
258       ProducerGetHandleSignalsStateImplNoLock());
259 }
260 
ConsumerReadData(UserPointer<void> elements,UserPointer<uint32_t> num_bytes,bool all_or_none)261 MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
262                                       UserPointer<uint32_t> num_bytes,
263                                       bool all_or_none) {
264   base::AutoLock locker(lock_);
265   DCHECK(has_local_consumer_no_lock());
266 
267   if (consumer_in_two_phase_read_no_lock())
268     return MOJO_RESULT_BUSY;
269 
270   uint32_t max_num_bytes_to_read = num_bytes.Get();
271   if (max_num_bytes_to_read % element_num_bytes_ != 0)
272     return MOJO_RESULT_INVALID_ARGUMENT;
273 
274   if (max_num_bytes_to_read == 0)
275     return MOJO_RESULT_OK;  // Nothing to do.
276 
277   uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
278 
279   HandleSignalsState old_producer_state =
280       ProducerGetHandleSignalsStateImplNoLock();
281   MojoResult rv = ConsumerReadDataImplNoLock(
282       elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
283   HandleSignalsState new_producer_state =
284       ProducerGetHandleSignalsStateImplNoLock();
285   if (!new_producer_state.equals(old_producer_state))
286     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
287   return rv;
288 }
289 
ConsumerDiscardData(UserPointer<uint32_t> num_bytes,bool all_or_none)290 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
291                                          bool all_or_none) {
292   base::AutoLock locker(lock_);
293   DCHECK(has_local_consumer_no_lock());
294 
295   if (consumer_in_two_phase_read_no_lock())
296     return MOJO_RESULT_BUSY;
297 
298   uint32_t max_num_bytes_to_discard = num_bytes.Get();
299   if (max_num_bytes_to_discard % element_num_bytes_ != 0)
300     return MOJO_RESULT_INVALID_ARGUMENT;
301 
302   if (max_num_bytes_to_discard == 0)
303     return MOJO_RESULT_OK;  // Nothing to do.
304 
305   uint32_t min_num_bytes_to_discard =
306       all_or_none ? max_num_bytes_to_discard : 0;
307 
308   HandleSignalsState old_producer_state =
309       ProducerGetHandleSignalsStateImplNoLock();
310   MojoResult rv = ConsumerDiscardDataImplNoLock(
311       num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
312   HandleSignalsState new_producer_state =
313       ProducerGetHandleSignalsStateImplNoLock();
314   if (!new_producer_state.equals(old_producer_state))
315     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
316   return rv;
317 }
318 
ConsumerQueryData(UserPointer<uint32_t> num_bytes)319 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
320   base::AutoLock locker(lock_);
321   DCHECK(has_local_consumer_no_lock());
322 
323   if (consumer_in_two_phase_read_no_lock())
324     return MOJO_RESULT_BUSY;
325 
326   // Note: Don't need to validate |*num_bytes| for query.
327   return ConsumerQueryDataImplNoLock(num_bytes);
328 }
329 
ConsumerBeginReadData(UserPointer<const void * > buffer,UserPointer<uint32_t> buffer_num_bytes,bool all_or_none)330 MojoResult DataPipe::ConsumerBeginReadData(
331     UserPointer<const void*> buffer,
332     UserPointer<uint32_t> buffer_num_bytes,
333     bool all_or_none) {
334   base::AutoLock locker(lock_);
335   DCHECK(has_local_consumer_no_lock());
336 
337   if (consumer_in_two_phase_read_no_lock())
338     return MOJO_RESULT_BUSY;
339 
340   uint32_t min_num_bytes_to_read = 0;
341   if (all_or_none) {
342     min_num_bytes_to_read = buffer_num_bytes.Get();
343     if (min_num_bytes_to_read % element_num_bytes_ != 0)
344       return MOJO_RESULT_INVALID_ARGUMENT;
345   }
346 
347   MojoResult rv = ConsumerBeginReadDataImplNoLock(
348       buffer, buffer_num_bytes, min_num_bytes_to_read);
349   if (rv != MOJO_RESULT_OK)
350     return rv;
351   DCHECK(consumer_in_two_phase_read_no_lock());
352   return MOJO_RESULT_OK;
353 }
354 
ConsumerEndReadData(uint32_t num_bytes_read)355 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
356   base::AutoLock locker(lock_);
357   DCHECK(has_local_consumer_no_lock());
358 
359   if (!consumer_in_two_phase_read_no_lock())
360     return MOJO_RESULT_FAILED_PRECONDITION;
361 
362   HandleSignalsState old_producer_state =
363       ProducerGetHandleSignalsStateImplNoLock();
364   MojoResult rv;
365   if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
366       num_bytes_read % element_num_bytes_ != 0) {
367     rv = MOJO_RESULT_INVALID_ARGUMENT;
368     consumer_two_phase_max_num_bytes_read_ = 0;
369   } else {
370     rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
371   }
372   // Two-phase read ended even on failure.
373   DCHECK(!consumer_in_two_phase_read_no_lock());
374   // If we're now readable, we *became* readable (since we weren't readable
375   // during the two-phase read), so awake consumer waiters.
376   HandleSignalsState new_consumer_state =
377       ConsumerGetHandleSignalsStateImplNoLock();
378   if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
379     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
380   HandleSignalsState new_producer_state =
381       ProducerGetHandleSignalsStateImplNoLock();
382   if (!new_producer_state.equals(old_producer_state))
383     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
384   return rv;
385 }
386 
ConsumerGetHandleSignalsState()387 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
388   base::AutoLock locker(lock_);
389   DCHECK(has_local_consumer_no_lock());
390   return ConsumerGetHandleSignalsStateImplNoLock();
391 }
392 
ConsumerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)393 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
394                                        MojoHandleSignals signals,
395                                        uint32_t context,
396                                        HandleSignalsState* signals_state) {
397   base::AutoLock locker(lock_);
398   DCHECK(has_local_consumer_no_lock());
399 
400   HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
401   if (consumer_state.satisfies(signals)) {
402     if (signals_state)
403       *signals_state = consumer_state;
404     return MOJO_RESULT_ALREADY_EXISTS;
405   }
406   if (!consumer_state.can_satisfy(signals)) {
407     if (signals_state)
408       *signals_state = consumer_state;
409     return MOJO_RESULT_FAILED_PRECONDITION;
410   }
411 
412   consumer_waiter_list_->AddWaiter(waiter, signals, context);
413   return MOJO_RESULT_OK;
414 }
415 
ConsumerRemoveWaiter(Waiter * waiter,HandleSignalsState * signals_state)416 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
417                                     HandleSignalsState* signals_state) {
418   base::AutoLock locker(lock_);
419   DCHECK(has_local_consumer_no_lock());
420   consumer_waiter_list_->RemoveWaiter(waiter);
421   if (signals_state)
422     *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
423 }
424 
ConsumerIsBusy() const425 bool DataPipe::ConsumerIsBusy() const {
426   base::AutoLock locker(lock_);
427   return consumer_in_two_phase_read_no_lock();
428 }
429 
DataPipe(bool has_local_producer,bool has_local_consumer,const MojoCreateDataPipeOptions & validated_options)430 DataPipe::DataPipe(bool has_local_producer,
431                    bool has_local_consumer,
432                    const MojoCreateDataPipeOptions& validated_options)
433     : may_discard_((validated_options.flags &
434                     MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
435       element_num_bytes_(validated_options.element_num_bytes),
436       capacity_num_bytes_(validated_options.capacity_num_bytes),
437       producer_open_(true),
438       consumer_open_(true),
439       producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr),
440       consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr),
441       producer_two_phase_max_num_bytes_written_(0),
442       consumer_two_phase_max_num_bytes_read_(0) {
443   // Check that the passed in options actually are validated.
444   MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
445   DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
446             MOJO_RESULT_OK);
447 }
448 
~DataPipe()449 DataPipe::~DataPipe() {
450   DCHECK(!producer_open_);
451   DCHECK(!consumer_open_);
452   DCHECK(!producer_waiter_list_);
453   DCHECK(!consumer_waiter_list_);
454 }
455 
AwakeProducerWaitersForStateChangeNoLock(const HandleSignalsState & new_producer_state)456 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
457     const HandleSignalsState& new_producer_state) {
458   lock_.AssertAcquired();
459   if (!has_local_producer_no_lock())
460     return;
461   producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
462 }
463 
AwakeConsumerWaitersForStateChangeNoLock(const HandleSignalsState & new_consumer_state)464 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
465     const HandleSignalsState& new_consumer_state) {
466   lock_.AssertAcquired();
467   if (!has_local_consumer_no_lock())
468     return;
469   consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
470 }
471 
472 }  // namespace system
473 }  // namespace mojo
474