• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stddef.h>
6 #include <stdint.h>
7 
8 #include <memory>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/run_loop.h"
16 #include "build/build_config.h"
17 #include "mojo/core/embedder/embedder.h"
18 #include "mojo/core/test/mojo_test_base.h"
19 #include "mojo/core/test_utils.h"
20 #include "mojo/public/c/system/data_pipe.h"
21 #include "mojo/public/c/system/functions.h"
22 #include "mojo/public/c/system/message_pipe.h"
23 #include "mojo/public/cpp/system/message_pipe.h"
24 #include "mojo/public/cpp/system/simple_watcher.h"
25 #include "testing/gtest/include/gtest/gtest.h"
26 
27 namespace mojo {
28 namespace core {
29 namespace {
30 
31 const uint32_t kSizeOfOptions =
32     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
33 
34 // In various places, we have to poll (since, e.g., we can't yet wait for a
35 // certain amount of data to be available). This is the maximum number of
36 // iterations (separated by a short sleep).
37 // TODO(vtl): Get rid of this.
38 const size_t kMaxPoll = 100;
39 
40 // Used in Multiprocess test.
41 const size_t kMultiprocessCapacity = 37;
42 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
43 const int kMultiprocessMaxIter = 5;
44 
45 // TODO(rockot): There are many uses of ASSERT where EXPECT would be more
46 // appropriate. Fix this.
47 
48 class DataPipeTest : public test::MojoTestBase {
49  public:
DataPipeTest()50   DataPipeTest()
51       : producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {}
52 
~DataPipeTest()53   ~DataPipeTest() override {
54     if (producer_ != MOJO_HANDLE_INVALID)
55       CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
56     if (consumer_ != MOJO_HANDLE_INVALID)
57       CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
58   }
59 
ReadEmptyMessageWithHandles(MojoHandle pipe,MojoHandle * out_handles,uint32_t num_handles)60   MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe,
61                                          MojoHandle* out_handles,
62                                          uint32_t num_handles) {
63     std::vector<uint8_t> bytes;
64     std::vector<ScopedHandle> handles;
65     MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles,
66                                    MOJO_READ_MESSAGE_FLAG_NONE);
67     if (rv == MOJO_RESULT_OK) {
68       CHECK_EQ(0u, bytes.size());
69       CHECK_EQ(num_handles, handles.size());
70       for (size_t i = 0; i < num_handles; ++i)
71         out_handles[i] = handles[i].release().value();
72     }
73     return rv;
74   }
75 
Create(const MojoCreateDataPipeOptions * options)76   MojoResult Create(const MojoCreateDataPipeOptions* options) {
77     return MojoCreateDataPipe(options, &producer_, &consumer_);
78   }
79 
WriteData(const void * elements,uint32_t * num_bytes,bool all_or_none=false)80   MojoResult WriteData(const void* elements,
81                        uint32_t* num_bytes,
82                        bool all_or_none = false) {
83     MojoWriteDataOptions options;
84     options.struct_size = sizeof(options);
85     options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
86                                 : MOJO_WRITE_DATA_FLAG_NONE;
87     return MojoWriteData(producer_, elements, num_bytes, &options);
88   }
89 
ReadData(void * elements,uint32_t * num_bytes,bool all_or_none=false,bool peek=false)90   MojoResult ReadData(void* elements,
91                       uint32_t* num_bytes,
92                       bool all_or_none = false,
93                       bool peek = false) {
94     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
95     if (all_or_none)
96       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
97     if (peek)
98       flags |= MOJO_READ_DATA_FLAG_PEEK;
99 
100     MojoReadDataOptions options;
101     options.struct_size = sizeof(options);
102     options.flags = flags;
103     return MojoReadData(consumer_, &options, elements, num_bytes);
104   }
105 
QueryData(uint32_t * num_bytes)106   MojoResult QueryData(uint32_t* num_bytes) {
107     MojoReadDataOptions options;
108     options.struct_size = sizeof(options);
109     options.flags = MOJO_READ_DATA_FLAG_QUERY;
110     return MojoReadData(consumer_, &options, nullptr, num_bytes);
111   }
112 
DiscardData(uint32_t * num_bytes,bool all_or_none=false)113   MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
114     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
115     if (all_or_none)
116       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
117     MojoReadDataOptions options;
118     options.struct_size = sizeof(options);
119     options.flags = flags;
120     return MojoReadData(consumer_, &options, nullptr, num_bytes);
121   }
122 
BeginReadData(const void ** elements,uint32_t * num_bytes)123   MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) {
124     return MojoBeginReadData(consumer_, nullptr, elements, num_bytes);
125   }
126 
EndReadData(uint32_t num_bytes_read)127   MojoResult EndReadData(uint32_t num_bytes_read) {
128     return MojoEndReadData(consumer_, num_bytes_read, nullptr);
129   }
130 
BeginWriteData(void ** elements,uint32_t * num_bytes)131   MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) {
132     return MojoBeginWriteData(producer_, nullptr, elements, num_bytes);
133   }
134 
EndWriteData(uint32_t num_bytes_written)135   MojoResult EndWriteData(uint32_t num_bytes_written) {
136     return MojoEndWriteData(producer_, num_bytes_written, nullptr);
137   }
138 
CloseProducer()139   MojoResult CloseProducer() {
140     MojoResult rv = MojoClose(producer_);
141     producer_ = MOJO_HANDLE_INVALID;
142     return rv;
143   }
144 
CloseConsumer()145   MojoResult CloseConsumer() {
146     MojoResult rv = MojoClose(consumer_);
147     consumer_ = MOJO_HANDLE_INVALID;
148     return rv;
149   }
150 
151   MojoHandle producer_, consumer_;
152 
153  private:
154   DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
155 };
156 
TEST_F(DataPipeTest,Basic)157 TEST_F(DataPipeTest, Basic) {
158   const MojoCreateDataPipeOptions options = {
159       kSizeOfOptions,                          // |struct_size|.
160       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
161       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
162       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
163   };
164 
165   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
166 
167   // We can write to a data pipe handle immediately.
168   int32_t elements[10] = {};
169   uint32_t num_bytes = 0;
170 
171   num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
172 
173   elements[0] = 123;
174   elements[1] = 456;
175   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
176   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
177 
178   // Now wait for the other side to become readable.
179   MojoHandleSignalsState state;
180   ASSERT_EQ(MOJO_RESULT_OK,
181             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state));
182   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
183             state.satisfied_signals);
184 
185   elements[0] = -1;
186   elements[1] = -1;
187   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
188   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
189   ASSERT_EQ(elements[0], 123);
190   ASSERT_EQ(elements[1], 456);
191 }
192 
193 // Tests creation of data pipes with various (valid) options.
TEST_F(DataPipeTest,CreateAndMaybeTransfer)194 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
195   MojoCreateDataPipeOptions test_options[] = {
196       // Default options.
197       {},
198       // Trivial element size, non-default capacity.
199       {kSizeOfOptions,                   // |struct_size|.
200        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
201        1,                                // |element_num_bytes|.
202        1000},                            // |capacity_num_bytes|.
203       // Nontrivial element size, non-default capacity.
204       {kSizeOfOptions,                   // |struct_size|.
205        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
206        4,                                // |element_num_bytes|.
207        4000},                            // |capacity_num_bytes|.
208       // Nontrivial element size, default capacity.
209       {kSizeOfOptions,                   // |struct_size|.
210        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
211        100,                              // |element_num_bytes|.
212        0}                                // |capacity_num_bytes|.
213   };
214   for (size_t i = 0; i < arraysize(test_options); i++) {
215     MojoHandle producer_handle, consumer_handle;
216     MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr;
217     ASSERT_EQ(MOJO_RESULT_OK,
218               MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
219     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
220     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
221   }
222 }
223 
TEST_F(DataPipeTest,SimpleReadWrite)224 TEST_F(DataPipeTest, SimpleReadWrite) {
225   const MojoCreateDataPipeOptions options = {
226       kSizeOfOptions,                          // |struct_size|.
227       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
228       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
229       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
230   };
231 
232   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
233   MojoHandleSignalsState hss;
234 
235   int32_t elements[10] = {};
236   uint32_t num_bytes = 0;
237 
238   // Try reading; nothing there yet.
239   num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
240   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
241 
242   // Query; nothing there yet.
243   num_bytes = 0;
244   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
245   ASSERT_EQ(0u, num_bytes);
246 
247   // Discard; nothing there yet.
248   num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
249   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
250 
251   // Read with invalid |num_bytes|.
252   num_bytes = sizeof(elements[0]) + 1;
253   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
254 
255   // Write two elements.
256   elements[0] = 123;
257   elements[1] = 456;
258   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
259   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
260   // It should have written everything (even without "all or none").
261   ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
262 
263   // Wait.
264   ASSERT_EQ(MOJO_RESULT_OK,
265             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
266   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
267             hss.satisfied_signals);
268   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
269                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
270                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
271             hss.satisfiable_signals);
272 
273   // Query.
274   // TODO(vtl): It's theoretically possible (though not with the current
275   // implementation/configured limits) that not all the data has arrived yet.
276   // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
277   // or |2 * ...|.)
278   num_bytes = 0;
279   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
280   ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
281 
282   // Read one element.
283   elements[0] = -1;
284   elements[1] = -1;
285   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
286   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
287   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
288   ASSERT_EQ(123, elements[0]);
289   ASSERT_EQ(-1, elements[1]);
290 
291   // Query.
292   // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
293   // should get 1 here.)
294   num_bytes = 0;
295   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
296   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
297 
298   // Peek one element.
299   elements[0] = -1;
300   elements[1] = -1;
301   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
302   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
303   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
304   ASSERT_EQ(456, elements[0]);
305   ASSERT_EQ(-1, elements[1]);
306 
307   // Query. Still has 1 element remaining.
308   num_bytes = 0;
309   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
310   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
311 
312   // Try to read two elements, with "all or none".
313   elements[0] = -1;
314   elements[1] = -1;
315   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
316   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
317             ReadData(elements, &num_bytes, true, false));
318   ASSERT_EQ(-1, elements[0]);
319   ASSERT_EQ(-1, elements[1]);
320 
321   // Try to read two elements, without "all or none".
322   elements[0] = -1;
323   elements[1] = -1;
324   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
325   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
326   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
327   ASSERT_EQ(456, elements[0]);
328   ASSERT_EQ(-1, elements[1]);
329 
330   // Query.
331   num_bytes = 0;
332   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
333   ASSERT_EQ(0u, num_bytes);
334 }
335 
336 // Note: The "basic" waiting tests test that the "wait states" are correct in
337 // various situations; they don't test that waiters are properly awoken on state
338 // changes. (For that, we need to use multiple threads.)
TEST_F(DataPipeTest,BasicProducerWaiting)339 TEST_F(DataPipeTest, BasicProducerWaiting) {
340   // Note: We take advantage of the fact that current for current
341   // implementations capacities are strict maximums. This is not guaranteed by
342   // the API.
343 
344   const MojoCreateDataPipeOptions options = {
345       kSizeOfOptions,                          // |struct_size|.
346       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
347       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
348       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
349   };
350   Create(&options);
351   MojoHandleSignalsState hss;
352 
353   // Never readable. Already writable.
354   hss = GetSignalsState(producer_);
355   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
356   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
357                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
358             hss.satisfiable_signals);
359 
360   // Write two elements.
361   int32_t elements[2] = {123, 456};
362   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
363   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
364   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
365 
366   // Wait for data to become available to the consumer.
367   ASSERT_EQ(MOJO_RESULT_OK,
368             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
369   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
370             hss.satisfied_signals);
371   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
372                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
373                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
374             hss.satisfiable_signals);
375 
376   // Peek one element.
377   elements[0] = -1;
378   elements[1] = -1;
379   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
380   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
381   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
382   ASSERT_EQ(123, elements[0]);
383   ASSERT_EQ(-1, elements[1]);
384 
385   // Read one element.
386   elements[0] = -1;
387   elements[1] = -1;
388   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
389   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
390   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
391   ASSERT_EQ(123, elements[0]);
392   ASSERT_EQ(-1, elements[1]);
393 
394   // Try writing, using a two-phase write.
395   void* buffer = nullptr;
396   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
397   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
398   EXPECT_TRUE(buffer);
399   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
400 
401   static_cast<int32_t*>(buffer)[0] = 789;
402   ASSERT_EQ(MOJO_RESULT_OK,
403             EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
404 
405   // Read one element, using a two-phase read.
406   const void* read_buffer = nullptr;
407   num_bytes = 0u;
408   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
409   EXPECT_TRUE(read_buffer);
410   // The two-phase read should be able to read at least one element.
411   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
412   ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
413   ASSERT_EQ(MOJO_RESULT_OK,
414             EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
415 
416   // Write one element.
417   elements[0] = 123;
418   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
419   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
420   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
421 
422   // Close the consumer.
423   CloseConsumer();
424 
425   // It should now be never-writable.
426   hss = MojoHandleSignalsState();
427   ASSERT_EQ(MOJO_RESULT_OK,
428             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
429   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
430   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
431 }
432 
TEST_F(DataPipeTest,PeerClosedProducerWaiting)433 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
434   const MojoCreateDataPipeOptions options = {
435       kSizeOfOptions,                          // |struct_size|.
436       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
437       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
438       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
439   };
440   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
441   MojoHandleSignalsState hss;
442 
443   // Close the consumer.
444   CloseConsumer();
445 
446   // It should be signaled.
447   hss = MojoHandleSignalsState();
448   ASSERT_EQ(MOJO_RESULT_OK,
449             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
450   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
451   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
452 }
453 
TEST_F(DataPipeTest,PeerClosedConsumerWaiting)454 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
455   const MojoCreateDataPipeOptions options = {
456       kSizeOfOptions,                          // |struct_size|.
457       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
458       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
459       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
460   };
461   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
462   MojoHandleSignalsState hss;
463 
464   // Close the producer.
465   CloseProducer();
466 
467   // It should be signaled.
468   hss = MojoHandleSignalsState();
469   ASSERT_EQ(MOJO_RESULT_OK,
470             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
471   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
472   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
473 }
474 
TEST_F(DataPipeTest,BasicConsumerWaiting)475 TEST_F(DataPipeTest, BasicConsumerWaiting) {
476   const MojoCreateDataPipeOptions options = {
477       kSizeOfOptions,                          // |struct_size|.
478       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
479       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
480       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
481   };
482   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
483   MojoHandleSignalsState hss;
484 
485   // Never writable.
486   hss = MojoHandleSignalsState();
487   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
488             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
489   EXPECT_EQ(0u, hss.satisfied_signals);
490   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
491                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
492                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
493             hss.satisfiable_signals);
494 
495   // Write two elements.
496   int32_t elements[2] = {123, 456};
497   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
498   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
499 
500   // Wait for readability.
501   hss = MojoHandleSignalsState();
502   ASSERT_EQ(MOJO_RESULT_OK,
503             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
504   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
505             hss.satisfied_signals);
506   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
507                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
508                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
509             hss.satisfiable_signals);
510 
511   // Discard one element.
512   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
513   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
514   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
515 
516   // Should still be readable.
517   hss = MojoHandleSignalsState();
518   ASSERT_EQ(MOJO_RESULT_OK,
519             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
520   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
521   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
522                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
523                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
524             hss.satisfiable_signals);
525 
526   // Peek one element.
527   elements[0] = -1;
528   elements[1] = -1;
529   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
530   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
531   ASSERT_EQ(456, elements[0]);
532   ASSERT_EQ(-1, elements[1]);
533 
534   // Should still be readable.
535   hss = MojoHandleSignalsState();
536   ASSERT_EQ(MOJO_RESULT_OK,
537             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
538   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
539   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
540                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
541                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
542             hss.satisfiable_signals);
543 
544   // Read one element.
545   elements[0] = -1;
546   elements[1] = -1;
547   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
548   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
549   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
550   ASSERT_EQ(456, elements[0]);
551   ASSERT_EQ(-1, elements[1]);
552 
553   // Write one element.
554   elements[0] = 789;
555   elements[1] = -1;
556   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
557   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
558 
559   // Waiting should now succeed.
560   hss = MojoHandleSignalsState();
561   ASSERT_EQ(MOJO_RESULT_OK,
562             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
563   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
564             hss.satisfied_signals);
565   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
566                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
567                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
568             hss.satisfiable_signals);
569 
570   // Close the producer.
571   CloseProducer();
572 
573   // Should still be readable.
574   hss = MojoHandleSignalsState();
575   ASSERT_EQ(MOJO_RESULT_OK,
576             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
577   EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE |
578                                        MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE));
579   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
580                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
581             hss.satisfiable_signals);
582 
583   // Wait for the peer closed signal.
584   hss = MojoHandleSignalsState();
585   ASSERT_EQ(MOJO_RESULT_OK,
586             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
587   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
588                 MOJO_HANDLE_SIGNAL_PEER_CLOSED,
589             hss.satisfied_signals);
590   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
591                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
592             hss.satisfiable_signals);
593 
594   // Read one element.
595   elements[0] = -1;
596   elements[1] = -1;
597   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
598   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
599   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
600   ASSERT_EQ(789, elements[0]);
601   ASSERT_EQ(-1, elements[1]);
602 
603   // Should be never-readable.
604   hss = MojoHandleSignalsState();
605   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
606             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
607   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
608   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
609 }
610 
TEST_F(DataPipeTest,ConsumerNewDataReadable)611 TEST_F(DataPipeTest, ConsumerNewDataReadable) {
612   const MojoCreateDataPipeOptions create_options = {
613       kSizeOfOptions,                          // |struct_size|.
614       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
615       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
616       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
617   };
618   EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options));
619 
620   int32_t elements[2] = {123, 456};
621   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
622   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
623 
624   // The consumer handle should appear to be readable and have new data.
625   EXPECT_EQ(MOJO_RESULT_OK,
626             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
627   EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
628               MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
629 
630   // Now try to read a minimum of 6 elements.
631   int32_t read_elements[6];
632   uint32_t num_read_bytes = sizeof(read_elements);
633   MojoReadDataOptions read_options;
634   read_options.struct_size = sizeof(read_options);
635   read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE;
636   EXPECT_EQ(
637       MOJO_RESULT_OUT_OF_RANGE,
638       MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes));
639 
640   // The consumer should still appear to be readable but not with new data.
641   EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
642               MOJO_HANDLE_SIGNAL_READABLE);
643   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
644                MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
645 
646   // Write four more elements.
647   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
648   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
649 
650   // The consumer handle should once again appear to be readable.
651   EXPECT_EQ(MOJO_RESULT_OK,
652             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
653 
654   // Try again to read a minimum of 6 elements. Should succeed this time.
655   EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options,
656                                          read_elements, &num_read_bytes));
657 
658   // And now the consumer is unreadable.
659   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
660                MOJO_HANDLE_SIGNAL_READABLE);
661   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
662                MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
663 }
664 
665 // Test with two-phase APIs and also closing the producer with an active
666 // consumer waiter.
TEST_F(DataPipeTest,ConsumerWaitingTwoPhase)667 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
668   const MojoCreateDataPipeOptions options = {
669       kSizeOfOptions,                          // |struct_size|.
670       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
671       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
672       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
673   };
674   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
675   MojoHandleSignalsState hss;
676 
677   // Write two elements.
678   int32_t* elements = nullptr;
679   void* buffer = nullptr;
680   // Request room for three (but we'll only write two).
681   uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
682   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
683   EXPECT_TRUE(buffer);
684   EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
685   elements = static_cast<int32_t*>(buffer);
686   elements[0] = 123;
687   elements[1] = 456;
688   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
689 
690   // Wait for readability.
691   hss = MojoHandleSignalsState();
692   ASSERT_EQ(MOJO_RESULT_OK,
693             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
694   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
695             hss.satisfied_signals);
696   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
697                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
698                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
699             hss.satisfiable_signals);
700 
701   // Read one element.
702   // Two should be available, but only read one.
703   const void* read_buffer = nullptr;
704   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
705   EXPECT_TRUE(read_buffer);
706   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
707   const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
708   ASSERT_EQ(123, read_elements[0]);
709   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
710 
711   // Should still be readable.
712   hss = MojoHandleSignalsState();
713   ASSERT_EQ(MOJO_RESULT_OK,
714             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
715   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
716   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
717                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
718                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
719             hss.satisfiable_signals);
720 
721   // Read one element.
722   // Request three, but not in all-or-none mode.
723   read_buffer = nullptr;
724   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
725   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
726   EXPECT_TRUE(read_buffer);
727   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
728   read_elements = static_cast<const int32_t*>(read_buffer);
729   ASSERT_EQ(456, read_elements[0]);
730   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
731 
732   // Close the producer.
733   CloseProducer();
734 
735   // Should be never-readable.
736   hss = MojoHandleSignalsState();
737   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
738             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
739   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
740   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
741 }
742 
743 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
TEST_F(DataPipeTest,BasicTwoPhaseWaiting)744 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
745   const MojoCreateDataPipeOptions options = {
746       kSizeOfOptions,                          // |struct_size|.
747       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
748       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
749       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
750   };
751   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
752   MojoHandleSignalsState hss;
753 
754   // It should be writable.
755   hss = GetSignalsState(producer_);
756   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
757   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
758                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
759             hss.satisfiable_signals);
760 
761   uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
762   void* write_ptr = nullptr;
763   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
764   EXPECT_TRUE(write_ptr);
765   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
766 
767   // At this point, it shouldn't be writable.
768   hss = GetSignalsState(producer_);
769   ASSERT_EQ(0u, hss.satisfied_signals);
770   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
771                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
772             hss.satisfiable_signals);
773 
774   // It shouldn't be readable yet either (we'll wait later).
775   hss = GetSignalsState(consumer_);
776   ASSERT_EQ(0u, hss.satisfied_signals);
777   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
778                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
779                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
780             hss.satisfiable_signals);
781 
782   static_cast<int32_t*>(write_ptr)[0] = 123;
783   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
784 
785   // It should immediately be writable again.
786   hss = GetSignalsState(producer_);
787   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
788   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
789                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
790             hss.satisfiable_signals);
791 
792   // It should become readable.
793   hss = MojoHandleSignalsState();
794   ASSERT_EQ(MOJO_RESULT_OK,
795             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
796   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
797             hss.satisfied_signals);
798   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
799                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
800                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
801             hss.satisfiable_signals);
802 
803   // Start another two-phase write and check that it's readable even in the
804   // middle of it.
805   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
806   write_ptr = nullptr;
807   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
808   EXPECT_TRUE(write_ptr);
809   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
810 
811   // It should be readable.
812   hss = MojoHandleSignalsState();
813   ASSERT_EQ(MOJO_RESULT_OK,
814             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
815   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
816             hss.satisfied_signals);
817   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
818                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
819                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
820             hss.satisfiable_signals);
821 
822   // End the two-phase write without writing anything.
823   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
824 
825   // Start a two-phase read.
826   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
827   const void* read_ptr = nullptr;
828   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
829   EXPECT_TRUE(read_ptr);
830   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
831 
832   // At this point, it should still be writable.
833   hss = GetSignalsState(producer_);
834   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
835   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
836                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
837             hss.satisfiable_signals);
838 
839   // But not readable.
840   hss = GetSignalsState(consumer_);
841   ASSERT_EQ(0u, hss.satisfied_signals);
842   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
843                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
844                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
845             hss.satisfiable_signals);
846 
847   // End the two-phase read without reading anything.
848   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
849 
850   // It should be readable again.
851   hss = GetSignalsState(consumer_);
852   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
853   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
854                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
855                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
856             hss.satisfiable_signals);
857 }
858 
Seq(int32_t start,size_t count,int32_t * out)859 void Seq(int32_t start, size_t count, int32_t* out) {
860   for (size_t i = 0; i < count; i++)
861     out[i] = start + static_cast<int32_t>(i);
862 }
863 
TEST_F(DataPipeTest,AllOrNone)864 TEST_F(DataPipeTest, AllOrNone) {
865   const MojoCreateDataPipeOptions options = {
866       kSizeOfOptions,                          // |struct_size|.
867       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
868       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
869       10 * sizeof(int32_t)                     // |capacity_num_bytes|.
870   };
871   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
872   MojoHandleSignalsState hss;
873 
874   // Try writing more than the total capacity of the pipe.
875   uint32_t num_bytes = 20u * sizeof(int32_t);
876   int32_t buffer[100];
877   Seq(0, arraysize(buffer), buffer);
878   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
879 
880   // Should still be empty.
881   num_bytes = ~0u;
882   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
883   ASSERT_EQ(0u, num_bytes);
884 
885   // Write some data.
886   num_bytes = 5u * sizeof(int32_t);
887   Seq(100, arraysize(buffer), buffer);
888   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
889   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
890 
891   // Wait for data.
892   // TODO(vtl): There's no real guarantee that all the data will become
893   // available at once (except that in current implementations, with reasonable
894   // limits, it will). Eventually, we'll be able to wait for a specified amount
895   // of data to become available.
896   hss = MojoHandleSignalsState();
897   ASSERT_EQ(MOJO_RESULT_OK,
898             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
899   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
900             hss.satisfied_signals);
901   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
902                 MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE,
903             hss.satisfiable_signals);
904 
905   // Half full.
906   num_bytes = 0u;
907   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
908   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
909 
910   // Try writing more than the available capacity of the pipe, but less than the
911   // total capacity.
912   num_bytes = 6u * sizeof(int32_t);
913   Seq(200, arraysize(buffer), buffer);
914   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
915 
916   // Try reading too much.
917   num_bytes = 11u * sizeof(int32_t);
918   memset(buffer, 0xab, sizeof(buffer));
919   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
920   int32_t expected_buffer[100];
921   memset(expected_buffer, 0xab, sizeof(expected_buffer));
922   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
923 
924   // Try discarding too much.
925   num_bytes = 11u * sizeof(int32_t);
926   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
927 
928   // Just a little.
929   num_bytes = 2u * sizeof(int32_t);
930   Seq(300, arraysize(buffer), buffer);
931   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
932   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
933 
934   // Just right.
935   num_bytes = 3u * sizeof(int32_t);
936   Seq(400, arraysize(buffer), buffer);
937   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
938   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
939 
940   // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
941   // specified amount of data to be available, so poll.
942   for (size_t i = 0; i < kMaxPoll; i++) {
943     num_bytes = 0u;
944     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
945     if (num_bytes >= 10u * sizeof(int32_t))
946       break;
947 
948     test::Sleep(test::EpsilonDeadline());
949   }
950   ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
951 
952   // Read half.
953   num_bytes = 5u * sizeof(int32_t);
954   memset(buffer, 0xab, sizeof(buffer));
955   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
956   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
957   memset(expected_buffer, 0xab, sizeof(expected_buffer));
958   Seq(100, 5, expected_buffer);
959   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
960 
961   // Try reading too much again.
962   num_bytes = 6u * sizeof(int32_t);
963   memset(buffer, 0xab, sizeof(buffer));
964   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
965   memset(expected_buffer, 0xab, sizeof(expected_buffer));
966   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
967 
968   // Try discarding too much again.
969   num_bytes = 6u * sizeof(int32_t);
970   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
971 
972   // Discard a little.
973   num_bytes = 2u * sizeof(int32_t);
974   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
975   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
976 
977   // Three left.
978   num_bytes = 0u;
979   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
980   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
981 
982   // Close the producer, then test producer-closed cases.
983   CloseProducer();
984 
985   // Wait.
986   hss = MojoHandleSignalsState();
987   ASSERT_EQ(MOJO_RESULT_OK,
988             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
989   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
990             hss.satisfied_signals);
991   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
992             hss.satisfiable_signals);
993 
994   // Try reading too much; "failed precondition" since the producer is closed.
995   num_bytes = 4u * sizeof(int32_t);
996   memset(buffer, 0xab, sizeof(buffer));
997   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
998             ReadData(buffer, &num_bytes, true));
999   memset(expected_buffer, 0xab, sizeof(expected_buffer));
1000   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1001 
1002   // Try discarding too much; "failed precondition" again.
1003   num_bytes = 4u * sizeof(int32_t);
1004   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
1005 
1006   // Read a little.
1007   num_bytes = 2u * sizeof(int32_t);
1008   memset(buffer, 0xab, sizeof(buffer));
1009   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
1010   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
1011   memset(expected_buffer, 0xab, sizeof(expected_buffer));
1012   Seq(400, 2, expected_buffer);
1013   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1014 
1015   // Discard the remaining element.
1016   num_bytes = 1u * sizeof(int32_t);
1017   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
1018   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1019 
1020   // Empty again.
1021   num_bytes = ~0u;
1022   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1023   ASSERT_EQ(0u, num_bytes);
1024 }
1025 
1026 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1027 // respectively, as much as possible, even if it may have to "wrap around" the
1028 // internal circular buffer. (Note that the two-phase write and read need not do
1029 // this.)
TEST_F(DataPipeTest,WrapAround)1030 TEST_F(DataPipeTest, WrapAround) {
1031   unsigned char test_data[1000];
1032   for (size_t i = 0; i < arraysize(test_data); i++)
1033     test_data[i] = static_cast<unsigned char>(i);
1034 
1035   const MojoCreateDataPipeOptions options = {
1036       kSizeOfOptions,                   // |struct_size|.
1037       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1038       1u,                               // |element_num_bytes|.
1039       100u                              // |capacity_num_bytes|.
1040   };
1041 
1042   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1043   MojoHandleSignalsState hss;
1044 
1045   // Write 20 bytes.
1046   uint32_t num_bytes = 20u;
1047   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
1048   ASSERT_EQ(20u, num_bytes);
1049 
1050   // Wait for data.
1051   ASSERT_EQ(MOJO_RESULT_OK,
1052             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1053   EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
1054   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1055                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1056                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1057             hss.satisfiable_signals);
1058 
1059   // Read 10 bytes.
1060   unsigned char read_buffer[1000] = {0};
1061   num_bytes = 10u;
1062   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
1063   ASSERT_EQ(10u, num_bytes);
1064   ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1065 
1066   // Check that a two-phase write can now only write (at most) 80 bytes. (This
1067   // checks an implementation detail; this behavior is not guaranteed.)
1068   void* write_buffer_ptr = nullptr;
1069   num_bytes = 0u;
1070   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1071   EXPECT_TRUE(write_buffer_ptr);
1072   ASSERT_EQ(80u, num_bytes);
1073   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
1074 
1075   size_t total_num_bytes = 0;
1076   while (total_num_bytes < 90) {
1077     // Wait to write.
1078     ASSERT_EQ(MOJO_RESULT_OK,
1079               WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1080     ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
1081     ASSERT_EQ(hss.satisfiable_signals, MOJO_HANDLE_SIGNAL_WRITABLE |
1082                                            MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1083                                            MOJO_HANDLE_SIGNAL_PEER_REMOTE);
1084 
1085     // Write as much as we can.
1086     num_bytes = 100;
1087     ASSERT_EQ(MOJO_RESULT_OK,
1088               WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
1089     total_num_bytes += num_bytes;
1090   }
1091 
1092   ASSERT_EQ(90u, total_num_bytes);
1093 
1094   num_bytes = 0;
1095   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1096   ASSERT_EQ(100u, num_bytes);
1097 
1098   // Check that a two-phase read can now only read (at most) 90 bytes. (This
1099   // checks an implementation detail; this behavior is not guaranteed.)
1100   const void* read_buffer_ptr = nullptr;
1101   num_bytes = 0;
1102   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1103   EXPECT_TRUE(read_buffer_ptr);
1104   ASSERT_EQ(90u, num_bytes);
1105   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
1106 
1107   // Read as much as possible. We should read 100 bytes.
1108   num_bytes =
1109       static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
1110   memset(read_buffer, 0, num_bytes);
1111   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
1112   ASSERT_EQ(100u, num_bytes);
1113   ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1114 }
1115 
1116 // Tests the behavior of writing (simple and two-phase), closing the producer,
1117 // then reading (simple and two-phase).
TEST_F(DataPipeTest,WriteCloseProducerRead)1118 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1119   const char kTestData[] = "hello world";
1120   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1121 
1122   const MojoCreateDataPipeOptions options = {
1123       kSizeOfOptions,                   // |struct_size|.
1124       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1125       1u,                               // |element_num_bytes|.
1126       1000u                             // |capacity_num_bytes|.
1127   };
1128   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1129 
1130   // Write some data, so we'll have something to read.
1131   uint32_t num_bytes = kTestDataSize;
1132   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1133   ASSERT_EQ(kTestDataSize, num_bytes);
1134 
1135   // Write it again, so we'll have something left over.
1136   num_bytes = kTestDataSize;
1137   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1138   ASSERT_EQ(kTestDataSize, num_bytes);
1139 
1140   // Start two-phase write.
1141   void* write_buffer_ptr = nullptr;
1142   num_bytes = 0u;
1143   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1144   EXPECT_TRUE(write_buffer_ptr);
1145   EXPECT_GT(num_bytes, 0u);
1146 
1147   // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1148   for (size_t i = 0; i < kMaxPoll; i++) {
1149     num_bytes = 0u;
1150     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1151     if (num_bytes >= 2u * kTestDataSize)
1152       break;
1153 
1154     test::Sleep(test::EpsilonDeadline());
1155   }
1156   ASSERT_EQ(2u * kTestDataSize, num_bytes);
1157 
1158   // Start two-phase read.
1159   const void* read_buffer_ptr = nullptr;
1160   num_bytes = 0u;
1161   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1162   EXPECT_TRUE(read_buffer_ptr);
1163   ASSERT_EQ(2u * kTestDataSize, num_bytes);
1164 
1165   // Close the producer.
1166   CloseProducer();
1167 
1168   // The consumer can finish its two-phase read.
1169   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1170   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1171 
1172   // And start another.
1173   read_buffer_ptr = nullptr;
1174   num_bytes = 0u;
1175   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1176   EXPECT_TRUE(read_buffer_ptr);
1177   ASSERT_EQ(kTestDataSize, num_bytes);
1178 }
1179 
1180 // Tests the behavior of interrupting a two-phase read and write by closing the
1181 // consumer.
TEST_F(DataPipeTest,TwoPhaseWriteReadCloseConsumer)1182 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1183   const char kTestData[] = "hello world";
1184   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1185 
1186   const MojoCreateDataPipeOptions options = {
1187       kSizeOfOptions,                   // |struct_size|.
1188       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1189       1u,                               // |element_num_bytes|.
1190       1000u                             // |capacity_num_bytes|.
1191   };
1192   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1193   MojoHandleSignalsState hss;
1194 
1195   // Write some data, so we'll have something to read.
1196   uint32_t num_bytes = kTestDataSize;
1197   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1198   ASSERT_EQ(kTestDataSize, num_bytes);
1199 
1200   // Start two-phase write.
1201   void* write_buffer_ptr = nullptr;
1202   num_bytes = 0u;
1203   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1204   EXPECT_TRUE(write_buffer_ptr);
1205   ASSERT_GT(num_bytes, kTestDataSize);
1206 
1207   // Wait for data.
1208   // TODO(vtl): (See corresponding TODO in AllOrNone.)
1209   hss = MojoHandleSignalsState();
1210   ASSERT_EQ(MOJO_RESULT_OK,
1211             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1212   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1213             hss.satisfied_signals);
1214   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1215                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1216                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1217             hss.satisfiable_signals);
1218 
1219   // Start two-phase read.
1220   const void* read_buffer_ptr = nullptr;
1221   num_bytes = 0u;
1222   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1223   EXPECT_TRUE(read_buffer_ptr);
1224   ASSERT_EQ(kTestDataSize, num_bytes);
1225 
1226   // Close the consumer.
1227   CloseConsumer();
1228 
1229   // Wait for producer to know that the consumer is closed.
1230   hss = MojoHandleSignalsState();
1231   ASSERT_EQ(MOJO_RESULT_OK,
1232             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1233   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1234   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1235 
1236   // Actually write some data. (Note: Premature freeing of the buffer would
1237   // probably only be detected under ASAN or similar.)
1238   memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1239   // Note: Even though the consumer has been closed, ending the two-phase
1240   // write will report success.
1241   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1242 
1243   // But trying to write should result in failure.
1244   num_bytes = kTestDataSize;
1245   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1246 
1247   // As will trying to start another two-phase write.
1248   write_buffer_ptr = nullptr;
1249   num_bytes = 0u;
1250   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1251             BeginWriteData(&write_buffer_ptr, &num_bytes));
1252 }
1253 
1254 // Tests the behavior of "interrupting" a two-phase write by closing both the
1255 // producer and the consumer.
TEST_F(DataPipeTest,TwoPhaseWriteCloseBoth)1256 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1257   const uint32_t kTestDataSize = 15u;
1258 
1259   const MojoCreateDataPipeOptions options = {
1260       kSizeOfOptions,                   // |struct_size|.
1261       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1262       1u,                               // |element_num_bytes|.
1263       1000u                             // |capacity_num_bytes|.
1264   };
1265   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1266 
1267   // Start two-phase write.
1268   void* write_buffer_ptr = nullptr;
1269   uint32_t num_bytes = 0u;
1270   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1271   EXPECT_TRUE(write_buffer_ptr);
1272   ASSERT_GT(num_bytes, kTestDataSize);
1273 }
1274 
1275 // Tests the behavior of writing, closing the producer, and then reading (with
1276 // and without data remaining).
TEST_F(DataPipeTest,WriteCloseProducerReadNoData)1277 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1278   const char kTestData[] = "hello world";
1279   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1280 
1281   const MojoCreateDataPipeOptions options = {
1282       kSizeOfOptions,                   // |struct_size|.
1283       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1284       1u,                               // |element_num_bytes|.
1285       1000u                             // |capacity_num_bytes|.
1286   };
1287   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1288   MojoHandleSignalsState hss;
1289 
1290   // Write some data, so we'll have something to read.
1291   uint32_t num_bytes = kTestDataSize;
1292   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1293   ASSERT_EQ(kTestDataSize, num_bytes);
1294 
1295   // Close the producer.
1296   CloseProducer();
1297 
1298   // Wait. (Note that once the consumer knows that the producer is closed, it
1299   // must also know about all the data that was sent.)
1300   hss = MojoHandleSignalsState();
1301   ASSERT_EQ(MOJO_RESULT_OK,
1302             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1303   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1304                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1305             hss.satisfied_signals);
1306   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1307                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1308             hss.satisfiable_signals);
1309 
1310   // Peek that data.
1311   char buffer[1000];
1312   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1313   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1314   ASSERT_EQ(kTestDataSize, num_bytes);
1315   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1316 
1317   // Read that data.
1318   memset(buffer, 0, 1000);
1319   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1320   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1321   ASSERT_EQ(kTestDataSize, num_bytes);
1322   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1323 
1324   // A second read should fail.
1325   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1326   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1327 
1328   // A two-phase read should also fail.
1329   const void* read_buffer_ptr = nullptr;
1330   num_bytes = 0u;
1331   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1332             BeginReadData(&read_buffer_ptr, &num_bytes));
1333 
1334   // Ditto for discard.
1335   num_bytes = 10u;
1336   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1337 }
1338 
1339 // Test that during a two phase read the memory stays valid even if more data
1340 // comes in.
TEST_F(DataPipeTest,TwoPhaseReadMemoryStable)1341 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
1342   const char kTestData[] = "hello world";
1343   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1344 
1345   const MojoCreateDataPipeOptions options = {
1346       kSizeOfOptions,                   // |struct_size|.
1347       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1348       1u,                               // |element_num_bytes|.
1349       1000u                             // |capacity_num_bytes|.
1350   };
1351   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1352   MojoHandleSignalsState hss;
1353 
1354   // Write some data.
1355   uint32_t num_bytes = kTestDataSize;
1356   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1357   ASSERT_EQ(kTestDataSize, num_bytes);
1358 
1359   // Wait for the data.
1360   hss = MojoHandleSignalsState();
1361   ASSERT_EQ(MOJO_RESULT_OK,
1362             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1363   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1364             hss.satisfied_signals);
1365   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1366                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1367                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1368             hss.satisfiable_signals);
1369 
1370   // Begin a two-phase read.
1371   const void* read_buffer_ptr = nullptr;
1372   uint32_t read_buffer_size = 0u;
1373   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
1374 
1375   // Write more data.
1376   const char kExtraData[] = "bye world";
1377   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1378   num_bytes = kExtraDataSize;
1379   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1380   ASSERT_EQ(kExtraDataSize, num_bytes);
1381 
1382   // Close the producer.
1383   CloseProducer();
1384 
1385   // Wait. (Note that once the consumer knows that the producer is closed, it
1386   // must also have received the extra data).
1387   hss = MojoHandleSignalsState();
1388   ASSERT_EQ(MOJO_RESULT_OK,
1389             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
1390   EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1391   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1392                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1393             hss.satisfiable_signals);
1394 
1395   // Read the two phase memory to check it's still valid.
1396   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1397   EndReadData(read_buffer_size);
1398 }
1399 
1400 // Test that two-phase reads/writes behave correctly when given invalid
1401 // arguments.
TEST_F(DataPipeTest,TwoPhaseMoreInvalidArguments)1402 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1403   const MojoCreateDataPipeOptions options = {
1404       kSizeOfOptions,                          // |struct_size|.
1405       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
1406       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
1407       10 * sizeof(int32_t)                     // |capacity_num_bytes|.
1408   };
1409   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1410   MojoHandleSignalsState hss;
1411 
1412   // No data.
1413   uint32_t num_bytes = 1000u;
1414   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1415   ASSERT_EQ(0u, num_bytes);
1416 
1417   // Try "ending" a two-phase write when one isn't active.
1418   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1419             EndWriteData(1u * sizeof(int32_t)));
1420 
1421   // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1422   // have time to propagate.
1423   test::Sleep(test::EpsilonDeadline());
1424 
1425   // Still no data.
1426   num_bytes = 1000u;
1427   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1428   ASSERT_EQ(0u, num_bytes);
1429 
1430   // Try ending a two-phase write with an invalid amount (too much).
1431   num_bytes = 0u;
1432   void* write_ptr = nullptr;
1433   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1434   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1435             EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1436 
1437   // But the two-phase write still ended.
1438   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1439 
1440   // Wait a bit (as above).
1441   test::Sleep(test::EpsilonDeadline());
1442 
1443   // Still no data.
1444   num_bytes = 1000u;
1445   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1446   ASSERT_EQ(0u, num_bytes);
1447 
1448   // Try ending a two-phase write with an invalid amount (not a multiple of the
1449   // element size).
1450   num_bytes = 0u;
1451   write_ptr = nullptr;
1452   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1453   EXPECT_GE(num_bytes, 1u);
1454   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1455 
1456   // But the two-phase write still ended.
1457   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1458 
1459   // Wait a bit (as above).
1460   test::Sleep(test::EpsilonDeadline());
1461 
1462   // Still no data.
1463   num_bytes = 1000u;
1464   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1465   ASSERT_EQ(0u, num_bytes);
1466 
1467   // Now write some data, so we'll be able to try reading.
1468   int32_t element = 123;
1469   num_bytes = 1u * sizeof(int32_t);
1470   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1471 
1472   // Wait for data.
1473   // TODO(vtl): (See corresponding TODO in AllOrNone.)
1474   hss = MojoHandleSignalsState();
1475   ASSERT_EQ(MOJO_RESULT_OK,
1476             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1477   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1478             hss.satisfied_signals);
1479   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1480                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1481                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1482             hss.satisfiable_signals);
1483 
1484   // One element available.
1485   num_bytes = 0u;
1486   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1487   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1488 
1489   // Try "ending" a two-phase read when one isn't active.
1490   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1491 
1492   // Still one element available.
1493   num_bytes = 0u;
1494   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1495   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1496 
1497   // Try ending a two-phase read with an invalid amount (too much).
1498   num_bytes = 0u;
1499   const void* read_ptr = nullptr;
1500   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1501   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1502             EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1503 
1504   // Still one element available.
1505   num_bytes = 0u;
1506   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1507   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1508 
1509   // Try ending a two-phase read with an invalid amount (not a multiple of the
1510   // element size).
1511   num_bytes = 0u;
1512   read_ptr = nullptr;
1513   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1514   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1515   ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1516   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1517 
1518   // Still one element available.
1519   num_bytes = 0u;
1520   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1521   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1522 }
1523 
1524 // Test that a producer can be sent over a MP.
TEST_F(DataPipeTest,SendProducer)1525 TEST_F(DataPipeTest, SendProducer) {
1526   const char kTestData[] = "hello world";
1527   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1528 
1529   const MojoCreateDataPipeOptions options = {
1530       kSizeOfOptions,                   // |struct_size|.
1531       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1532       1u,                               // |element_num_bytes|.
1533       1000u                             // |capacity_num_bytes|.
1534   };
1535   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1536   MojoHandleSignalsState hss;
1537 
1538   // Write some data.
1539   uint32_t num_bytes = kTestDataSize;
1540   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1541   ASSERT_EQ(kTestDataSize, num_bytes);
1542 
1543   // Wait for the data.
1544   hss = MojoHandleSignalsState();
1545   ASSERT_EQ(MOJO_RESULT_OK,
1546             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1547   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1548             hss.satisfied_signals);
1549   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1550                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1551                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1552             hss.satisfiable_signals);
1553 
1554   // Check the data.
1555   const void* read_buffer = nullptr;
1556   num_bytes = 0u;
1557   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
1558   ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
1559   EndReadData(num_bytes);
1560 
1561   // Now send the producer over a MP so that it's serialized.
1562   MojoHandle pipe0, pipe1;
1563   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1564 
1565   ASSERT_EQ(MOJO_RESULT_OK,
1566             WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1,
1567                             MOJO_WRITE_MESSAGE_FLAG_NONE));
1568   producer_ = MOJO_HANDLE_INVALID;
1569   ASSERT_EQ(MOJO_RESULT_OK,
1570             WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1571   ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1));
1572 
1573   // Write more data.
1574   const char kExtraData[] = "bye world";
1575   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1576   num_bytes = kExtraDataSize;
1577   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1578   ASSERT_EQ(kExtraDataSize, num_bytes);
1579 
1580   // Wait for it.
1581   hss = MojoHandleSignalsState();
1582   ASSERT_EQ(MOJO_RESULT_OK,
1583             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1584   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1585             hss.satisfied_signals);
1586   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1587                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
1588                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1589             hss.satisfiable_signals);
1590 
1591   // Check the second write.
1592   num_bytes = 0u;
1593   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
1594   ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
1595   EndReadData(num_bytes);
1596 
1597   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1598   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1599 }
1600 
1601 // Ensures that if a data pipe consumer whose producer has closed is passed over
1602 // a message pipe, the deserialized dispatcher is also marked as having a closed
1603 // peer.
TEST_F(DataPipeTest,ConsumerWithClosedProducerSent)1604 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
1605   const MojoCreateDataPipeOptions options = {
1606       kSizeOfOptions,                          // |struct_size|.
1607       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
1608       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
1609       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
1610   };
1611 
1612   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1613 
1614   // We can write to a data pipe handle immediately.
1615   int32_t data = 123;
1616   uint32_t num_bytes = sizeof(data);
1617   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
1618   ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
1619 
1620   // Now wait for the other side to become readable and to see the peer closed.
1621   MojoHandleSignalsState state;
1622   ASSERT_EQ(MOJO_RESULT_OK,
1623             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
1624   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1625                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1626             state.satisfied_signals);
1627   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1628                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1629             state.satisfiable_signals);
1630 
1631   // Now send the consumer over a MP so that it's serialized.
1632   MojoHandle pipe0, pipe1;
1633   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1634 
1635   ASSERT_EQ(MOJO_RESULT_OK,
1636             WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1,
1637                             MOJO_WRITE_MESSAGE_FLAG_NONE));
1638   consumer_ = MOJO_HANDLE_INVALID;
1639   ASSERT_EQ(MOJO_RESULT_OK,
1640             WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state));
1641   ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1));
1642 
1643   ASSERT_EQ(MOJO_RESULT_OK,
1644             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
1645   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1646                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1647             state.satisfied_signals);
1648   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1649                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
1650             state.satisfiable_signals);
1651 
1652   int32_t read_data;
1653   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
1654   ASSERT_EQ(sizeof(read_data), num_bytes);
1655   ASSERT_EQ(data, read_data);
1656 
1657   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1658   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1659 }
1660 
WriteAllData(MojoHandle producer,const void * elements,uint32_t num_bytes)1661 bool WriteAllData(MojoHandle producer,
1662                   const void* elements,
1663                   uint32_t num_bytes) {
1664   for (size_t i = 0; i < kMaxPoll; i++) {
1665     // Write as much data as we can.
1666     uint32_t write_bytes = num_bytes;
1667     MojoResult result =
1668         MojoWriteData(producer, elements, &write_bytes, nullptr);
1669     if (result == MOJO_RESULT_OK) {
1670       num_bytes -= write_bytes;
1671       elements = static_cast<const uint8_t*>(elements) + write_bytes;
1672       if (num_bytes == 0)
1673         return true;
1674     } else {
1675       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1676     }
1677 
1678     MojoHandleSignalsState hss = MojoHandleSignalsState();
1679     EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
1680                                   producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1681     EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
1682     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1683                   MOJO_HANDLE_SIGNAL_PEER_REMOTE,
1684               hss.satisfiable_signals);
1685   }
1686 
1687   return false;
1688 }
1689 
1690 // If |expect_empty| is true, expect |consumer| to be empty after reading.
ReadAllData(MojoHandle consumer,void * elements,uint32_t num_bytes,bool expect_empty)1691 bool ReadAllData(MojoHandle consumer,
1692                  void* elements,
1693                  uint32_t num_bytes,
1694                  bool expect_empty) {
1695   for (size_t i = 0; i < kMaxPoll; i++) {
1696     // Read as much data as we can.
1697     uint32_t read_bytes = num_bytes;
1698     MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes);
1699     if (result == MOJO_RESULT_OK) {
1700       num_bytes -= read_bytes;
1701       elements = static_cast<uint8_t*>(elements) + read_bytes;
1702       if (num_bytes == 0) {
1703         if (expect_empty) {
1704           // Expect no more data.
1705           test::Sleep(test::TinyDeadline());
1706           MojoReadDataOptions options;
1707           options.struct_size = sizeof(options);
1708           options.flags = MOJO_READ_DATA_FLAG_QUERY;
1709           MojoReadData(consumer, &options, nullptr, &num_bytes);
1710           EXPECT_EQ(0u, num_bytes);
1711         }
1712         return true;
1713       }
1714     } else {
1715       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1716     }
1717 
1718     MojoHandleSignalsState hss = MojoHandleSignalsState();
1719     EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
1720                                   consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1721     // Peer could have become closed while we're still waiting for data.
1722     EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
1723     EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
1724     EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
1725   }
1726 
1727   return num_bytes == 0;
1728 }
1729 
1730 #if !defined(OS_IOS)
1731 
TEST_F(DataPipeTest,Multiprocess)1732 TEST_F(DataPipeTest, Multiprocess) {
1733   const uint32_t kTestDataSize =
1734       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1735   const MojoCreateDataPipeOptions options = {
1736       kSizeOfOptions,                   // |struct_size|.
1737       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1738       1,                                // |element_num_bytes|.
1739       kMultiprocessCapacity             // |capacity_num_bytes|.
1740   };
1741   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1742 
1743   RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) {
1744     // Send some data before serialising and sending the data pipe over.
1745     // This is the first write so we don't need to use WriteAllData.
1746     uint32_t num_bytes = kTestDataSize;
1747     ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
1748                                         MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
1749     ASSERT_EQ(kTestDataSize, num_bytes);
1750 
1751     // Send child process the data pipe.
1752     ASSERT_EQ(MOJO_RESULT_OK,
1753               WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
1754                               &consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1755 
1756     // Send a bunch of data of varying sizes.
1757     uint8_t buffer[100];
1758     int seq = 0;
1759     for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1760       for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
1761         for (unsigned int j = 0; j < size; ++j)
1762           buffer[j] = seq + j;
1763         EXPECT_TRUE(WriteAllData(producer_, buffer, size));
1764         seq += size;
1765       }
1766     }
1767 
1768     // Write the test string in again.
1769     ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
1770 
1771     // Swap ends.
1772     ASSERT_EQ(MOJO_RESULT_OK,
1773               WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
1774                               &producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1775 
1776     // Receive the consumer from the other side.
1777     producer_ = MOJO_HANDLE_INVALID;
1778     MojoHandleSignalsState hss = MojoHandleSignalsState();
1779     ASSERT_EQ(MOJO_RESULT_OK,
1780               WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1781     ASSERT_EQ(MOJO_RESULT_OK,
1782               ReadEmptyMessageWithHandles(server_mp, &consumer_, 1));
1783 
1784     // Read the test string twice. Once for when we sent it, and once for the
1785     // other end sending it.
1786     for (int i = 0; i < 2; ++i) {
1787       EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
1788       EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1789     }
1790 
1791     WriteMessage(server_mp, "quit");
1792 
1793     // Don't have to close the consumer here because it will be done for us.
1794   });
1795 }
1796 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient,DataPipeTest,client_mp)1797 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
1798   const uint32_t kTestDataSize =
1799       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1800 
1801   // Receive the data pipe from the other side.
1802   MojoHandle consumer = MOJO_HANDLE_INVALID;
1803   MojoHandleSignalsState hss = MojoHandleSignalsState();
1804   ASSERT_EQ(MOJO_RESULT_OK,
1805             WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1806   ASSERT_EQ(MOJO_RESULT_OK,
1807             ReadEmptyMessageWithHandles(client_mp, &consumer, 1));
1808 
1809   // Read the initial string that was sent.
1810   int32_t buffer[100];
1811   EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
1812   EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1813 
1814   // Receive the main data and check it is correct.
1815   int seq = 0;
1816   uint8_t expected_buffer[100];
1817   for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1818     for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
1819       for (unsigned int j = 0; j < size; ++j)
1820         expected_buffer[j] = seq + j;
1821       EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
1822       EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
1823 
1824       seq += size;
1825     }
1826   }
1827 
1828   // Swap ends.
1829   ASSERT_EQ(MOJO_RESULT_OK,
1830             WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer,
1831                             1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1832 
1833   // Receive the producer from the other side.
1834   MojoHandle producer = MOJO_HANDLE_INVALID;
1835   hss = MojoHandleSignalsState();
1836   ASSERT_EQ(MOJO_RESULT_OK,
1837             WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
1838   ASSERT_EQ(MOJO_RESULT_OK,
1839             ReadEmptyMessageWithHandles(client_mp, &producer, 1));
1840 
1841   // Write the test string one more time.
1842   EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
1843 
1844   // We swapped ends, so close the producer.
1845   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
1846 
1847   // Wait to receive a "quit" message before exiting.
1848   EXPECT_EQ("quit", ReadMessage(client_mp));
1849 }
1850 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer,DataPipeTest,h)1851 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
1852   MojoHandle p;
1853   std::string message = ReadMessageWithHandles(h, &p, 1);
1854 
1855   // Write some data to the producer and close it.
1856   uint32_t num_bytes = static_cast<uint32_t>(message.size());
1857   EXPECT_EQ(MOJO_RESULT_OK,
1858             MojoWriteData(p, message.data(), &num_bytes, nullptr));
1859   EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
1860 
1861   // Close the producer before quitting.
1862   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
1863 
1864   // Wait for a quit message.
1865   EXPECT_EQ("quit", ReadMessage(h));
1866 }
1867 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer,DataPipeTest,h)1868 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
1869   MojoHandle c;
1870   std::string expected_message = ReadMessageWithHandles(h, &c, 1);
1871 
1872   // Wait for the consumer to become readable.
1873   EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
1874 
1875   // Drain the consumer and expect to find the given message.
1876   uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
1877   std::vector<char> bytes(expected_message.size());
1878   EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes));
1879   EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
1880 
1881   std::string message(bytes.data(), bytes.size());
1882   EXPECT_EQ(expected_message, message);
1883 
1884   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
1885 
1886   // Wait for a quit message.
1887   EXPECT_EQ("quit", ReadMessage(h));
1888 }
1889 
TEST_F(DataPipeTest,SendConsumerAndCloseProducer)1890 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
1891   // Create a new data pipe.
1892   MojoHandle p, c;
1893   EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));
1894 
1895   RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) {
1896     RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) {
1897       const std::string kMessage = "Hello, world!";
1898       WriteMessageWithHandles(producer_client, kMessage, &p, 1);
1899       WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
1900 
1901       WriteMessage(consumer_client, "quit");
1902     });
1903 
1904     WriteMessage(producer_client, "quit");
1905   });
1906 }
1907 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite,DataPipeTest,h)1908 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
1909   const MojoCreateDataPipeOptions options = {
1910       kSizeOfOptions,                   // |struct_size|.
1911       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
1912       1,                                // |element_num_bytes|.
1913       kMultiprocessCapacity             // |capacity_num_bytes|.
1914   };
1915 
1916   MojoHandle p, c;
1917   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
1918 
1919   const std::string kMessage = "Hello, world!";
1920   WriteMessageWithHandles(h, kMessage, &c, 1);
1921 
1922   // Write some data to the producer and close it.
1923   uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
1924   EXPECT_EQ(MOJO_RESULT_OK,
1925             MojoWriteData(p, kMessage.data(), &num_bytes, nullptr));
1926   EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
1927   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
1928 
1929   // Wait for a quit message.
1930   EXPECT_EQ("quit", ReadMessage(h));
1931 }
1932 
TEST_F(DataPipeTest,CreateInChild)1933 TEST_F(DataPipeTest, CreateInChild) {
1934   RunTestClient("CreateAndWrite", [&](MojoHandle child) {
1935     MojoHandle c;
1936     std::string expected_message = ReadMessageWithHandles(child, &c, 1);
1937 
1938     // Wait for the consumer to become readable.
1939     EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
1940 
1941     // Drain the consumer and expect to find the given message.
1942     uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
1943     std::vector<char> bytes(expected_message.size());
1944     EXPECT_EQ(MOJO_RESULT_OK,
1945               MojoReadData(c, nullptr, bytes.data(), &num_bytes));
1946     EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
1947 
1948     std::string message(bytes.data(), bytes.size());
1949     EXPECT_EQ(expected_message, message);
1950 
1951     EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
1952     WriteMessage(child, "quit");
1953   });
1954 }
1955 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,DataPipeTest,parent)1956 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,
1957                                   DataPipeTest,
1958                                   parent) {
1959   // This test verifies that peer closure is detectable through various
1960   // mechanisms when it races with handle transfer.
1961 
1962   MojoHandle handles[6];
1963   EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6));
1964   MojoHandle* producers = &handles[0];
1965   MojoHandle* consumers = &handles[3];
1966 
1967   // Wait on producer 0
1968   EXPECT_EQ(MOJO_RESULT_OK,
1969             WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
1970 
1971   // Wait on consumer 0
1972   EXPECT_EQ(MOJO_RESULT_OK,
1973             WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
1974 
1975   base::MessageLoop message_loop;
1976 
1977   // Wait on producer 1 and consumer 1 using SimpleWatchers.
1978   {
1979     base::RunLoop run_loop;
1980     int count = 0;
1981     auto callback = base::Bind(
1982         [](base::RunLoop* loop, int* count, MojoResult result) {
1983           EXPECT_EQ(MOJO_RESULT_OK, result);
1984           if (++*count == 2)
1985             loop->Quit();
1986         },
1987         &run_loop, &count);
1988     SimpleWatcher producer_watcher(FROM_HERE,
1989                                    SimpleWatcher::ArmingPolicy::AUTOMATIC,
1990                                    base::SequencedTaskRunnerHandle::Get());
1991     SimpleWatcher consumer_watcher(FROM_HERE,
1992                                    SimpleWatcher::ArmingPolicy::AUTOMATIC,
1993                                    base::SequencedTaskRunnerHandle::Get());
1994     producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1995                            callback);
1996     consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1997                            callback);
1998     run_loop.Run();
1999     EXPECT_EQ(2, count);
2000   }
2001 
2002   // Wait on producer 2 by polling with MojoWriteData.
2003   MojoResult result;
2004   do {
2005     uint32_t num_bytes = 0;
2006     result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr);
2007   } while (result == MOJO_RESULT_OK);
2008   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
2009 
2010   // Wait on consumer 2 by polling with MojoReadData.
2011   do {
2012     char byte;
2013     uint32_t num_bytes = 1;
2014     result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes);
2015   } while (result == MOJO_RESULT_SHOULD_WAIT);
2016   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
2017 
2018   for (size_t i = 0; i < 6; ++i)
2019     CloseHandle(handles[i]);
2020 }
2021 
TEST_F(DataPipeTest,StatusChangeInTransit)2022 TEST_F(DataPipeTest, StatusChangeInTransit) {
2023   MojoHandle producers[6];
2024   MojoHandle consumers[6];
2025   for (size_t i = 0; i < 6; ++i)
2026     CreateDataPipe(&producers[i], &consumers[i], 1);
2027 
2028   RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) {
2029     MojoHandle handles[] = {producers[0], producers[1], producers[2],
2030                             consumers[3], consumers[4], consumers[5]};
2031 
2032     // Send 3 producers and 3 consumers, and let their transfer race with their
2033     // peers' closure.
2034     WriteMessageWithHandles(child, "o_O", handles, 6);
2035 
2036     for (size_t i = 0; i < 3; ++i)
2037       CloseHandle(consumers[i]);
2038     for (size_t i = 3; i < 6; ++i)
2039       CloseHandle(producers[i]);
2040   });
2041 }
2042 
2043 #endif  // !defined(OS_IOS)
2044 
2045 }  // namespace
2046 }  // namespace core
2047 }  // namespace mojo
2048