1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/data_pipe.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10 #include <limits>
11
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
18
19 namespace mojo {
20 namespace system {
21
22 // static
23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
26 1u,
27 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)
28 };
29
30 // static
ValidateCreateOptions(const MojoCreateDataPipeOptions * in_options,MojoCreateDataPipeOptions * out_options)31 MojoResult DataPipe::ValidateCreateOptions(
32 const MojoCreateDataPipeOptions* in_options,
33 MojoCreateDataPipeOptions* out_options) {
34 const MojoCreateDataPipeOptionsFlags kKnownFlags =
35 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
36
37 *out_options = kDefaultCreateOptions;
38 if (!in_options)
39 return MOJO_RESULT_OK;
40
41 MojoResult result =
42 ValidateOptionsStructPointerSizeAndFlags<MojoCreateDataPipeOptions>(
43 in_options, kKnownFlags, out_options);
44 if (result != MOJO_RESULT_OK)
45 return result;
46
47 // Checks for fields beyond |flags|:
48
49 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, element_num_bytes,
50 in_options))
51 return MOJO_RESULT_OK;
52 if (in_options->element_num_bytes == 0)
53 return MOJO_RESULT_INVALID_ARGUMENT;
54 out_options->element_num_bytes = in_options->element_num_bytes;
55
56 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes,
57 in_options) ||
58 in_options->capacity_num_bytes == 0) {
59 // Round the default capacity down to a multiple of the element size (but at
60 // least one element).
61 out_options->capacity_num_bytes = std::max(
62 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
63 (kDefaultDataPipeCapacityBytes % out_options->element_num_bytes)),
64 out_options->element_num_bytes);
65 return MOJO_RESULT_OK;
66 }
67 if (in_options->capacity_num_bytes % out_options->element_num_bytes != 0)
68 return MOJO_RESULT_INVALID_ARGUMENT;
69 if (in_options->capacity_num_bytes > kMaxDataPipeCapacityBytes)
70 return MOJO_RESULT_RESOURCE_EXHAUSTED;
71 out_options->capacity_num_bytes = in_options->capacity_num_bytes;
72
73 return MOJO_RESULT_OK;
74 }
75
ProducerCancelAllWaiters()76 void DataPipe::ProducerCancelAllWaiters() {
77 base::AutoLock locker(lock_);
78 DCHECK(has_local_producer_no_lock());
79 producer_waiter_list_->CancelAllWaiters();
80 }
81
ProducerClose()82 void DataPipe::ProducerClose() {
83 base::AutoLock locker(lock_);
84 DCHECK(producer_open_);
85 producer_open_ = false;
86 DCHECK(has_local_producer_no_lock());
87 producer_waiter_list_.reset();
88 // Not a bug, except possibly in "user" code.
89 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
90 << "Producer closed with active two-phase write";
91 producer_two_phase_max_num_bytes_written_ = 0;
92 ProducerCloseImplNoLock();
93 AwakeConsumerWaitersForStateChangeNoLock(
94 ConsumerGetHandleSignalsStateNoLock());
95 }
96
ProducerWriteData(const void * elements,uint32_t * num_bytes,bool all_or_none)97 MojoResult DataPipe::ProducerWriteData(const void* elements,
98 uint32_t* num_bytes,
99 bool all_or_none) {
100 base::AutoLock locker(lock_);
101 DCHECK(has_local_producer_no_lock());
102
103 if (producer_in_two_phase_write_no_lock())
104 return MOJO_RESULT_BUSY;
105 if (!consumer_open_no_lock())
106 return MOJO_RESULT_FAILED_PRECONDITION;
107
108 // Returning "busy" takes priority over "invalid argument".
109 if (*num_bytes % element_num_bytes_ != 0)
110 return MOJO_RESULT_INVALID_ARGUMENT;
111
112 if (*num_bytes == 0)
113 return MOJO_RESULT_OK; // Nothing to do.
114
115 HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
116 MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
117 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
118 if (!new_consumer_state.equals(old_consumer_state))
119 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
120 return rv;
121 }
122
ProducerBeginWriteData(void ** buffer,uint32_t * buffer_num_bytes,bool all_or_none)123 MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
124 uint32_t* buffer_num_bytes,
125 bool all_or_none) {
126 base::AutoLock locker(lock_);
127 DCHECK(has_local_producer_no_lock());
128
129 if (producer_in_two_phase_write_no_lock())
130 return MOJO_RESULT_BUSY;
131 if (!consumer_open_no_lock())
132 return MOJO_RESULT_FAILED_PRECONDITION;
133
134 if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
135 return MOJO_RESULT_INVALID_ARGUMENT;
136
137 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
138 all_or_none);
139 if (rv != MOJO_RESULT_OK)
140 return rv;
141 // Note: No need to awake producer waiters, even though we're going from
142 // writable to non-writable (since you can't wait on non-writability).
143 // Similarly, though this may have discarded data (in "may discard" mode),
144 // making it non-readable, there's still no need to awake consumer waiters.
145 DCHECK(producer_in_two_phase_write_no_lock());
146 return MOJO_RESULT_OK;
147 }
148
ProducerEndWriteData(uint32_t num_bytes_written)149 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
150 base::AutoLock locker(lock_);
151 DCHECK(has_local_producer_no_lock());
152
153 if (!producer_in_two_phase_write_no_lock())
154 return MOJO_RESULT_FAILED_PRECONDITION;
155 // Note: Allow successful completion of the two-phase write even if the
156 // consumer has been closed.
157
158 HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
159 MojoResult rv;
160 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
161 num_bytes_written % element_num_bytes_ != 0) {
162 rv = MOJO_RESULT_INVALID_ARGUMENT;
163 producer_two_phase_max_num_bytes_written_ = 0;
164 } else {
165 rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
166 }
167 // Two-phase write ended even on failure.
168 DCHECK(!producer_in_two_phase_write_no_lock());
169 // If we're now writable, we *became* writable (since we weren't writable
170 // during the two-phase write), so awake producer waiters.
171 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
172 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
173 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
174 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
175 if (!new_consumer_state.equals(old_consumer_state))
176 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
177 return rv;
178 }
179
ProducerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)180 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
181 MojoHandleSignals signals,
182 uint32_t context) {
183 base::AutoLock locker(lock_);
184 DCHECK(has_local_producer_no_lock());
185
186 HandleSignalsState producer_state = ProducerGetHandleSignalsStateNoLock();
187 if (producer_state.satisfies(signals))
188 return MOJO_RESULT_ALREADY_EXISTS;
189 if (!producer_state.can_satisfy(signals))
190 return MOJO_RESULT_FAILED_PRECONDITION;
191
192 producer_waiter_list_->AddWaiter(waiter, signals, context);
193 return MOJO_RESULT_OK;
194 }
195
ProducerRemoveWaiter(Waiter * waiter)196 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
197 base::AutoLock locker(lock_);
198 DCHECK(has_local_producer_no_lock());
199 producer_waiter_list_->RemoveWaiter(waiter);
200 }
201
ProducerIsBusy() const202 bool DataPipe::ProducerIsBusy() const {
203 base::AutoLock locker(lock_);
204 return producer_in_two_phase_write_no_lock();
205 }
206
ConsumerCancelAllWaiters()207 void DataPipe::ConsumerCancelAllWaiters() {
208 base::AutoLock locker(lock_);
209 DCHECK(has_local_consumer_no_lock());
210 consumer_waiter_list_->CancelAllWaiters();
211 }
212
ConsumerClose()213 void DataPipe::ConsumerClose() {
214 base::AutoLock locker(lock_);
215 DCHECK(consumer_open_);
216 consumer_open_ = false;
217 DCHECK(has_local_consumer_no_lock());
218 consumer_waiter_list_.reset();
219 // Not a bug, except possibly in "user" code.
220 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
221 << "Consumer closed with active two-phase read";
222 consumer_two_phase_max_num_bytes_read_ = 0;
223 ConsumerCloseImplNoLock();
224 AwakeProducerWaitersForStateChangeNoLock(
225 ProducerGetHandleSignalsStateNoLock());
226 }
227
ConsumerReadData(void * elements,uint32_t * num_bytes,bool all_or_none)228 MojoResult DataPipe::ConsumerReadData(void* elements,
229 uint32_t* num_bytes,
230 bool all_or_none) {
231 base::AutoLock locker(lock_);
232 DCHECK(has_local_consumer_no_lock());
233
234 if (consumer_in_two_phase_read_no_lock())
235 return MOJO_RESULT_BUSY;
236
237 if (*num_bytes % element_num_bytes_ != 0)
238 return MOJO_RESULT_INVALID_ARGUMENT;
239
240 if (*num_bytes == 0)
241 return MOJO_RESULT_OK; // Nothing to do.
242
243 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
244 MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
245 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
246 if (!new_producer_state.equals(old_producer_state))
247 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
248 return rv;
249 }
250
ConsumerDiscardData(uint32_t * num_bytes,bool all_or_none)251 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
252 bool all_or_none) {
253 base::AutoLock locker(lock_);
254 DCHECK(has_local_consumer_no_lock());
255
256 if (consumer_in_two_phase_read_no_lock())
257 return MOJO_RESULT_BUSY;
258
259 if (*num_bytes % element_num_bytes_ != 0)
260 return MOJO_RESULT_INVALID_ARGUMENT;
261
262 if (*num_bytes == 0)
263 return MOJO_RESULT_OK; // Nothing to do.
264
265 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
266 MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
267 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
268 if (!new_producer_state.equals(old_producer_state))
269 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
270 return rv;
271 }
272
ConsumerQueryData(uint32_t * num_bytes)273 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
274 base::AutoLock locker(lock_);
275 DCHECK(has_local_consumer_no_lock());
276
277 if (consumer_in_two_phase_read_no_lock())
278 return MOJO_RESULT_BUSY;
279
280 // Note: Don't need to validate |*num_bytes| for query.
281 return ConsumerQueryDataImplNoLock(num_bytes);
282 }
283
ConsumerBeginReadData(const void ** buffer,uint32_t * buffer_num_bytes,bool all_or_none)284 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
285 uint32_t* buffer_num_bytes,
286 bool all_or_none) {
287 base::AutoLock locker(lock_);
288 DCHECK(has_local_consumer_no_lock());
289
290 if (consumer_in_two_phase_read_no_lock())
291 return MOJO_RESULT_BUSY;
292
293 if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
294 return MOJO_RESULT_INVALID_ARGUMENT;
295
296 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
297 all_or_none);
298 if (rv != MOJO_RESULT_OK)
299 return rv;
300 DCHECK(consumer_in_two_phase_read_no_lock());
301 return MOJO_RESULT_OK;
302 }
303
ConsumerEndReadData(uint32_t num_bytes_read)304 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
305 base::AutoLock locker(lock_);
306 DCHECK(has_local_consumer_no_lock());
307
308 if (!consumer_in_two_phase_read_no_lock())
309 return MOJO_RESULT_FAILED_PRECONDITION;
310
311 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
312 MojoResult rv;
313 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
314 num_bytes_read % element_num_bytes_ != 0) {
315 rv = MOJO_RESULT_INVALID_ARGUMENT;
316 consumer_two_phase_max_num_bytes_read_ = 0;
317 } else {
318 rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
319 }
320 // Two-phase read ended even on failure.
321 DCHECK(!consumer_in_two_phase_read_no_lock());
322 // If we're now readable, we *became* readable (since we weren't readable
323 // during the two-phase read), so awake consumer waiters.
324 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
325 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
326 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
327 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
328 if (!new_producer_state.equals(old_producer_state))
329 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
330 return rv;
331 }
332
ConsumerAddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)333 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
334 MojoHandleSignals signals,
335 uint32_t context) {
336 base::AutoLock locker(lock_);
337 DCHECK(has_local_consumer_no_lock());
338
339 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateNoLock();
340 if (consumer_state.satisfies(signals))
341 return MOJO_RESULT_ALREADY_EXISTS;
342 if (!consumer_state.can_satisfy(signals))
343 return MOJO_RESULT_FAILED_PRECONDITION;
344
345 consumer_waiter_list_->AddWaiter(waiter, signals, context);
346 return MOJO_RESULT_OK;
347 }
348
ConsumerRemoveWaiter(Waiter * waiter)349 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
350 base::AutoLock locker(lock_);
351 DCHECK(has_local_consumer_no_lock());
352 consumer_waiter_list_->RemoveWaiter(waiter);
353 }
354
ConsumerIsBusy() const355 bool DataPipe::ConsumerIsBusy() const {
356 base::AutoLock locker(lock_);
357 return consumer_in_two_phase_read_no_lock();
358 }
359
360
DataPipe(bool has_local_producer,bool has_local_consumer,const MojoCreateDataPipeOptions & validated_options)361 DataPipe::DataPipe(bool has_local_producer,
362 bool has_local_consumer,
363 const MojoCreateDataPipeOptions& validated_options)
364 : may_discard_((validated_options.flags &
365 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
366 element_num_bytes_(validated_options.element_num_bytes),
367 capacity_num_bytes_(validated_options.capacity_num_bytes),
368 producer_open_(true),
369 consumer_open_(true),
370 producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
371 consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
372 producer_two_phase_max_num_bytes_written_(0),
373 consumer_two_phase_max_num_bytes_read_(0) {
374 // Check that the passed in options actually are validated.
375 MojoCreateDataPipeOptions unused ALLOW_UNUSED = { 0 };
376 DCHECK_EQ(ValidateCreateOptions(&validated_options, &unused), MOJO_RESULT_OK);
377 }
378
~DataPipe()379 DataPipe::~DataPipe() {
380 DCHECK(!producer_open_);
381 DCHECK(!consumer_open_);
382 DCHECK(!producer_waiter_list_);
383 DCHECK(!consumer_waiter_list_);
384 }
385
AwakeProducerWaitersForStateChangeNoLock(const HandleSignalsState & new_producer_state)386 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
387 const HandleSignalsState& new_producer_state) {
388 lock_.AssertAcquired();
389 if (!has_local_producer_no_lock())
390 return;
391 producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
392 }
393
AwakeConsumerWaitersForStateChangeNoLock(const HandleSignalsState & new_consumer_state)394 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
395 const HandleSignalsState& new_consumer_state) {
396 lock_.AssertAcquired();
397 if (!has_local_consumer_no_lock())
398 return;
399 consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
400 }
401
402 } // namespace system
403 } // namespace mojo
404