• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/data_pipe.h"
6 
7 #include <string.h>
8 
9 #include <algorithm>
10 #include <limits>
11 
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
18 
19 namespace mojo {
20 namespace system {
21 
22 // static
23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24   static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25   MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
26   1u,
27   static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)
28 };
29 
30 // static
ValidateCreateOptions(const MojoCreateDataPipeOptions * in_options,MojoCreateDataPipeOptions * out_options)31 MojoResult DataPipe::ValidateCreateOptions(
32     const MojoCreateDataPipeOptions* in_options,
33     MojoCreateDataPipeOptions* out_options) {
34   const MojoCreateDataPipeOptionsFlags kKnownFlags =
35       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
36 
37   *out_options = kDefaultCreateOptions;
38   if (!in_options)
39     return MOJO_RESULT_OK;
40 
41   MojoResult result =
42       ValidateOptionsStructPointerSizeAndFlags<MojoCreateDataPipeOptions>(
43           in_options, kKnownFlags, out_options);
44   if (result != MOJO_RESULT_OK)
45     return result;
46 
47   // Checks for fields beyond |flags|:
48 
49   if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, element_num_bytes,
50                                  in_options))
51     return MOJO_RESULT_OK;
52   if (in_options->element_num_bytes == 0)
53     return MOJO_RESULT_INVALID_ARGUMENT;
54   out_options->element_num_bytes = in_options->element_num_bytes;
55 
56   if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes,
57                                  in_options) ||
58       in_options->capacity_num_bytes == 0) {
59     // Round the default capacity down to a multiple of the element size (but at
60     // least one element).
61     out_options->capacity_num_bytes = std::max(
62         static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
63             (kDefaultDataPipeCapacityBytes % out_options->element_num_bytes)),
64         out_options->element_num_bytes);
65     return MOJO_RESULT_OK;
66   }
67   if (in_options->capacity_num_bytes % out_options->element_num_bytes != 0)
68     return MOJO_RESULT_INVALID_ARGUMENT;
69   if (in_options->capacity_num_bytes > kMaxDataPipeCapacityBytes)
70     return MOJO_RESULT_RESOURCE_EXHAUSTED;
71   out_options->capacity_num_bytes = in_options->capacity_num_bytes;
72 
73   return MOJO_RESULT_OK;
74 }
75 
ProducerCancelAllWaiters()76 void DataPipe::ProducerCancelAllWaiters() {
77   base::AutoLock locker(lock_);
78   DCHECK(has_local_producer_no_lock());
79   producer_waiter_list_->CancelAllWaiters();
80 }
81 
ProducerClose()82 void DataPipe::ProducerClose() {
83   base::AutoLock locker(lock_);
84   DCHECK(producer_open_);
85   producer_open_ = false;
86   DCHECK(has_local_producer_no_lock());
87   producer_waiter_list_.reset();
88   // Not a bug, except possibly in "user" code.
89   DVLOG_IF(2, producer_in_two_phase_write_no_lock())
90       << "Producer closed with active two-phase write";
91   producer_two_phase_max_num_bytes_written_ = 0;
92   ProducerCloseImplNoLock();
93   AwakeConsumerWaitersForStateChangeNoLock(
94       ConsumerGetHandleSignalsStateNoLock());
95 }
96 
ProducerWriteData(const void * elements,uint32_t * num_bytes,bool all_or_none)97 MojoResult DataPipe::ProducerWriteData(const void* elements,
98                                        uint32_t* num_bytes,
99                                        bool all_or_none) {
100   base::AutoLock locker(lock_);
101   DCHECK(has_local_producer_no_lock());
102 
103   if (producer_in_two_phase_write_no_lock())
104     return MOJO_RESULT_BUSY;
105   if (!consumer_open_no_lock())
106     return MOJO_RESULT_FAILED_PRECONDITION;
107 
108   // Returning "busy" takes priority over "invalid argument".
109   if (*num_bytes % element_num_bytes_ != 0)
110     return MOJO_RESULT_INVALID_ARGUMENT;
111 
112   if (*num_bytes == 0)
113     return MOJO_RESULT_OK;  // Nothing to do.
114 
115   HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
116   MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
117   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
118   if (!new_consumer_state.equals(old_consumer_state))
119     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
120   return rv;
121 }
122 
ProducerBeginWriteData(void ** buffer,uint32_t * buffer_num_bytes,bool all_or_none)123 MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
124                                             uint32_t* buffer_num_bytes,
125                                             bool all_or_none) {
126   base::AutoLock locker(lock_);
127   DCHECK(has_local_producer_no_lock());
128 
129   if (producer_in_two_phase_write_no_lock())
130     return MOJO_RESULT_BUSY;
131   if (!consumer_open_no_lock())
132     return MOJO_RESULT_FAILED_PRECONDITION;
133 
134   if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
135     return MOJO_RESULT_INVALID_ARGUMENT;
136 
137   MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
138                                                    all_or_none);
139   if (rv != MOJO_RESULT_OK)
140     return rv;
141   // Note: No need to awake producer waiters, even though we're going from
142   // writable to non-writable (since you can't wait on non-writability).
143   // Similarly, though this may have discarded data (in "may discard" mode),
144   // making it non-readable, there's still no need to awake consumer waiters.
145   DCHECK(producer_in_two_phase_write_no_lock());
146   return MOJO_RESULT_OK;
147 }
148 
ProducerEndWriteData(uint32_t num_bytes_written)149 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
150   base::AutoLock locker(lock_);
151   DCHECK(has_local_producer_no_lock());
152 
153   if (!producer_in_two_phase_write_no_lock())
154     return MOJO_RESULT_FAILED_PRECONDITION;
155   // Note: Allow successful completion of the two-phase write even if the
156   // consumer has been closed.
157 
158   HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
159   MojoResult rv;
160   if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
161       num_bytes_written % element_num_bytes_ != 0) {
162     rv = MOJO_RESULT_INVALID_ARGUMENT;
163     producer_two_phase_max_num_bytes_written_ = 0;
164   } else {
165     rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
166   }
167   // Two-phase write ended even on failure.
168   DCHECK(!producer_in_two_phase_write_no_lock());
169   // If we're now writable, we *became* writable (since we weren't writable
170   // during the two-phase write), so awake producer waiters.
171   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
172   if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
173     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
174   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
175   if (!new_consumer_state.equals(old_consumer_state))
176     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
177   return rv;
178 }
179 
ProducerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)180 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
181                                        MojoHandleSignals signals,
182                                        uint32_t context) {
183   base::AutoLock locker(lock_);
184   DCHECK(has_local_producer_no_lock());
185 
186   HandleSignalsState producer_state = ProducerGetHandleSignalsStateNoLock();
187   if (producer_state.satisfies(signals))
188     return MOJO_RESULT_ALREADY_EXISTS;
189   if (!producer_state.can_satisfy(signals))
190     return MOJO_RESULT_FAILED_PRECONDITION;
191 
192   producer_waiter_list_->AddWaiter(waiter, signals, context);
193   return MOJO_RESULT_OK;
194 }
195 
ProducerRemoveWaiter(Waiter * waiter)196 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
197   base::AutoLock locker(lock_);
198   DCHECK(has_local_producer_no_lock());
199   producer_waiter_list_->RemoveWaiter(waiter);
200 }
201 
ProducerIsBusy() const202 bool DataPipe::ProducerIsBusy() const {
203   base::AutoLock locker(lock_);
204   return producer_in_two_phase_write_no_lock();
205 }
206 
ConsumerCancelAllWaiters()207 void DataPipe::ConsumerCancelAllWaiters() {
208   base::AutoLock locker(lock_);
209   DCHECK(has_local_consumer_no_lock());
210   consumer_waiter_list_->CancelAllWaiters();
211 }
212 
ConsumerClose()213 void DataPipe::ConsumerClose() {
214   base::AutoLock locker(lock_);
215   DCHECK(consumer_open_);
216   consumer_open_ = false;
217   DCHECK(has_local_consumer_no_lock());
218   consumer_waiter_list_.reset();
219   // Not a bug, except possibly in "user" code.
220   DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
221       << "Consumer closed with active two-phase read";
222   consumer_two_phase_max_num_bytes_read_ = 0;
223   ConsumerCloseImplNoLock();
224   AwakeProducerWaitersForStateChangeNoLock(
225       ProducerGetHandleSignalsStateNoLock());
226 }
227 
ConsumerReadData(void * elements,uint32_t * num_bytes,bool all_or_none)228 MojoResult DataPipe::ConsumerReadData(void* elements,
229                                       uint32_t* num_bytes,
230                                       bool all_or_none) {
231   base::AutoLock locker(lock_);
232   DCHECK(has_local_consumer_no_lock());
233 
234   if (consumer_in_two_phase_read_no_lock())
235     return MOJO_RESULT_BUSY;
236 
237   if (*num_bytes % element_num_bytes_ != 0)
238     return MOJO_RESULT_INVALID_ARGUMENT;
239 
240   if (*num_bytes == 0)
241     return MOJO_RESULT_OK;  // Nothing to do.
242 
243   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
244   MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
245   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
246   if (!new_producer_state.equals(old_producer_state))
247     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
248   return rv;
249 }
250 
ConsumerDiscardData(uint32_t * num_bytes,bool all_or_none)251 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
252                                          bool all_or_none) {
253   base::AutoLock locker(lock_);
254   DCHECK(has_local_consumer_no_lock());
255 
256   if (consumer_in_two_phase_read_no_lock())
257     return MOJO_RESULT_BUSY;
258 
259   if (*num_bytes % element_num_bytes_ != 0)
260     return MOJO_RESULT_INVALID_ARGUMENT;
261 
262   if (*num_bytes == 0)
263     return MOJO_RESULT_OK;  // Nothing to do.
264 
265   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
266   MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
267   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
268   if (!new_producer_state.equals(old_producer_state))
269     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
270   return rv;
271 }
272 
ConsumerQueryData(uint32_t * num_bytes)273 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
274   base::AutoLock locker(lock_);
275   DCHECK(has_local_consumer_no_lock());
276 
277   if (consumer_in_two_phase_read_no_lock())
278     return MOJO_RESULT_BUSY;
279 
280   // Note: Don't need to validate |*num_bytes| for query.
281   return ConsumerQueryDataImplNoLock(num_bytes);
282 }
283 
ConsumerBeginReadData(const void ** buffer,uint32_t * buffer_num_bytes,bool all_or_none)284 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
285                                            uint32_t* buffer_num_bytes,
286                                            bool all_or_none) {
287   base::AutoLock locker(lock_);
288   DCHECK(has_local_consumer_no_lock());
289 
290   if (consumer_in_two_phase_read_no_lock())
291     return MOJO_RESULT_BUSY;
292 
293   if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
294     return MOJO_RESULT_INVALID_ARGUMENT;
295 
296   MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
297                                                   all_or_none);
298   if (rv != MOJO_RESULT_OK)
299     return rv;
300   DCHECK(consumer_in_two_phase_read_no_lock());
301   return MOJO_RESULT_OK;
302 }
303 
ConsumerEndReadData(uint32_t num_bytes_read)304 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
305   base::AutoLock locker(lock_);
306   DCHECK(has_local_consumer_no_lock());
307 
308   if (!consumer_in_two_phase_read_no_lock())
309     return MOJO_RESULT_FAILED_PRECONDITION;
310 
311   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
312   MojoResult rv;
313   if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
314       num_bytes_read % element_num_bytes_ != 0) {
315     rv = MOJO_RESULT_INVALID_ARGUMENT;
316     consumer_two_phase_max_num_bytes_read_ = 0;
317   } else {
318     rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
319   }
320   // Two-phase read ended even on failure.
321   DCHECK(!consumer_in_two_phase_read_no_lock());
322   // If we're now readable, we *became* readable (since we weren't readable
323   // during the two-phase read), so awake consumer waiters.
324   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
325   if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
326     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
327   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
328   if (!new_producer_state.equals(old_producer_state))
329     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
330   return rv;
331 }
332 
ConsumerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)333 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
334                                        MojoHandleSignals signals,
335                                        uint32_t context) {
336   base::AutoLock locker(lock_);
337   DCHECK(has_local_consumer_no_lock());
338 
339   HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateNoLock();
340   if (consumer_state.satisfies(signals))
341     return MOJO_RESULT_ALREADY_EXISTS;
342   if (!consumer_state.can_satisfy(signals))
343     return MOJO_RESULT_FAILED_PRECONDITION;
344 
345   consumer_waiter_list_->AddWaiter(waiter, signals, context);
346   return MOJO_RESULT_OK;
347 }
348 
ConsumerRemoveWaiter(Waiter * waiter)349 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
350   base::AutoLock locker(lock_);
351   DCHECK(has_local_consumer_no_lock());
352   consumer_waiter_list_->RemoveWaiter(waiter);
353 }
354 
ConsumerIsBusy() const355 bool DataPipe::ConsumerIsBusy() const {
356   base::AutoLock locker(lock_);
357   return consumer_in_two_phase_read_no_lock();
358 }
359 
360 
DataPipe(bool has_local_producer,bool has_local_consumer,const MojoCreateDataPipeOptions & validated_options)361 DataPipe::DataPipe(bool has_local_producer,
362                    bool has_local_consumer,
363                    const MojoCreateDataPipeOptions& validated_options)
364     : may_discard_((validated_options.flags &
365                        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
366       element_num_bytes_(validated_options.element_num_bytes),
367       capacity_num_bytes_(validated_options.capacity_num_bytes),
368       producer_open_(true),
369       consumer_open_(true),
370       producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
371       consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
372       producer_two_phase_max_num_bytes_written_(0),
373       consumer_two_phase_max_num_bytes_read_(0) {
374   // Check that the passed in options actually are validated.
375   MojoCreateDataPipeOptions unused ALLOW_UNUSED = { 0 };
376   DCHECK_EQ(ValidateCreateOptions(&validated_options, &unused), MOJO_RESULT_OK);
377 }
378 
~DataPipe()379 DataPipe::~DataPipe() {
380   DCHECK(!producer_open_);
381   DCHECK(!consumer_open_);
382   DCHECK(!producer_waiter_list_);
383   DCHECK(!consumer_waiter_list_);
384 }
385 
AwakeProducerWaitersForStateChangeNoLock(const HandleSignalsState & new_producer_state)386 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
387     const HandleSignalsState& new_producer_state) {
388   lock_.AssertAcquired();
389   if (!has_local_producer_no_lock())
390     return;
391   producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
392 }
393 
AwakeConsumerWaitersForStateChangeNoLock(const HandleSignalsState & new_consumer_state)394 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
395     const HandleSignalsState& new_consumer_state) {
396   lock_.AssertAcquired();
397   if (!has_local_consumer_no_lock())
398     return;
399   consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
400 }
401 
402 }  // namespace system
403 }  // namespace mojo
404