// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::io;
use std::sync::Arc;

use log::error;
use log::warn;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use sync::Mutex;

use super::named_pipes;
use super::named_pipes::PipeConnection;
use super::MultiProcessMutex;
use super::RawDescriptor;
use super::Result;
use crate::descriptor::AsRawDescriptor;
use crate::CloseNotifier;
use crate::Event;
use crate::ReadNotifier;

#[derive(Copy, Clone)]
pub enum FramingMode {
    Message,
    Byte,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    Blocking,
    Nonblocking,
}

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// WARNING: partial reads of messages behave differently depending on the platform.
/// See sys::unix::StreamChannel::inner_read for details.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than
/// `crate::windows::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
///
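/// # Example
///
/// An illustrative sketch (not part of the upstream docs); it assumes
/// crate-internal access to `StreamChannel::pair`:
///
/// ```ignore
/// use std::io::{Read, Write};
///
/// let (mut sender, mut receiver) =
///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
/// sender.write_all(b"hello").unwrap();
/// let mut buf = [0u8; 5];
/// receiver.read_exact(&mut buf).unwrap();
/// assert_eq!(&buf, b"hello");
/// ```
///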
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it was dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_mutex")]
    is_channel_closed_on_drop: Mutex<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_mutex() -> Mutex<bool> {
    Mutex::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
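///
/// Illustrative sketch of the intended handoff (the transport and helper names
/// here are hypothetical, not part of this crate):
///
/// ```ignore
/// // Process A: serializing an end hands off close responsibility, so dropping
/// // the local copy afterwards will NOT signal pipe_closed.
/// let bytes = encode_for_transport(&channel_end); // hypothetical helper
/// send_to_other_process(bytes);                   // hypothetical helper
/// drop(channel_end); // the deserialized copy in process B signals the close.
/// ```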
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.lock() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.lock() {
            if let Err(e) = self.pipe_closed.signal() {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    /// Gets the readable byte count. Returns zero for broken pipes since that will cause the read
    /// notifier to be set, and for the consumer to quickly discover the broken pipe.
    fn get_readable_byte_count(&self) -> io::Result<u32> {
        match self.pipe_conn.get_available_byte_count() {
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
            Err(e) => {
                error!("StreamChannel failed to get readable byte count: {}", e);
                Err(e)
            }
            Ok(byte_count) => Ok(byte_count),
        }
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        // SAFETY:
        // Safe because no partial reads are possible, and the underlying code bounds the
        // read by buf's size.
        let res = unsafe { self.pipe_conn.read(buf) };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in
                // the pipe, and no one else could read it out from under us.
                self.read_notify.signal().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.signal().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_write_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for Tube which does not expect its transport to be mutable,
    /// even though io::Write requires it.
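    ///
    /// An illustrative sketch (not part of the upstream docs): writes work
    /// through a shared reference, unlike `io::Write::write`:
    ///
    /// ```ignore
    /// let n = channel.write_immutable(b"ping").unwrap(); // note: &self, not &mut self
    /// assert_eq!(n, 4);
    /// ```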
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than the \
                     send buffer size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.signal().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to write to read notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. U.B. will result if used in any other scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
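    ///
    /// An illustrative sketch (not part of the upstream docs), creating a pair
    /// that accepts message-mode writes up to 1 MiB:
    ///
    /// ```ignore
    /// let (sender, receiver) = StreamChannel::pair_with_buffer_size(
    ///     BlockingMode::Blocking,
    ///     FramingMode::Message,
    ///     1024 * 1024, // buffer_size: also becomes the max message write size.
    /// )
    /// .unwrap();
    /// ```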
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross platform channel pair.
    /// On Windows the result is in the form (server, client).
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    /// NOTE: this only works for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }

    pub(crate) fn get_read_notifier_event(&self) -> &Event {
        &self.read_notify
    }

    pub(crate) fn get_close_notifier_event(&self) -> &Event {
        &self.pipe_closed
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
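    ///
    /// An illustrative sketch (not part of the upstream docs; assumes the
    /// crate-internal `WaitContext` and an `EventToken` type named `Token`):
    ///
    /// ```ignore
    /// let wait_ctx: WaitContext<Token> = WaitContext::new().unwrap();
    /// wait_ctx
    ///     .add(channel.get_read_notifier(), Token::ReceivedData)
    ///     .unwrap();
    /// // Returns once data is available in the channel.
    /// let events = wait_ctx.wait().unwrap();
    /// ```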
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

impl io::Read for StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        (&self).as_raw_descriptor()
    }
}

#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }

    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // Smaller than what we sent so we get multiple chunks.
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }

    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 0, 0]);

        // Further reads should encounter an error since there is no available data and this is a
        // non-blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
}