1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stddef.h>
6 #include <stdint.h>
7
8 #include <memory>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/run_loop.h"
16 #include "build/build_config.h"
17 #include "mojo/core/embedder/embedder.h"
18 #include "mojo/core/test/mojo_test_base.h"
19 #include "mojo/core/test_utils.h"
20 #include "mojo/public/c/system/data_pipe.h"
21 #include "mojo/public/c/system/functions.h"
22 #include "mojo/public/c/system/message_pipe.h"
23 #include "mojo/public/cpp/system/message_pipe.h"
24 #include "mojo/public/cpp/system/simple_watcher.h"
25 #include "testing/gtest/include/gtest/gtest.h"
26
27 namespace mojo {
28 namespace core {
29 namespace {
30
31 const uint32_t kSizeOfOptions =
32 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
33
34 // In various places, we have to poll (since, e.g., we can't yet wait for a
35 // certain amount of data to be available). This is the maximum number of
36 // iterations (separated by a short sleep).
37 // TODO(vtl): Get rid of this.
38 const size_t kMaxPoll = 100;
39
40 // Used in Multiprocess test.
41 const size_t kMultiprocessCapacity = 37;
42 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
43 const int kMultiprocessMaxIter = 5;
44
45 // TODO(rockot): There are many uses of ASSERT where EXPECT would be more
46 // appropriate. Fix this.
47
48 class DataPipeTest : public test::MojoTestBase {
49 public:
DataPipeTest()50 DataPipeTest()
51 : producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {}
52
~DataPipeTest()53 ~DataPipeTest() override {
54 if (producer_ != MOJO_HANDLE_INVALID)
55 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
56 if (consumer_ != MOJO_HANDLE_INVALID)
57 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
58 }
59
ReadEmptyMessageWithHandles(MojoHandle pipe,MojoHandle * out_handles,uint32_t num_handles)60 MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe,
61 MojoHandle* out_handles,
62 uint32_t num_handles) {
63 std::vector<uint8_t> bytes;
64 std::vector<ScopedHandle> handles;
65 MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles,
66 MOJO_READ_MESSAGE_FLAG_NONE);
67 if (rv == MOJO_RESULT_OK) {
68 CHECK_EQ(0u, bytes.size());
69 CHECK_EQ(num_handles, handles.size());
70 for (size_t i = 0; i < num_handles; ++i)
71 out_handles[i] = handles[i].release().value();
72 }
73 return rv;
74 }
75
Create(const MojoCreateDataPipeOptions * options)76 MojoResult Create(const MojoCreateDataPipeOptions* options) {
77 return MojoCreateDataPipe(options, &producer_, &consumer_);
78 }
79
WriteData(const void * elements,uint32_t * num_bytes,bool all_or_none=false)80 MojoResult WriteData(const void* elements,
81 uint32_t* num_bytes,
82 bool all_or_none = false) {
83 MojoWriteDataOptions options;
84 options.struct_size = sizeof(options);
85 options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
86 : MOJO_WRITE_DATA_FLAG_NONE;
87 return MojoWriteData(producer_, elements, num_bytes, &options);
88 }
89
ReadData(void * elements,uint32_t * num_bytes,bool all_or_none=false,bool peek=false)90 MojoResult ReadData(void* elements,
91 uint32_t* num_bytes,
92 bool all_or_none = false,
93 bool peek = false) {
94 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
95 if (all_or_none)
96 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
97 if (peek)
98 flags |= MOJO_READ_DATA_FLAG_PEEK;
99
100 MojoReadDataOptions options;
101 options.struct_size = sizeof(options);
102 options.flags = flags;
103 return MojoReadData(consumer_, &options, elements, num_bytes);
104 }
105
QueryData(uint32_t * num_bytes)106 MojoResult QueryData(uint32_t* num_bytes) {
107 MojoReadDataOptions options;
108 options.struct_size = sizeof(options);
109 options.flags = MOJO_READ_DATA_FLAG_QUERY;
110 return MojoReadData(consumer_, &options, nullptr, num_bytes);
111 }
112
DiscardData(uint32_t * num_bytes,bool all_or_none=false)113 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
114 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
115 if (all_or_none)
116 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
117 MojoReadDataOptions options;
118 options.struct_size = sizeof(options);
119 options.flags = flags;
120 return MojoReadData(consumer_, &options, nullptr, num_bytes);
121 }
122
BeginReadData(const void ** elements,uint32_t * num_bytes)123 MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) {
124 return MojoBeginReadData(consumer_, nullptr, elements, num_bytes);
125 }
126
EndReadData(uint32_t num_bytes_read)127 MojoResult EndReadData(uint32_t num_bytes_read) {
128 return MojoEndReadData(consumer_, num_bytes_read, nullptr);
129 }
130
BeginWriteData(void ** elements,uint32_t * num_bytes)131 MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) {
132 return MojoBeginWriteData(producer_, nullptr, elements, num_bytes);
133 }
134
EndWriteData(uint32_t num_bytes_written)135 MojoResult EndWriteData(uint32_t num_bytes_written) {
136 return MojoEndWriteData(producer_, num_bytes_written, nullptr);
137 }
138
CloseProducer()139 MojoResult CloseProducer() {
140 MojoResult rv = MojoClose(producer_);
141 producer_ = MOJO_HANDLE_INVALID;
142 return rv;
143 }
144
CloseConsumer()145 MojoResult CloseConsumer() {
146 MojoResult rv = MojoClose(consumer_);
147 consumer_ = MOJO_HANDLE_INVALID;
148 return rv;
149 }
150
151 MojoHandle producer_, consumer_;
152
153 private:
154 DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
155 };
156
TEST_F(DataPipeTest,Basic)157 TEST_F(DataPipeTest, Basic) {
158 const MojoCreateDataPipeOptions options = {
159 kSizeOfOptions, // |struct_size|.
160 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
161 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
162 1000 * sizeof(int32_t) // |capacity_num_bytes|.
163 };
164
165 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
166
167 // We can write to a data pipe handle immediately.
168 int32_t elements[10] = {};
169 uint32_t num_bytes = 0;
170
171 num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
172
173 elements[0] = 123;
174 elements[1] = 456;
175 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
176 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
177
178 // Now wait for the other side to become readable.
179 MojoHandleSignalsState state;
180 ASSERT_EQ(MOJO_RESULT_OK,
181 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state));
182 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
183 state.satisfied_signals);
184
185 elements[0] = -1;
186 elements[1] = -1;
187 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
188 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
189 ASSERT_EQ(elements[0], 123);
190 ASSERT_EQ(elements[1], 456);
191 }
192
193 // Tests creation of data pipes with various (valid) options.
TEST_F(DataPipeTest,CreateAndMaybeTransfer)194 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
195 MojoCreateDataPipeOptions test_options[] = {
196 // Default options.
197 {},
198 // Trivial element size, non-default capacity.
199 {kSizeOfOptions, // |struct_size|.
200 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
201 1, // |element_num_bytes|.
202 1000}, // |capacity_num_bytes|.
203 // Nontrivial element size, non-default capacity.
204 {kSizeOfOptions, // |struct_size|.
205 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
206 4, // |element_num_bytes|.
207 4000}, // |capacity_num_bytes|.
208 // Nontrivial element size, default capacity.
209 {kSizeOfOptions, // |struct_size|.
210 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
211 100, // |element_num_bytes|.
212 0} // |capacity_num_bytes|.
213 };
214 for (size_t i = 0; i < arraysize(test_options); i++) {
215 MojoHandle producer_handle, consumer_handle;
216 MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr;
217 ASSERT_EQ(MOJO_RESULT_OK,
218 MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
219 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
220 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
221 }
222 }
223
TEST_F(DataPipeTest,SimpleReadWrite)224 TEST_F(DataPipeTest, SimpleReadWrite) {
225 const MojoCreateDataPipeOptions options = {
226 kSizeOfOptions, // |struct_size|.
227 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
228 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
229 1000 * sizeof(int32_t) // |capacity_num_bytes|.
230 };
231
232 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
233 MojoHandleSignalsState hss;
234
235 int32_t elements[10] = {};
236 uint32_t num_bytes = 0;
237
238 // Try reading; nothing there yet.
239 num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
240 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
241
242 // Query; nothing there yet.
243 num_bytes = 0;
244 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
245 ASSERT_EQ(0u, num_bytes);
246
247 // Discard; nothing there yet.
248 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
249 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
250
251 // Read with invalid |num_bytes|.
252 num_bytes = sizeof(elements[0]) + 1;
253 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
254
255 // Write two elements.
256 elements[0] = 123;
257 elements[1] = 456;
258 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
259 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
260 // It should have written everything (even without "all or none").
261 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
262
263 // Wait.
264 ASSERT_EQ(MOJO_RESULT_OK,
265 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
266 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
267 hss.satisfied_signals);
268 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
269 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
270 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
271 hss.satisfiable_signals);
272
273 // Query.
274 // TODO(vtl): It's theoretically possible (though not with the current
275 // implementation/configured limits) that not all the data has arrived yet.
276 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
277 // or |2 * ...|.)
278 num_bytes = 0;
279 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
280 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
281
282 // Read one element.
283 elements[0] = -1;
284 elements[1] = -1;
285 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
286 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
287 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
288 ASSERT_EQ(123, elements[0]);
289 ASSERT_EQ(-1, elements[1]);
290
291 // Query.
292 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
293 // should get 1 here.)
294 num_bytes = 0;
295 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
296 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
297
298 // Peek one element.
299 elements[0] = -1;
300 elements[1] = -1;
301 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
302 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
303 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
304 ASSERT_EQ(456, elements[0]);
305 ASSERT_EQ(-1, elements[1]);
306
307 // Query. Still has 1 element remaining.
308 num_bytes = 0;
309 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
310 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
311
312 // Try to read two elements, with "all or none".
313 elements[0] = -1;
314 elements[1] = -1;
315 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
316 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
317 ReadData(elements, &num_bytes, true, false));
318 ASSERT_EQ(-1, elements[0]);
319 ASSERT_EQ(-1, elements[1]);
320
321 // Try to read two elements, without "all or none".
322 elements[0] = -1;
323 elements[1] = -1;
324 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
325 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
326 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
327 ASSERT_EQ(456, elements[0]);
328 ASSERT_EQ(-1, elements[1]);
329
330 // Query.
331 num_bytes = 0;
332 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
333 ASSERT_EQ(0u, num_bytes);
334 }
335
336 // Note: The "basic" waiting tests test that the "wait states" are correct in
337 // various situations; they don't test that waiters are properly awoken on state
338 // changes. (For that, we need to use multiple threads.)
TEST_F(DataPipeTest,BasicProducerWaiting)339 TEST_F(DataPipeTest, BasicProducerWaiting) {
340 // Note: We take advantage of the fact that current for current
341 // implementations capacities are strict maximums. This is not guaranteed by
342 // the API.
343
344 const MojoCreateDataPipeOptions options = {
345 kSizeOfOptions, // |struct_size|.
346 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
347 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
348 2 * sizeof(int32_t) // |capacity_num_bytes|.
349 };
350 Create(&options);
351 MojoHandleSignalsState hss;
352
353 // Never readable. Already writable.
354 hss = GetSignalsState(producer_);
355 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
356 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
357 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
358 hss.satisfiable_signals);
359
360 // Write two elements.
361 int32_t elements[2] = {123, 456};
362 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
363 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
364 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
365
366 // Wait for data to become available to the consumer.
367 ASSERT_EQ(MOJO_RESULT_OK,
368 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
369 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
370 hss.satisfied_signals);
371 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
372 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
373 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
374 hss.satisfiable_signals);
375
376 // Peek one element.
377 elements[0] = -1;
378 elements[1] = -1;
379 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
380 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
381 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
382 ASSERT_EQ(123, elements[0]);
383 ASSERT_EQ(-1, elements[1]);
384
385 // Read one element.
386 elements[0] = -1;
387 elements[1] = -1;
388 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
389 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
390 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
391 ASSERT_EQ(123, elements[0]);
392 ASSERT_EQ(-1, elements[1]);
393
394 // Try writing, using a two-phase write.
395 void* buffer = nullptr;
396 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
397 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
398 EXPECT_TRUE(buffer);
399 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
400
401 static_cast<int32_t*>(buffer)[0] = 789;
402 ASSERT_EQ(MOJO_RESULT_OK,
403 EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
404
405 // Read one element, using a two-phase read.
406 const void* read_buffer = nullptr;
407 num_bytes = 0u;
408 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
409 EXPECT_TRUE(read_buffer);
410 // The two-phase read should be able to read at least one element.
411 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
412 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
413 ASSERT_EQ(MOJO_RESULT_OK,
414 EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
415
416 // Write one element.
417 elements[0] = 123;
418 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
419 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
420 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
421
422 // Close the consumer.
423 CloseConsumer();
424
425 // It should now be never-writable.
426 hss = MojoHandleSignalsState();
427 ASSERT_EQ(MOJO_RESULT_OK,
428 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
429 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
430 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
431 }
432
TEST_F(DataPipeTest,PeerClosedProducerWaiting)433 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
434 const MojoCreateDataPipeOptions options = {
435 kSizeOfOptions, // |struct_size|.
436 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
437 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
438 2 * sizeof(int32_t) // |capacity_num_bytes|.
439 };
440 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
441 MojoHandleSignalsState hss;
442
443 // Close the consumer.
444 CloseConsumer();
445
446 // It should be signaled.
447 hss = MojoHandleSignalsState();
448 ASSERT_EQ(MOJO_RESULT_OK,
449 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
450 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
451 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
452 }
453
TEST_F(DataPipeTest,PeerClosedConsumerWaiting)454 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
455 const MojoCreateDataPipeOptions options = {
456 kSizeOfOptions, // |struct_size|.
457 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
458 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
459 2 * sizeof(int32_t) // |capacity_num_bytes|.
460 };
461 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
462 MojoHandleSignalsState hss;
463
464 // Close the producer.
465 CloseProducer();
466
467 // It should be signaled.
468 hss = MojoHandleSignalsState();
469 ASSERT_EQ(MOJO_RESULT_OK,
470 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
471 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
472 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
473 }
474
TEST_F(DataPipeTest,BasicConsumerWaiting)475 TEST_F(DataPipeTest, BasicConsumerWaiting) {
476 const MojoCreateDataPipeOptions options = {
477 kSizeOfOptions, // |struct_size|.
478 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
479 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
480 1000 * sizeof(int32_t) // |capacity_num_bytes|.
481 };
482 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
483 MojoHandleSignalsState hss;
484
485 // Never writable.
486 hss = MojoHandleSignalsState();
487 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
488 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
489 EXPECT_EQ(0u, hss.satisfied_signals);
490 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
491 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
492 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
493 hss.satisfiable_signals);
494
495 // Write two elements.
496 int32_t elements[2] = {123, 456};
497 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
498 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
499
500 // Wait for readability.
501 hss = MojoHandleSignalsState();
502 ASSERT_EQ(MOJO_RESULT_OK,
503 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
504 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
505 hss.satisfied_signals);
506 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
507 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
508 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
509 hss.satisfiable_signals);
510
511 // Discard one element.
512 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
513 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
514 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
515
516 // Should still be readable.
517 hss = MojoHandleSignalsState();
518 ASSERT_EQ(MOJO_RESULT_OK,
519 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
520 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
521 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
522 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
523 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
524 hss.satisfiable_signals);
525
526 // Peek one element.
527 elements[0] = -1;
528 elements[1] = -1;
529 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
530 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
531 ASSERT_EQ(456, elements[0]);
532 ASSERT_EQ(-1, elements[1]);
533
534 // Should still be readable.
535 hss = MojoHandleSignalsState();
536 ASSERT_EQ(MOJO_RESULT_OK,
537 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
538 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
539 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
540 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
541 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
542 hss.satisfiable_signals);
543
544 // Read one element.
545 elements[0] = -1;
546 elements[1] = -1;
547 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
548 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
549 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
550 ASSERT_EQ(456, elements[0]);
551 ASSERT_EQ(-1, elements[1]);
552
553 // Write one element.
554 elements[0] = 789;
555 elements[1] = -1;
556 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
557 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
558
559 // Waiting should now succeed.
560 hss = MojoHandleSignalsState();
561 ASSERT_EQ(MOJO_RESULT_OK,
562 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
563 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
564 hss.satisfied_signals);
565 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
566 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
567 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
568 hss.satisfiable_signals);
569
570 // Close the producer.
571 CloseProducer();
572
573 // Should still be readable.
574 hss = MojoHandleSignalsState();
575 ASSERT_EQ(MOJO_RESULT_OK,
576 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
577 EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE |
578 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE));
579 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
580 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
581 hss.satisfiable_signals);
582
583 // Wait for the peer closed signal.
584 hss = MojoHandleSignalsState();
585 ASSERT_EQ(MOJO_RESULT_OK,
586 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
587 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
588 MOJO_HANDLE_SIGNAL_PEER_CLOSED,
589 hss.satisfied_signals);
590 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
591 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
592 hss.satisfiable_signals);
593
594 // Read one element.
595 elements[0] = -1;
596 elements[1] = -1;
597 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
598 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
599 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
600 ASSERT_EQ(789, elements[0]);
601 ASSERT_EQ(-1, elements[1]);
602
603 // Should be never-readable.
604 hss = MojoHandleSignalsState();
605 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
606 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
607 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
608 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
609 }
610
TEST_F(DataPipeTest,ConsumerNewDataReadable)611 TEST_F(DataPipeTest, ConsumerNewDataReadable) {
612 const MojoCreateDataPipeOptions create_options = {
613 kSizeOfOptions, // |struct_size|.
614 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
615 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
616 1000 * sizeof(int32_t) // |capacity_num_bytes|.
617 };
618 EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options));
619
620 int32_t elements[2] = {123, 456};
621 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
622 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
623
624 // The consumer handle should appear to be readable and have new data.
625 EXPECT_EQ(MOJO_RESULT_OK,
626 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
627 EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
628 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
629
630 // Now try to read a minimum of 6 elements.
631 int32_t read_elements[6];
632 uint32_t num_read_bytes = sizeof(read_elements);
633 MojoReadDataOptions read_options;
634 read_options.struct_size = sizeof(read_options);
635 read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE;
636 EXPECT_EQ(
637 MOJO_RESULT_OUT_OF_RANGE,
638 MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes));
639
640 // The consumer should still appear to be readable but not with new data.
641 EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
642 MOJO_HANDLE_SIGNAL_READABLE);
643 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
644 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
645
646 // Write four more elements.
647 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
648 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
649
650 // The consumer handle should once again appear to be readable.
651 EXPECT_EQ(MOJO_RESULT_OK,
652 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
653
654 // Try again to read a minimum of 6 elements. Should succeed this time.
655 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options,
656 read_elements, &num_read_bytes));
657
658 // And now the consumer is unreadable.
659 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
660 MOJO_HANDLE_SIGNAL_READABLE);
661 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
662 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
663 }
664
665 // Test with two-phase APIs and also closing the producer with an active
666 // consumer waiter.
TEST_F(DataPipeTest,ConsumerWaitingTwoPhase)667 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
668 const MojoCreateDataPipeOptions options = {
669 kSizeOfOptions, // |struct_size|.
670 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
671 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
672 1000 * sizeof(int32_t) // |capacity_num_bytes|.
673 };
674 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
675 MojoHandleSignalsState hss;
676
677 // Write two elements.
678 int32_t* elements = nullptr;
679 void* buffer = nullptr;
680 // Request room for three (but we'll only write two).
681 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
682 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
683 EXPECT_TRUE(buffer);
684 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
685 elements = static_cast<int32_t*>(buffer);
686 elements[0] = 123;
687 elements[1] = 456;
688 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
689
690 // Wait for readability.
691 hss = MojoHandleSignalsState();
692 ASSERT_EQ(MOJO_RESULT_OK,
693 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
694 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
695 hss.satisfied_signals);
696 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
697 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
698 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
699 hss.satisfiable_signals);
700
701 // Read one element.
702 // Two should be available, but only read one.
703 const void* read_buffer = nullptr;
704 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
705 EXPECT_TRUE(read_buffer);
706 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
707 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
708 ASSERT_EQ(123, read_elements[0]);
709 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
710
711 // Should still be readable.
712 hss = MojoHandleSignalsState();
713 ASSERT_EQ(MOJO_RESULT_OK,
714 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
715 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
716 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
717 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
718 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
719 hss.satisfiable_signals);
720
721 // Read one element.
722 // Request three, but not in all-or-none mode.
723 read_buffer = nullptr;
724 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
725 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
726 EXPECT_TRUE(read_buffer);
727 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
728 read_elements = static_cast<const int32_t*>(read_buffer);
729 ASSERT_EQ(456, read_elements[0]);
730 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
731
732 // Close the producer.
733 CloseProducer();
734
735 // Should be never-readable.
736 hss = MojoHandleSignalsState();
737 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
738 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
739 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
740 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
741 }
742
743 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
TEST_F(DataPipeTest,BasicTwoPhaseWaiting)744 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
745 const MojoCreateDataPipeOptions options = {
746 kSizeOfOptions, // |struct_size|.
747 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
748 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
749 1000 * sizeof(int32_t) // |capacity_num_bytes|.
750 };
751 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
752 MojoHandleSignalsState hss;
753
754 // It should be writable.
755 hss = GetSignalsState(producer_);
756 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
757 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
758 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
759 hss.satisfiable_signals);
760
761 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
762 void* write_ptr = nullptr;
763 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
764 EXPECT_TRUE(write_ptr);
765 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
766
767 // At this point, it shouldn't be writable.
768 hss = GetSignalsState(producer_);
769 ASSERT_EQ(0u, hss.satisfied_signals);
770 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
771 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
772 hss.satisfiable_signals);
773
774 // It shouldn't be readable yet either (we'll wait later).
775 hss = GetSignalsState(consumer_);
776 ASSERT_EQ(0u, hss.satisfied_signals);
777 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
778 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
779 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
780 hss.satisfiable_signals);
781
782 static_cast<int32_t*>(write_ptr)[0] = 123;
783 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
784
785 // It should immediately be writable again.
786 hss = GetSignalsState(producer_);
787 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
788 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
789 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
790 hss.satisfiable_signals);
791
792 // It should become readable.
793 hss = MojoHandleSignalsState();
794 ASSERT_EQ(MOJO_RESULT_OK,
795 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
796 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
797 hss.satisfied_signals);
798 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
799 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
800 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
801 hss.satisfiable_signals);
802
803 // Start another two-phase write and check that it's readable even in the
804 // middle of it.
805 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
806 write_ptr = nullptr;
807 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
808 EXPECT_TRUE(write_ptr);
809 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
810
811 // It should be readable.
812 hss = MojoHandleSignalsState();
813 ASSERT_EQ(MOJO_RESULT_OK,
814 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
815 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
816 hss.satisfied_signals);
817 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
818 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
819 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
820 hss.satisfiable_signals);
821
822 // End the two-phase write without writing anything.
823 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
824
825 // Start a two-phase read.
826 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
827 const void* read_ptr = nullptr;
828 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
829 EXPECT_TRUE(read_ptr);
830 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
831
832 // At this point, it should still be writable.
833 hss = GetSignalsState(producer_);
834 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
835 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
836 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
837 hss.satisfiable_signals);
838
839 // But not readable.
840 hss = GetSignalsState(consumer_);
841 ASSERT_EQ(0u, hss.satisfied_signals);
842 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
843 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
844 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
845 hss.satisfiable_signals);
846
847 // End the two-phase read without reading anything.
848 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
849
850 // It should be readable again.
851 hss = GetSignalsState(consumer_);
852 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
853 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
854 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
855 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
856 hss.satisfiable_signals);
857 }
858
Seq(int32_t start,size_t count,int32_t * out)859 void Seq(int32_t start, size_t count, int32_t* out) {
860 for (size_t i = 0; i < count; i++)
861 out[i] = start + static_cast<int32_t>(i);
862 }
863
TEST_F(DataPipeTest,AllOrNone)864 TEST_F(DataPipeTest, AllOrNone) {
865 const MojoCreateDataPipeOptions options = {
866 kSizeOfOptions, // |struct_size|.
867 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
868 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
869 10 * sizeof(int32_t) // |capacity_num_bytes|.
870 };
871 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
872 MojoHandleSignalsState hss;
873
874 // Try writing more than the total capacity of the pipe.
875 uint32_t num_bytes = 20u * sizeof(int32_t);
876 int32_t buffer[100];
877 Seq(0, arraysize(buffer), buffer);
878 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
879
880 // Should still be empty.
881 num_bytes = ~0u;
882 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
883 ASSERT_EQ(0u, num_bytes);
884
885 // Write some data.
886 num_bytes = 5u * sizeof(int32_t);
887 Seq(100, arraysize(buffer), buffer);
888 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
889 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
890
891 // Wait for data.
892 // TODO(vtl): There's no real guarantee that all the data will become
893 // available at once (except that in current implementations, with reasonable
894 // limits, it will). Eventually, we'll be able to wait for a specified amount
895 // of data to become available.
896 hss = MojoHandleSignalsState();
897 ASSERT_EQ(MOJO_RESULT_OK,
898 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
899 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
900 hss.satisfied_signals);
901 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
902 MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE,
903 hss.satisfiable_signals);
904
905 // Half full.
906 num_bytes = 0u;
907 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
908 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
909
910 // Try writing more than the available capacity of the pipe, but less than the
911 // total capacity.
912 num_bytes = 6u * sizeof(int32_t);
913 Seq(200, arraysize(buffer), buffer);
914 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
915
916 // Try reading too much.
917 num_bytes = 11u * sizeof(int32_t);
918 memset(buffer, 0xab, sizeof(buffer));
919 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
920 int32_t expected_buffer[100];
921 memset(expected_buffer, 0xab, sizeof(expected_buffer));
922 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
923
924 // Try discarding too much.
925 num_bytes = 11u * sizeof(int32_t);
926 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
927
928 // Just a little.
929 num_bytes = 2u * sizeof(int32_t);
930 Seq(300, arraysize(buffer), buffer);
931 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
932 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
933
934 // Just right.
935 num_bytes = 3u * sizeof(int32_t);
936 Seq(400, arraysize(buffer), buffer);
937 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
938 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
939
940 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
941 // specified amount of data to be available, so poll.
942 for (size_t i = 0; i < kMaxPoll; i++) {
943 num_bytes = 0u;
944 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
945 if (num_bytes >= 10u * sizeof(int32_t))
946 break;
947
948 test::Sleep(test::EpsilonDeadline());
949 }
950 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
951
952 // Read half.
953 num_bytes = 5u * sizeof(int32_t);
954 memset(buffer, 0xab, sizeof(buffer));
955 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
956 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
957 memset(expected_buffer, 0xab, sizeof(expected_buffer));
958 Seq(100, 5, expected_buffer);
959 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
960
961 // Try reading too much again.
962 num_bytes = 6u * sizeof(int32_t);
963 memset(buffer, 0xab, sizeof(buffer));
964 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
965 memset(expected_buffer, 0xab, sizeof(expected_buffer));
966 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
967
968 // Try discarding too much again.
969 num_bytes = 6u * sizeof(int32_t);
970 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
971
972 // Discard a little.
973 num_bytes = 2u * sizeof(int32_t);
974 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
975 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
976
977 // Three left.
978 num_bytes = 0u;
979 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
980 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
981
982 // Close the producer, then test producer-closed cases.
983 CloseProducer();
984
985 // Wait.
986 hss = MojoHandleSignalsState();
987 ASSERT_EQ(MOJO_RESULT_OK,
988 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
989 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
990 hss.satisfied_signals);
991 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
992 hss.satisfiable_signals);
993
994 // Try reading too much; "failed precondition" since the producer is closed.
995 num_bytes = 4u * sizeof(int32_t);
996 memset(buffer, 0xab, sizeof(buffer));
997 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
998 ReadData(buffer, &num_bytes, true));
999 memset(expected_buffer, 0xab, sizeof(expected_buffer));
1000 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1001
1002 // Try discarding too much; "failed precondition" again.
1003 num_bytes = 4u * sizeof(int32_t);
1004 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
1005
1006 // Read a little.
1007 num_bytes = 2u * sizeof(int32_t);
1008 memset(buffer, 0xab, sizeof(buffer));
1009 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
1010 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
1011 memset(expected_buffer, 0xab, sizeof(expected_buffer));
1012 Seq(400, 2, expected_buffer);
1013 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1014
1015 // Discard the remaining element.
1016 num_bytes = 1u * sizeof(int32_t);
1017 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
1018 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1019
1020 // Empty again.
1021 num_bytes = ~0u;
1022 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1023 ASSERT_EQ(0u, num_bytes);
1024 }
1025
1026 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1027 // respectively, as much as possible, even if it may have to "wrap around" the
1028 // internal circular buffer. (Note that the two-phase write and read need not do
1029 // this.)
TEST_F(DataPipeTest,WrapAround)1030 TEST_F(DataPipeTest, WrapAround) {
1031 unsigned char test_data[1000];
1032 for (size_t i = 0; i < arraysize(test_data); i++)
1033 test_data[i] = static_cast<unsigned char>(i);
1034
1035 const MojoCreateDataPipeOptions options = {
1036 kSizeOfOptions, // |struct_size|.
1037 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1038 1u, // |element_num_bytes|.
1039 100u // |capacity_num_bytes|.
1040 };
1041
1042 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1043 MojoHandleSignalsState hss;
1044
1045 // Write 20 bytes.
1046 uint32_t num_bytes = 20u;
1047 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
1048 ASSERT_EQ(20u, num_bytes);
1049
1050 // Wait for data.
1051 ASSERT_EQ(MOJO_RESULT_OK,
1052 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1053 EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
1054 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1055 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1056 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1057 hss.satisfiable_signals);
1058
1059 // Read 10 bytes.
1060 unsigned char read_buffer[1000] = {0};
1061 num_bytes = 10u;
1062 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
1063 ASSERT_EQ(10u, num_bytes);
1064 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1065
1066 // Check that a two-phase write can now only write (at most) 80 bytes. (This
1067 // checks an implementation detail; this behavior is not guaranteed.)
1068 void* write_buffer_ptr = nullptr;
1069 num_bytes = 0u;
1070 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1071 EXPECT_TRUE(write_buffer_ptr);
1072 ASSERT_EQ(80u, num_bytes);
1073 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
1074
1075 size_t total_num_bytes = 0;
1076 while (total_num_bytes < 90) {
1077 // Wait to write.
1078 ASSERT_EQ(MOJO_RESULT_OK,
1079 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1080 ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
1081 ASSERT_EQ(hss.satisfiable_signals, MOJO_HANDLE_SIGNAL_WRITABLE |
1082 MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1083 MOJO_HANDLE_SIGNAL_PEER_REMOTE);
1084
1085 // Write as much as we can.
1086 num_bytes = 100;
1087 ASSERT_EQ(MOJO_RESULT_OK,
1088 WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
1089 total_num_bytes += num_bytes;
1090 }
1091
1092 ASSERT_EQ(90u, total_num_bytes);
1093
1094 num_bytes = 0;
1095 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1096 ASSERT_EQ(100u, num_bytes);
1097
1098 // Check that a two-phase read can now only read (at most) 90 bytes. (This
1099 // checks an implementation detail; this behavior is not guaranteed.)
1100 const void* read_buffer_ptr = nullptr;
1101 num_bytes = 0;
1102 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1103 EXPECT_TRUE(read_buffer_ptr);
1104 ASSERT_EQ(90u, num_bytes);
1105 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
1106
1107 // Read as much as possible. We should read 100 bytes.
1108 num_bytes =
1109 static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
1110 memset(read_buffer, 0, num_bytes);
1111 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
1112 ASSERT_EQ(100u, num_bytes);
1113 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1114 }
1115
1116 // Tests the behavior of writing (simple and two-phase), closing the producer,
1117 // then reading (simple and two-phase).
TEST_F(DataPipeTest,WriteCloseProducerRead)1118 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1119 const char kTestData[] = "hello world";
1120 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1121
1122 const MojoCreateDataPipeOptions options = {
1123 kSizeOfOptions, // |struct_size|.
1124 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1125 1u, // |element_num_bytes|.
1126 1000u // |capacity_num_bytes|.
1127 };
1128 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1129
1130 // Write some data, so we'll have something to read.
1131 uint32_t num_bytes = kTestDataSize;
1132 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1133 ASSERT_EQ(kTestDataSize, num_bytes);
1134
1135 // Write it again, so we'll have something left over.
1136 num_bytes = kTestDataSize;
1137 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1138 ASSERT_EQ(kTestDataSize, num_bytes);
1139
1140 // Start two-phase write.
1141 void* write_buffer_ptr = nullptr;
1142 num_bytes = 0u;
1143 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1144 EXPECT_TRUE(write_buffer_ptr);
1145 EXPECT_GT(num_bytes, 0u);
1146
1147 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1148 for (size_t i = 0; i < kMaxPoll; i++) {
1149 num_bytes = 0u;
1150 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1151 if (num_bytes >= 2u * kTestDataSize)
1152 break;
1153
1154 test::Sleep(test::EpsilonDeadline());
1155 }
1156 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1157
1158 // Start two-phase read.
1159 const void* read_buffer_ptr = nullptr;
1160 num_bytes = 0u;
1161 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1162 EXPECT_TRUE(read_buffer_ptr);
1163 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1164
1165 // Close the producer.
1166 CloseProducer();
1167
1168 // The consumer can finish its two-phase read.
1169 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1170 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1171
1172 // And start another.
1173 read_buffer_ptr = nullptr;
1174 num_bytes = 0u;
1175 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1176 EXPECT_TRUE(read_buffer_ptr);
1177 ASSERT_EQ(kTestDataSize, num_bytes);
1178 }
1179
1180 // Tests the behavior of interrupting a two-phase read and write by closing the
1181 // consumer.
TEST_F(DataPipeTest,TwoPhaseWriteReadCloseConsumer)1182 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1183 const char kTestData[] = "hello world";
1184 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1185
1186 const MojoCreateDataPipeOptions options = {
1187 kSizeOfOptions, // |struct_size|.
1188 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1189 1u, // |element_num_bytes|.
1190 1000u // |capacity_num_bytes|.
1191 };
1192 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1193 MojoHandleSignalsState hss;
1194
1195 // Write some data, so we'll have something to read.
1196 uint32_t num_bytes = kTestDataSize;
1197 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1198 ASSERT_EQ(kTestDataSize, num_bytes);
1199
1200 // Start two-phase write.
1201 void* write_buffer_ptr = nullptr;
1202 num_bytes = 0u;
1203 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1204 EXPECT_TRUE(write_buffer_ptr);
1205 ASSERT_GT(num_bytes, kTestDataSize);
1206
1207 // Wait for data.
1208 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1209 hss = MojoHandleSignalsState();
1210 ASSERT_EQ(MOJO_RESULT_OK,
1211 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1212 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1213 hss.satisfied_signals);
1214 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1215 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1216 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1217 hss.satisfiable_signals);
1218
1219 // Start two-phase read.
1220 const void* read_buffer_ptr = nullptr;
1221 num_bytes = 0u;
1222 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1223 EXPECT_TRUE(read_buffer_ptr);
1224 ASSERT_EQ(kTestDataSize, num_bytes);
1225
1226 // Close the consumer.
1227 CloseConsumer();
1228
1229 // Wait for producer to know that the consumer is closed.
1230 hss = MojoHandleSignalsState();
1231 ASSERT_EQ(MOJO_RESULT_OK,
1232 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1233 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1234 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1235
1236 // Actually write some data. (Note: Premature freeing of the buffer would
1237 // probably only be detected under ASAN or similar.)
1238 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1239 // Note: Even though the consumer has been closed, ending the two-phase
1240 // write will report success.
1241 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1242
1243 // But trying to write should result in failure.
1244 num_bytes = kTestDataSize;
1245 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1246
1247 // As will trying to start another two-phase write.
1248 write_buffer_ptr = nullptr;
1249 num_bytes = 0u;
1250 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1251 BeginWriteData(&write_buffer_ptr, &num_bytes));
1252 }
1253
1254 // Tests the behavior of "interrupting" a two-phase write by closing both the
1255 // producer and the consumer.
TEST_F(DataPipeTest,TwoPhaseWriteCloseBoth)1256 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1257 const uint32_t kTestDataSize = 15u;
1258
1259 const MojoCreateDataPipeOptions options = {
1260 kSizeOfOptions, // |struct_size|.
1261 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1262 1u, // |element_num_bytes|.
1263 1000u // |capacity_num_bytes|.
1264 };
1265 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1266
1267 // Start two-phase write.
1268 void* write_buffer_ptr = nullptr;
1269 uint32_t num_bytes = 0u;
1270 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1271 EXPECT_TRUE(write_buffer_ptr);
1272 ASSERT_GT(num_bytes, kTestDataSize);
1273 }
1274
1275 // Tests the behavior of writing, closing the producer, and then reading (with
1276 // and without data remaining).
TEST_F(DataPipeTest,WriteCloseProducerReadNoData)1277 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1278 const char kTestData[] = "hello world";
1279 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1280
1281 const MojoCreateDataPipeOptions options = {
1282 kSizeOfOptions, // |struct_size|.
1283 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1284 1u, // |element_num_bytes|.
1285 1000u // |capacity_num_bytes|.
1286 };
1287 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1288 MojoHandleSignalsState hss;
1289
1290 // Write some data, so we'll have something to read.
1291 uint32_t num_bytes = kTestDataSize;
1292 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1293 ASSERT_EQ(kTestDataSize, num_bytes);
1294
1295 // Close the producer.
1296 CloseProducer();
1297
1298 // Wait. (Note that once the consumer knows that the producer is closed, it
1299 // must also know about all the data that was sent.)
1300 hss = MojoHandleSignalsState();
1301 ASSERT_EQ(MOJO_RESULT_OK,
1302 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1303 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1304 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1305 hss.satisfied_signals);
1306 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1307 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1308 hss.satisfiable_signals);
1309
1310 // Peek that data.
1311 char buffer[1000];
1312 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1313 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1314 ASSERT_EQ(kTestDataSize, num_bytes);
1315 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1316
1317 // Read that data.
1318 memset(buffer, 0, 1000);
1319 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1320 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1321 ASSERT_EQ(kTestDataSize, num_bytes);
1322 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1323
1324 // A second read should fail.
1325 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1326 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1327
1328 // A two-phase read should also fail.
1329 const void* read_buffer_ptr = nullptr;
1330 num_bytes = 0u;
1331 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1332 BeginReadData(&read_buffer_ptr, &num_bytes));
1333
1334 // Ditto for discard.
1335 num_bytes = 10u;
1336 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1337 }
1338
1339 // Test that during a two phase read the memory stays valid even if more data
1340 // comes in.
TEST_F(DataPipeTest,TwoPhaseReadMemoryStable)1341 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
1342 const char kTestData[] = "hello world";
1343 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1344
1345 const MojoCreateDataPipeOptions options = {
1346 kSizeOfOptions, // |struct_size|.
1347 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1348 1u, // |element_num_bytes|.
1349 1000u // |capacity_num_bytes|.
1350 };
1351 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1352 MojoHandleSignalsState hss;
1353
1354 // Write some data.
1355 uint32_t num_bytes = kTestDataSize;
1356 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1357 ASSERT_EQ(kTestDataSize, num_bytes);
1358
1359 // Wait for the data.
1360 hss = MojoHandleSignalsState();
1361 ASSERT_EQ(MOJO_RESULT_OK,
1362 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1363 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1364 hss.satisfied_signals);
1365 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1366 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1367 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1368 hss.satisfiable_signals);
1369
1370 // Begin a two-phase read.
1371 const void* read_buffer_ptr = nullptr;
1372 uint32_t read_buffer_size = 0u;
1373 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
1374
1375 // Write more data.
1376 const char kExtraData[] = "bye world";
1377 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1378 num_bytes = kExtraDataSize;
1379 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1380 ASSERT_EQ(kExtraDataSize, num_bytes);
1381
1382 // Close the producer.
1383 CloseProducer();
1384
1385 // Wait. (Note that once the consumer knows that the producer is closed, it
1386 // must also have received the extra data).
1387 hss = MojoHandleSignalsState();
1388 ASSERT_EQ(MOJO_RESULT_OK,
1389 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1390 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1391 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1392 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1393 hss.satisfiable_signals);
1394
1395 // Read the two phase memory to check it's still valid.
1396 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1397 EndReadData(read_buffer_size);
1398 }
1399
1400 // Test that two-phase reads/writes behave correctly when given invalid
1401 // arguments.
TEST_F(DataPipeTest,TwoPhaseMoreInvalidArguments)1402 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1403 const MojoCreateDataPipeOptions options = {
1404 kSizeOfOptions, // |struct_size|.
1405 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1406 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1407 10 * sizeof(int32_t) // |capacity_num_bytes|.
1408 };
1409 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1410 MojoHandleSignalsState hss;
1411
1412 // No data.
1413 uint32_t num_bytes = 1000u;
1414 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1415 ASSERT_EQ(0u, num_bytes);
1416
1417 // Try "ending" a two-phase write when one isn't active.
1418 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1419 EndWriteData(1u * sizeof(int32_t)));
1420
1421 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1422 // have time to propagate.
1423 test::Sleep(test::EpsilonDeadline());
1424
1425 // Still no data.
1426 num_bytes = 1000u;
1427 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1428 ASSERT_EQ(0u, num_bytes);
1429
1430 // Try ending a two-phase write with an invalid amount (too much).
1431 num_bytes = 0u;
1432 void* write_ptr = nullptr;
1433 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1434 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1435 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1436
1437 // But the two-phase write still ended.
1438 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1439
1440 // Wait a bit (as above).
1441 test::Sleep(test::EpsilonDeadline());
1442
1443 // Still no data.
1444 num_bytes = 1000u;
1445 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1446 ASSERT_EQ(0u, num_bytes);
1447
1448 // Try ending a two-phase write with an invalid amount (not a multiple of the
1449 // element size).
1450 num_bytes = 0u;
1451 write_ptr = nullptr;
1452 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1453 EXPECT_GE(num_bytes, 1u);
1454 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1455
1456 // But the two-phase write still ended.
1457 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1458
1459 // Wait a bit (as above).
1460 test::Sleep(test::EpsilonDeadline());
1461
1462 // Still no data.
1463 num_bytes = 1000u;
1464 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1465 ASSERT_EQ(0u, num_bytes);
1466
1467 // Now write some data, so we'll be able to try reading.
1468 int32_t element = 123;
1469 num_bytes = 1u * sizeof(int32_t);
1470 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1471
1472 // Wait for data.
1473 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1474 hss = MojoHandleSignalsState();
1475 ASSERT_EQ(MOJO_RESULT_OK,
1476 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1477 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1478 hss.satisfied_signals);
1479 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1480 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1481 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1482 hss.satisfiable_signals);
1483
1484 // One element available.
1485 num_bytes = 0u;
1486 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1487 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1488
1489 // Try "ending" a two-phase read when one isn't active.
1490 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1491
1492 // Still one element available.
1493 num_bytes = 0u;
1494 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1495 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1496
1497 // Try ending a two-phase read with an invalid amount (too much).
1498 num_bytes = 0u;
1499 const void* read_ptr = nullptr;
1500 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1501 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1502 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1503
1504 // Still one element available.
1505 num_bytes = 0u;
1506 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1507 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1508
1509 // Try ending a two-phase read with an invalid amount (not a multiple of the
1510 // element size).
1511 num_bytes = 0u;
1512 read_ptr = nullptr;
1513 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1514 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1515 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1516 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1517
1518 // Still one element available.
1519 num_bytes = 0u;
1520 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1521 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1522 }
1523
1524 // Test that a producer can be sent over a MP.
TEST_F(DataPipeTest,SendProducer)1525 TEST_F(DataPipeTest, SendProducer) {
1526 const char kTestData[] = "hello world";
1527 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1528
1529 const MojoCreateDataPipeOptions options = {
1530 kSizeOfOptions, // |struct_size|.
1531 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1532 1u, // |element_num_bytes|.
1533 1000u // |capacity_num_bytes|.
1534 };
1535 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1536 MojoHandleSignalsState hss;
1537
1538 // Write some data.
1539 uint32_t num_bytes = kTestDataSize;
1540 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1541 ASSERT_EQ(kTestDataSize, num_bytes);
1542
1543 // Wait for the data.
1544 hss = MojoHandleSignalsState();
1545 ASSERT_EQ(MOJO_RESULT_OK,
1546 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1547 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1548 hss.satisfied_signals);
1549 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1550 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1551 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1552 hss.satisfiable_signals);
1553
1554 // Check the data.
1555 const void* read_buffer = nullptr;
1556 num_bytes = 0u;
1557 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
1558 ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
1559 EndReadData(num_bytes);
1560
1561 // Now send the producer over a MP so that it's serialized.
1562 MojoHandle pipe0, pipe1;
1563 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1564
1565 ASSERT_EQ(MOJO_RESULT_OK,
1566 WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1,
1567 MOJO_WRITE_MESSAGE_FLAG_NONE));
1568 producer_ = MOJO_HANDLE_INVALID;
1569 ASSERT_EQ(MOJO_RESULT_OK,
1570 WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1571 ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1));
1572
1573 // Write more data.
1574 const char kExtraData[] = "bye world";
1575 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1576 num_bytes = kExtraDataSize;
1577 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1578 ASSERT_EQ(kExtraDataSize, num_bytes);
1579
1580 // Wait for it.
1581 hss = MojoHandleSignalsState();
1582 ASSERT_EQ(MOJO_RESULT_OK,
1583 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1584 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1585 hss.satisfied_signals);
1586 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1587 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1588 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1589 hss.satisfiable_signals);
1590
1591 // Check the second write.
1592 num_bytes = 0u;
1593 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
1594 ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
1595 EndReadData(num_bytes);
1596
1597 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1598 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1599 }
1600
1601 // Ensures that if a data pipe consumer whose producer has closed is passed over
1602 // a message pipe, the deserialized dispatcher is also marked as having a closed
1603 // peer.
TEST_F(DataPipeTest,ConsumerWithClosedProducerSent)1604 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
1605 const MojoCreateDataPipeOptions options = {
1606 kSizeOfOptions, // |struct_size|.
1607 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1608 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1609 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1610 };
1611
1612 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1613
1614 // We can write to a data pipe handle immediately.
1615 int32_t data = 123;
1616 uint32_t num_bytes = sizeof(data);
1617 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
1618 ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
1619
1620 // Now wait for the other side to become readable and to see the peer closed.
1621 MojoHandleSignalsState state;
1622 ASSERT_EQ(MOJO_RESULT_OK,
1623 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
1624 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1625 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1626 state.satisfied_signals);
1627 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1628 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1629 state.satisfiable_signals);
1630
1631 // Now send the consumer over a MP so that it's serialized.
1632 MojoHandle pipe0, pipe1;
1633 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1634
1635 ASSERT_EQ(MOJO_RESULT_OK,
1636 WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1,
1637 MOJO_WRITE_MESSAGE_FLAG_NONE));
1638 consumer_ = MOJO_HANDLE_INVALID;
1639 ASSERT_EQ(MOJO_RESULT_OK,
1640 WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state));
1641 ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1));
1642
1643 ASSERT_EQ(MOJO_RESULT_OK,
1644 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
1645 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1646 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1647 state.satisfied_signals);
1648 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1649 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1650 state.satisfiable_signals);
1651
1652 int32_t read_data;
1653 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
1654 ASSERT_EQ(sizeof(read_data), num_bytes);
1655 ASSERT_EQ(data, read_data);
1656
1657 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1658 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1659 }
1660
WriteAllData(MojoHandle producer,const void * elements,uint32_t num_bytes)1661 bool WriteAllData(MojoHandle producer,
1662 const void* elements,
1663 uint32_t num_bytes) {
1664 for (size_t i = 0; i < kMaxPoll; i++) {
1665 // Write as much data as we can.
1666 uint32_t write_bytes = num_bytes;
1667 MojoResult result =
1668 MojoWriteData(producer, elements, &write_bytes, nullptr);
1669 if (result == MOJO_RESULT_OK) {
1670 num_bytes -= write_bytes;
1671 elements = static_cast<const uint8_t*>(elements) + write_bytes;
1672 if (num_bytes == 0)
1673 return true;
1674 } else {
1675 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1676 }
1677
1678 MojoHandleSignalsState hss = MojoHandleSignalsState();
1679 EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
1680 producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1681 EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
1682 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1683 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1684 hss.satisfiable_signals);
1685 }
1686
1687 return false;
1688 }
1689
1690 // If |expect_empty| is true, expect |consumer| to be empty after reading.
ReadAllData(MojoHandle consumer,void * elements,uint32_t num_bytes,bool expect_empty)1691 bool ReadAllData(MojoHandle consumer,
1692 void* elements,
1693 uint32_t num_bytes,
1694 bool expect_empty) {
1695 for (size_t i = 0; i < kMaxPoll; i++) {
1696 // Read as much data as we can.
1697 uint32_t read_bytes = num_bytes;
1698 MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes);
1699 if (result == MOJO_RESULT_OK) {
1700 num_bytes -= read_bytes;
1701 elements = static_cast<uint8_t*>(elements) + read_bytes;
1702 if (num_bytes == 0) {
1703 if (expect_empty) {
1704 // Expect no more data.
1705 test::Sleep(test::TinyDeadline());
1706 MojoReadDataOptions options;
1707 options.struct_size = sizeof(options);
1708 options.flags = MOJO_READ_DATA_FLAG_QUERY;
1709 MojoReadData(consumer, &options, nullptr, &num_bytes);
1710 EXPECT_EQ(0u, num_bytes);
1711 }
1712 return true;
1713 }
1714 } else {
1715 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1716 }
1717
1718 MojoHandleSignalsState hss = MojoHandleSignalsState();
1719 EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
1720 consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1721 // Peer could have become closed while we're still waiting for data.
1722 EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
1723 EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
1724 EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
1725 }
1726
1727 return num_bytes == 0;
1728 }
1729
1730 #if !defined(OS_IOS)
1731
TEST_F(DataPipeTest,Multiprocess)1732 TEST_F(DataPipeTest, Multiprocess) {
1733 const uint32_t kTestDataSize =
1734 static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1735 const MojoCreateDataPipeOptions options = {
1736 kSizeOfOptions, // |struct_size|.
1737 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1738 1, // |element_num_bytes|.
1739 kMultiprocessCapacity // |capacity_num_bytes|.
1740 };
1741 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1742
1743 RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) {
1744 // Send some data before serialising and sending the data pipe over.
1745 // This is the first write so we don't need to use WriteAllData.
1746 uint32_t num_bytes = kTestDataSize;
1747 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
1748 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
1749 ASSERT_EQ(kTestDataSize, num_bytes);
1750
1751 // Send child process the data pipe.
1752 ASSERT_EQ(MOJO_RESULT_OK,
1753 WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
1754 &consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1755
1756 // Send a bunch of data of varying sizes.
1757 uint8_t buffer[100];
1758 int seq = 0;
1759 for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1760 for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
1761 for (unsigned int j = 0; j < size; ++j)
1762 buffer[j] = seq + j;
1763 EXPECT_TRUE(WriteAllData(producer_, buffer, size));
1764 seq += size;
1765 }
1766 }
1767
1768 // Write the test string in again.
1769 ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
1770
1771 // Swap ends.
1772 ASSERT_EQ(MOJO_RESULT_OK,
1773 WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
1774 &producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1775
1776 // Receive the consumer from the other side.
1777 producer_ = MOJO_HANDLE_INVALID;
1778 MojoHandleSignalsState hss = MojoHandleSignalsState();
1779 ASSERT_EQ(MOJO_RESULT_OK,
1780 WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1781 ASSERT_EQ(MOJO_RESULT_OK,
1782 ReadEmptyMessageWithHandles(server_mp, &consumer_, 1));
1783
1784 // Read the test string twice. Once for when we sent it, and once for the
1785 // other end sending it.
1786 for (int i = 0; i < 2; ++i) {
1787 EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
1788 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1789 }
1790
1791 WriteMessage(server_mp, "quit");
1792
1793 // Don't have to close the consumer here because it will be done for us.
1794 });
1795 }
1796
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient,DataPipeTest,client_mp)1797 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
1798 const uint32_t kTestDataSize =
1799 static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1800
1801 // Receive the data pipe from the other side.
1802 MojoHandle consumer = MOJO_HANDLE_INVALID;
1803 MojoHandleSignalsState hss = MojoHandleSignalsState();
1804 ASSERT_EQ(MOJO_RESULT_OK,
1805 WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1806 ASSERT_EQ(MOJO_RESULT_OK,
1807 ReadEmptyMessageWithHandles(client_mp, &consumer, 1));
1808
1809 // Read the initial string that was sent.
1810 int32_t buffer[100];
1811 EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
1812 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1813
1814 // Receive the main data and check it is correct.
1815 int seq = 0;
1816 uint8_t expected_buffer[100];
1817 for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1818 for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
1819 for (unsigned int j = 0; j < size; ++j)
1820 expected_buffer[j] = seq + j;
1821 EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
1822 EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
1823
1824 seq += size;
1825 }
1826 }
1827
1828 // Swap ends.
1829 ASSERT_EQ(MOJO_RESULT_OK,
1830 WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer,
1831 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1832
1833 // Receive the producer from the other side.
1834 MojoHandle producer = MOJO_HANDLE_INVALID;
1835 hss = MojoHandleSignalsState();
1836 ASSERT_EQ(MOJO_RESULT_OK,
1837 WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1838 ASSERT_EQ(MOJO_RESULT_OK,
1839 ReadEmptyMessageWithHandles(client_mp, &producer, 1));
1840
1841 // Write the test string one more time.
1842 EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
1843
1844 // We swapped ends, so close the producer.
1845 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
1846
1847 // Wait to receive a "quit" message before exiting.
1848 EXPECT_EQ("quit", ReadMessage(client_mp));
1849 }
1850
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer,DataPipeTest,h)1851 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
1852 MojoHandle p;
1853 std::string message = ReadMessageWithHandles(h, &p, 1);
1854
1855 // Write some data to the producer and close it.
1856 uint32_t num_bytes = static_cast<uint32_t>(message.size());
1857 EXPECT_EQ(MOJO_RESULT_OK,
1858 MojoWriteData(p, message.data(), &num_bytes, nullptr));
1859 EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
1860
1861 // Close the producer before quitting.
1862 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
1863
1864 // Wait for a quit message.
1865 EXPECT_EQ("quit", ReadMessage(h));
1866 }
1867
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer,DataPipeTest,h)1868 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
1869 MojoHandle c;
1870 std::string expected_message = ReadMessageWithHandles(h, &c, 1);
1871
1872 // Wait for the consumer to become readable.
1873 EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
1874
1875 // Drain the consumer and expect to find the given message.
1876 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
1877 std::vector<char> bytes(expected_message.size());
1878 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes));
1879 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
1880
1881 std::string message(bytes.data(), bytes.size());
1882 EXPECT_EQ(expected_message, message);
1883
1884 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
1885
1886 // Wait for a quit message.
1887 EXPECT_EQ("quit", ReadMessage(h));
1888 }
1889
TEST_F(DataPipeTest,SendConsumerAndCloseProducer)1890 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
1891 // Create a new data pipe.
1892 MojoHandle p, c;
1893 EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));
1894
1895 RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) {
1896 RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) {
1897 const std::string kMessage = "Hello, world!";
1898 WriteMessageWithHandles(producer_client, kMessage, &p, 1);
1899 WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
1900
1901 WriteMessage(consumer_client, "quit");
1902 });
1903
1904 WriteMessage(producer_client, "quit");
1905 });
1906 }
1907
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite,DataPipeTest,h)1908 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
1909 const MojoCreateDataPipeOptions options = {
1910 kSizeOfOptions, // |struct_size|.
1911 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
1912 1, // |element_num_bytes|.
1913 kMultiprocessCapacity // |capacity_num_bytes|.
1914 };
1915
1916 MojoHandle p, c;
1917 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
1918
1919 const std::string kMessage = "Hello, world!";
1920 WriteMessageWithHandles(h, kMessage, &c, 1);
1921
1922 // Write some data to the producer and close it.
1923 uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
1924 EXPECT_EQ(MOJO_RESULT_OK,
1925 MojoWriteData(p, kMessage.data(), &num_bytes, nullptr));
1926 EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
1927 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
1928
1929 // Wait for a quit message.
1930 EXPECT_EQ("quit", ReadMessage(h));
1931 }
1932
TEST_F(DataPipeTest,CreateInChild)1933 TEST_F(DataPipeTest, CreateInChild) {
1934 RunTestClient("CreateAndWrite", [&](MojoHandle child) {
1935 MojoHandle c;
1936 std::string expected_message = ReadMessageWithHandles(child, &c, 1);
1937
1938 // Wait for the consumer to become readable.
1939 EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
1940
1941 // Drain the consumer and expect to find the given message.
1942 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
1943 std::vector<char> bytes(expected_message.size());
1944 EXPECT_EQ(MOJO_RESULT_OK,
1945 MojoReadData(c, nullptr, bytes.data(), &num_bytes));
1946 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
1947
1948 std::string message(bytes.data(), bytes.size());
1949 EXPECT_EQ(expected_message, message);
1950
1951 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
1952 WriteMessage(child, "quit");
1953 });
1954 }
1955
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,DataPipeTest,parent)1956 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,
1957 DataPipeTest,
1958 parent) {
1959 // This test verifies that peer closure is detectable through various
1960 // mechanisms when it races with handle transfer.
1961
1962 MojoHandle handles[6];
1963 EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6));
1964 MojoHandle* producers = &handles[0];
1965 MojoHandle* consumers = &handles[3];
1966
1967 // Wait on producer 0
1968 EXPECT_EQ(MOJO_RESULT_OK,
1969 WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
1970
1971 // Wait on consumer 0
1972 EXPECT_EQ(MOJO_RESULT_OK,
1973 WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
1974
1975 base::MessageLoop message_loop;
1976
1977 // Wait on producer 1 and consumer 1 using SimpleWatchers.
1978 {
1979 base::RunLoop run_loop;
1980 int count = 0;
1981 auto callback = base::Bind(
1982 [](base::RunLoop* loop, int* count, MojoResult result) {
1983 EXPECT_EQ(MOJO_RESULT_OK, result);
1984 if (++*count == 2)
1985 loop->Quit();
1986 },
1987 &run_loop, &count);
1988 SimpleWatcher producer_watcher(FROM_HERE,
1989 SimpleWatcher::ArmingPolicy::AUTOMATIC,
1990 base::SequencedTaskRunnerHandle::Get());
1991 SimpleWatcher consumer_watcher(FROM_HERE,
1992 SimpleWatcher::ArmingPolicy::AUTOMATIC,
1993 base::SequencedTaskRunnerHandle::Get());
1994 producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1995 callback);
1996 consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1997 callback);
1998 run_loop.Run();
1999 EXPECT_EQ(2, count);
2000 }
2001
2002 // Wait on producer 2 by polling with MojoWriteData.
2003 MojoResult result;
2004 do {
2005 uint32_t num_bytes = 0;
2006 result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr);
2007 } while (result == MOJO_RESULT_OK);
2008 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
2009
2010 // Wait on consumer 2 by polling with MojoReadData.
2011 do {
2012 char byte;
2013 uint32_t num_bytes = 1;
2014 result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes);
2015 } while (result == MOJO_RESULT_SHOULD_WAIT);
2016 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
2017
2018 for (size_t i = 0; i < 6; ++i)
2019 CloseHandle(handles[i]);
2020 }
2021
TEST_F(DataPipeTest,StatusChangeInTransit)2022 TEST_F(DataPipeTest, StatusChangeInTransit) {
2023 MojoHandle producers[6];
2024 MojoHandle consumers[6];
2025 for (size_t i = 0; i < 6; ++i)
2026 CreateDataPipe(&producers[i], &consumers[i], 1);
2027
2028 RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) {
2029 MojoHandle handles[] = {producers[0], producers[1], producers[2],
2030 consumers[3], consumers[4], consumers[5]};
2031
2032 // Send 3 producers and 3 consumers, and let their transfer race with their
2033 // peers' closure.
2034 WriteMessageWithHandles(child, "o_O", handles, 6);
2035
2036 for (size_t i = 0; i < 3; ++i)
2037 CloseHandle(consumers[i]);
2038 for (size_t i = 3; i < 6; ++i)
2039 CloseHandle(producers[i]);
2040 });
2041 }
2042
2043 #endif // !defined(OS_IOS)
2044
2045 } // namespace
2046 } // namespace core
2047 } // namespace mojo
2048