1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io;
6 use std::sync::Arc;
7
8 use log::error;
9 use log::warn;
10 use serde::ser::SerializeStruct;
11 use serde::Deserialize;
12 use serde::Serialize;
13 use serde::Serializer;
14 use sync::Mutex;
15
16 use super::named_pipes;
17 use super::named_pipes::PipeConnection;
18 use super::MultiProcessMutex;
19 use super::RawDescriptor;
20 use super::Result;
21 use crate::descriptor::AsRawDescriptor;
22 use crate::CloseNotifier;
23 use crate::Event;
24 use crate::ReadNotifier;
25
26 #[derive(Copy, Clone)]
27 pub enum FramingMode {
28 Message,
29 Byte,
30 }
31
32 #[derive(Copy, Clone, PartialEq, Eq)]
33 pub enum BlockingMode {
34 Blocking,
35 Nonblocking,
36 }
37
38 impl From<FramingMode> for named_pipes::FramingMode {
from(framing_mode: FramingMode) -> Self39 fn from(framing_mode: FramingMode) -> Self {
40 match framing_mode {
41 FramingMode::Message => named_pipes::FramingMode::Message,
42 FramingMode::Byte => named_pipes::FramingMode::Byte,
43 }
44 }
45 }
46
47 impl From<BlockingMode> for named_pipes::BlockingMode {
from(blocking_mode: BlockingMode) -> Self48 fn from(blocking_mode: BlockingMode) -> Self {
49 match blocking_mode {
50 BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
51 BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
52 }
53 }
54 }
55
56 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
57
58 /// An abstraction over named pipes and unix socketpairs.
59 ///
60 /// WARNING: partial reads of messages behave differently depending on the platform.
61 /// See sys::unix::StreamChannel::inner_read for details.
62 ///
63 /// The ReadNotifier will return an event handle that is set when data is in the channel.
64 ///
65 /// In message mode, single writes larger than
66 /// `crate::windows::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
67 ///
68 /// # Notes for maintainers
69 /// 1. This struct contains extremely subtle thread safety considerations.
70 /// 2. Serialization is not derived! New fields need to be added manually.
71 #[derive(Deserialize, Debug)]
72 pub struct StreamChannel {
73 pipe_conn: named_pipes::PipeConnection,
74 write_notify: Event,
75 read_notify: Event,
76 pipe_closed: Event,
77
78 // Held when reading on this end, to prevent additional writes from corrupting notification
79 // state.
80 remote_write_lock: MultiProcessMutex,
81
82 // Held when a write is made on this end, so that if the remote end is reading, we wait to
83 // write to avoid corrupting notification state.
84 local_write_lock: MultiProcessMutex,
85
86 // Held for the entire duration of a read. This enables the StreamChannel to be sync,
87 // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
88 //
89 // In practice, there is no use-case for multiple threads actually contending over
90 // reading from a single pipe through StreamChannel, so this is mostly to provide a
91 // compiler guarantee while passing the StreamChannel to/from background executors.
92 //
93 // Note that this mutex does not work across processes, so the same StreamChannel end should
94 // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
95 // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
96 // which is not well defined behavior.)
97 #[serde(skip)]
98 #[serde(default = "create_read_lock")]
99 read_lock: Arc<Mutex<()>>,
100
101 // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
102 // channel end has been serialized. Once serialized, we know that the current end MUST NOT
103 // signal the channel has been closed when it was dropped, because a copy of it was sent to
104 // another process. It is the copy's responsibility to close the pipe.
105 #[serde(skip)]
106 #[serde(default = "create_true_mutex")]
107 is_channel_closed_on_drop: Mutex<bool>,
108
109 // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
110 // up to that size.
111 send_buffer_size: usize,
112 }
113
create_read_lock() -> Arc<Mutex<()>>114 fn create_read_lock() -> Arc<Mutex<()>> {
115 Arc::new(Mutex::new(()))
116 }
117
create_true_mutex() -> Mutex<bool>118 fn create_true_mutex() -> Mutex<bool> {
119 Mutex::new(true)
120 }
121
122 /// Serialize is manually implemented because we need to tell the local copy that a remote copy
123 /// exists, and to not send the close event. Our serialization is otherwise identical to what
124 /// derive would have generated.
125 impl Serialize for StreamChannel {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,126 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
127 where
128 S: Serializer,
129 {
130 let mut s = serializer.serialize_struct("StreamChannel", 7)?;
131 s.serialize_field("pipe_conn", &self.pipe_conn)?;
132 s.serialize_field("write_notify", &self.write_notify)?;
133 s.serialize_field("read_notify", &self.read_notify)?;
134 s.serialize_field("pipe_closed", &self.pipe_closed)?;
135 s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
136 s.serialize_field("local_write_lock", &self.local_write_lock)?;
137 s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
138 let ret = s.end();
139
140 // Because this end has been serialized, the serialized copy is now responsible for setting
141 // the close event.
142 if ret.is_ok() {
143 *self.is_channel_closed_on_drop.lock() = false;
144 }
145
146 ret
147 }
148 }
149
150 impl Drop for StreamChannel {
drop(&mut self)151 fn drop(&mut self) {
152 if *self.is_channel_closed_on_drop.lock() {
153 if let Err(e) = self.pipe_closed.signal() {
154 warn!("failed to notify on channel drop: {}", e);
155 }
156 }
157 }
158 }
159
160 impl StreamChannel {
set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()>161 pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
162 // Safe because the pipe is open.
163 if nonblocking {
164 self.pipe_conn
165 .set_blocking(&named_pipes::BlockingMode::NoWait)
166 } else {
167 self.pipe_conn
168 .set_blocking(&named_pipes::BlockingMode::Wait)
169 }
170 }
171
172 // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
173 // > 1 reader per end is not defined.
try_clone(&self) -> io::Result<Self>174 pub fn try_clone(&self) -> io::Result<Self> {
175 Ok(StreamChannel {
176 pipe_conn: self.pipe_conn.try_clone()?,
177 write_notify: self.write_notify.try_clone()?,
178 read_notify: self.read_notify.try_clone()?,
179 pipe_closed: self.pipe_closed.try_clone()?,
180 remote_write_lock: self.remote_write_lock.try_clone()?,
181 local_write_lock: self.local_write_lock.try_clone()?,
182 read_lock: self.read_lock.clone(),
183 is_channel_closed_on_drop: create_true_mutex(),
184 send_buffer_size: self.send_buffer_size,
185 })
186 }
187
188 /// Gets the readable byte count. Returns zero for broken pipes since that will cause the read
189 /// notifier to be set, and for the consumer to quickly discover the broken pipe.
get_readable_byte_count(&self) -> io::Result<u32>190 fn get_readable_byte_count(&self) -> io::Result<u32> {
191 match self.pipe_conn.get_available_byte_count() {
192 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
193 Err(e) => {
194 error!("StreamChannel failed to get readable byte count: {}", e);
195 Err(e)
196 }
197 Ok(byte_count) => Ok(byte_count),
198 }
199 }
200
inner_read(&self, buf: &mut [u8]) -> io::Result<usize>201 pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
202 // We ensure concurrent read safety by holding a lock for the duration of the method.
203 // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
204 // that the notifier should be set, leading to an invalid notified/readable state which
205 // could stall readers.)
206 let _read_lock = self.read_lock.lock();
207
208 // SAFETY:
209 // Safe because no partial reads are possible, and the underlying code bounds the
210 // read by buf's size.
211 let res = unsafe { self.pipe_conn.read(buf) };
212
213 // The entire goal of this complex section is to avoid the need for shared memory between
214 // each channel end to synchronize the notification state. It is very subtle, modify with
215 // care.
216 loop {
217 // No other thread is reading, so we can find out, without the write lock, whether or
218 // not we need to clear the read notifier. If we don't, then we don't even have to try
219 // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
220 // side blocks on a writing with the lock held. If it looks like we do need to clear
221 // the notifier though, then we have to be sure, so we'll proceed to the next section.
222 let byte_count = self.get_readable_byte_count()?;
223 if byte_count > 0 {
224 // It's always safe to set the read notifier here because we know there is data in
225 // the pipe, and no one else could read it out from under us.
226 self.read_notify.signal().map_err(|e| {
227 io::Error::new(
228 io::ErrorKind::Other,
229 format!("failed to write to read notifier: {:?}", e),
230 )
231 })?;
232
233 // Notifier state has been safely synced.
234 return res;
235 }
236
237 // At this point, there *may* be no data in the pipe, meaning we may want to clear the
238 // notifier. Instead of just trying to acquire the lock outright which could deadlock
239 // with the writing side, we'll try with a timeout. If it fails, we know that the other
240 // side is in the middle of a write, so there either will be data in the pipe soon (a),
241 // or there won't be and we have to clear a spurious notification (b).
242 //
243 // For (a), we can safely return from the read without needing the lock, so we just come
244 // around in the loop to check again, and the loop will exit quickly.
245 //
246 // For (b) we'll return to this point and acquire the lock, as we're just waiting for
247 // the spurious notification to arrive so we can clear it (that code path is very fast),
248 // and the loop will exit.
249 //
250 // If we successfully acquire the lock though, then we can go ahead and clear the
251 // notifier if the pipe is indeed empty, because we are assured that no writes are
252 // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
253 // that's a decent balance between avoiding an unnecessary iteration, and minimizing
254 // latency.
255 if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
256 let byte_count = self.get_readable_byte_count()?;
257 if byte_count > 0 {
258 // Safe because no one else can be reading from the pipe.
259 self.read_notify.signal().map_err(|e| {
260 io::Error::new(
261 io::ErrorKind::Other,
262 format!("failed to write to read notifier: {:?}", e),
263 )
264 })?;
265 } else {
266 // Safe because no other writes can be happening (_lock is held).
267 self.read_notify.reset().map_err(|e| {
268 io::Error::new(
269 io::ErrorKind::Other,
270 format!("failed to reset read notifier: {:?}", e),
271 )
272 })?;
273 }
274
275 // Notifier state has been safely synced.
276 return res;
277 }
278 }
279 }
280
281 /// Exists as a workaround for Tube which does not expect its transport to be mutable,
282 /// even though io::Write requires it.
write_immutable(&self, buf: &[u8]) -> io::Result<usize>283 pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
284 if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
285 && buf.len() > self.send_buffer_size
286 {
287 return Err(io::Error::new(
288 io::ErrorKind::Other,
289 format!(
290 "StreamChannel forbids message mode writes larger than the \
291 default buffer size of {}.",
292 self.send_buffer_size,
293 ),
294 ));
295 }
296
297 let _lock = self.local_write_lock.lock();
298 let res = self.pipe_conn.write(buf);
299
300 // We can always set the write notifier because we know that the reader is in one of the
301 // following states:
302 // 1) a read is running, and it consumes these bytes, so the notification is
303 // unnecessary. That's fine, because the reader will resync the notifier state once
304 // it finishes reading.
305 // 2) a read has completed and is blocked on the lock. The notification state is
306 // already correct, and the read's resync won't change that.
307 if res.is_ok() {
308 self.write_notify.signal().map_err(|e| {
309 io::Error::new(
310 io::ErrorKind::Other,
311 format!("failed to write to read notifier: {:?}", e),
312 )
313 })?;
314 }
315
316 res
317 }
318
319 /// This only works with empty pipes. U.B. will result if used in any other scenario.
from_pipes( pipe_a: PipeConnection, pipe_b: PipeConnection, send_buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>320 pub fn from_pipes(
321 pipe_a: PipeConnection,
322 pipe_b: PipeConnection,
323 send_buffer_size: usize,
324 ) -> Result<(StreamChannel, StreamChannel)> {
325 let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
326 let pipe_closed = Event::new()?;
327
328 let write_lock_a = MultiProcessMutex::new()?;
329 let write_lock_b = MultiProcessMutex::new()?;
330
331 let sock_a = StreamChannel {
332 pipe_conn: pipe_a,
333 write_notify: notify_a_write.try_clone()?,
334 read_notify: notify_b_write.try_clone()?,
335 read_lock: Arc::new(Mutex::new(())),
336 local_write_lock: write_lock_a.try_clone()?,
337 remote_write_lock: write_lock_b.try_clone()?,
338 pipe_closed: pipe_closed.try_clone()?,
339 is_channel_closed_on_drop: create_true_mutex(),
340 send_buffer_size,
341 };
342 let sock_b = StreamChannel {
343 pipe_conn: pipe_b,
344 write_notify: notify_b_write,
345 read_notify: notify_a_write,
346 read_lock: Arc::new(Mutex::new(())),
347 local_write_lock: write_lock_b,
348 remote_write_lock: write_lock_a,
349 pipe_closed,
350 is_channel_closed_on_drop: create_true_mutex(),
351 send_buffer_size,
352 };
353 Ok((sock_a, sock_b))
354 }
355
356 /// Create a pair with a specific buffer size. Note that this is the only way to send messages
357 /// larger than the default named pipe buffer size.
pair_with_buffer_size( blocking_mode: BlockingMode, framing_mode: FramingMode, buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>358 pub fn pair_with_buffer_size(
359 blocking_mode: BlockingMode,
360 framing_mode: FramingMode,
361 buffer_size: usize,
362 ) -> Result<(StreamChannel, StreamChannel)> {
363 let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
364 &named_pipes::FramingMode::from(framing_mode),
365 &named_pipes::BlockingMode::from(blocking_mode),
366 0,
367 buffer_size,
368 false,
369 )?;
370 Self::from_pipes(pipe_a, pipe_b, buffer_size)
371 }
372 /// Creates a cross platform channel pair.
373 /// On Windows the result is in the form (server, client).
pair( blocking_mode: BlockingMode, framing_mode: FramingMode, ) -> Result<(StreamChannel, StreamChannel)>374 pub fn pair(
375 blocking_mode: BlockingMode,
376 framing_mode: FramingMode,
377 ) -> Result<(StreamChannel, StreamChannel)> {
378 let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
379 &named_pipes::FramingMode::from(framing_mode),
380 &named_pipes::BlockingMode::from(blocking_mode),
381 0,
382 DEFAULT_BUFFER_SIZE,
383 false,
384 )?;
385 Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
386 }
387
388 /// Blocks until the pipe buffer is empty.
389 /// NOTE: that this will only work for server pipes on Windows.
flush_blocking(&self) -> io::Result<()>390 pub fn flush_blocking(&self) -> io::Result<()> {
391 self.pipe_conn.flush_data_blocking()
392 }
393
get_read_notifier_event(&self) -> &Event394 pub(crate) fn get_read_notifier_event(&self) -> &Event {
395 &self.read_notify
396 }
397
get_close_notifier_event(&self) -> &Event398 pub(crate) fn get_close_notifier_event(&self) -> &Event {
399 &self.pipe_closed
400 }
401 }
402
403 impl io::Write for StreamChannel {
write(&mut self, buf: &[u8]) -> io::Result<usize>404 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
405 self.write_immutable(buf)
406 }
flush(&mut self) -> io::Result<()>407 fn flush(&mut self) -> io::Result<()> {
408 // There is no userspace buffering inside crosvm to flush for named pipes. We write
409 // directly to the named pipe using WriteFile.
410 Ok(())
411 }
412 }
413
414 impl AsRawDescriptor for &StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor415 fn as_raw_descriptor(&self) -> RawDescriptor {
416 self.pipe_conn.as_raw_descriptor()
417 }
418 }
419
420 impl ReadNotifier for StreamChannel {
421 /// Returns a RawDescriptor that can be polled for reads using WaitContext.
get_read_notifier(&self) -> &dyn AsRawDescriptor422 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
423 &self.read_notify
424 }
425 }
426
427 impl CloseNotifier for StreamChannel {
get_close_notifier(&self) -> &dyn AsRawDescriptor428 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
429 &self.pipe_closed
430 }
431 }
432
433 impl io::Read for StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>434 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
435 self.inner_read(buf)
436 }
437 }
438
439 impl io::Read for &StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>440 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
441 self.inner_read(buf)
442 }
443 }
444
445 impl AsRawDescriptor for StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor446 fn as_raw_descriptor(&self) -> RawDescriptor {
447 (&self).as_raw_descriptor()
448 }
449 }
450
451 #[cfg(test)]
452 mod test {
453 use std::io::Read;
454 use std::io::Write;
455 use std::time::Duration;
456
457 use super::super::EventContext;
458 use super::super::EventTrigger;
459 use super::*;
460 use crate::EventToken;
461 use crate::ReadNotifier;
462
463 #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
464 enum Token {
465 ReceivedData,
466 }
467
468 const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);
469
470 #[test]
test_read_notifies_multiple_writes()471 fn test_read_notifies_multiple_writes() {
472 let (mut sender, mut receiver) =
473 StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
474 sender.write_all(&[1, 2]).unwrap();
475
476 // Wait for the write to arrive.
477 let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
478 receiver.get_read_notifier(),
479 Token::ReceivedData,
480 )])
481 .unwrap();
482 assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
483
484 // Read just one byte. This leaves another byte in the pipe.
485 let mut recv_buffer = [0u8; 1];
486 let size = receiver.read(&mut recv_buffer).unwrap();
487 assert_eq!(size, 1);
488 assert_eq!(recv_buffer[0], 1);
489
490 // The notifier should still be set, because the pipe has data.
491 assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
492 let size = receiver.read(&mut recv_buffer).unwrap();
493 assert_eq!(size, 1);
494 assert_eq!(recv_buffer[0], 2);
495 }
496
497 #[test]
test_blocked_writer_wont_deadlock()498 fn test_blocked_writer_wont_deadlock() {
499 let (mut writer, mut reader) =
500 StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
501 .unwrap();
502 const NUM_OPS: usize = 100;
503
504 // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
505 // 100x before we run into a blocking write, so that's what we do here. This makes sense
506 // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
507 // is supposed to be handled by the kernel.
508 let writer = std::thread::spawn(move || {
509 let buf = [0u8; 100];
510 for _ in 0..NUM_OPS {
511 assert_eq!(writer.write(&buf).unwrap(), buf.len());
512 }
513 writer
514 });
515
516 // The test passes if the reader can read (this used to deadlock).
517 let mut buf = [0u8; 100];
518 for _ in 0..NUM_OPS {
519 assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
520 }
521
522 // Writer must exit cleanly.
523 writer.join().unwrap();
524 }
525
526 #[test]
test_non_blocking_pair()527 fn test_non_blocking_pair() {
528 let (mut sender, mut receiver) =
529 StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
530
531 sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();
532
533 // Wait for the data to arrive.
534 let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
535 receiver.get_read_notifier(),
536 Token::ReceivedData,
537 )])
538 .unwrap();
539 let events = event_ctx.wait().unwrap();
540 let tokens: Vec<Token> = events
541 .iter()
542 .filter(|e| e.is_readable)
543 .map(|e| e.token)
544 .collect();
545 assert_eq!(tokens, vec! {Token::ReceivedData});
546
547 // Smaller than what we sent so we get multiple chunks
548 let mut recv_buffer: [u8; 4] = [0; 4];
549
550 let mut size = receiver.read(&mut recv_buffer).unwrap();
551 assert_eq!(size, 4);
552 assert_eq!(recv_buffer, [75, 77, 54, 82]);
553
554 size = receiver.read(&mut recv_buffer).unwrap();
555 assert_eq!(size, 2);
556 assert_eq!(recv_buffer[0..2], [76, 65]);
557
558 // Now that we've polled for & received all data, polling again should show no events.
559 assert_eq!(
560 event_ctx
561 .wait_timeout(std::time::Duration::new(0, 0))
562 .unwrap()
563 .len(),
564 0
565 );
566 }
567
568 #[test]
test_non_blocking_pair_error_no_data()569 fn test_non_blocking_pair_error_no_data() {
570 let (mut sender, mut receiver) =
571 StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
572 receiver
573 .set_nonblocking(true)
574 .expect("Failed to set receiver to nonblocking mode.");
575
576 sender.write_all(&[75, 77]).unwrap();
577
578 // Wait for the data to arrive.
579 let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
580 receiver.get_read_notifier(),
581 Token::ReceivedData,
582 )])
583 .unwrap();
584 let events = event_ctx.wait().unwrap();
585 let tokens: Vec<Token> = events
586 .iter()
587 .filter(|e| e.is_readable)
588 .map(|e| e.token)
589 .collect();
590 assert_eq!(tokens, vec! {Token::ReceivedData});
591
592 // We only read 2 bytes, even though we requested 4 bytes.
593 let mut recv_buffer: [u8; 4] = [0; 4];
594 let size = receiver.read(&mut recv_buffer).unwrap();
595 assert_eq!(size, 2);
596 assert_eq!(recv_buffer, [75, 77, 00, 00]);
597
598 // Further reads should encounter an error since there is no available data and this is a
599 // non blocking pipe.
600 assert!(receiver.read(&mut recv_buffer).is_err());
601 }
602 }
603