// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::io;
use std::sync::Arc;

use log::error;
use log::warn;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use sync::Mutex;

use super::named_pipes;
use super::named_pipes::PipeConnection;
use super::MultiProcessMutex;
use super::RawDescriptor;
use super::Result;
use crate::descriptor::AsRawDescriptor;
use crate::CloseNotifier;
use crate::Event;
use crate::ReadNotifier;

#[derive(Copy, Clone)]
pub enum FramingMode {
    Message,
    Byte,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    Blocking,
    Nonblocking,
}

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than the channel's send buffer size
/// (`DEFAULT_BUFFER_SIZE` unless created with `pair_with_buffer_size`) are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it is dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_cell")]
    is_channel_closed_on_drop: RefCell<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_cell() -> RefCell<bool> {
    RefCell::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.borrow_mut() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.borrow() {
            if let Err(e) = self.pipe_closed.signal() {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    /// Gets the readable byte count. Returns zero for broken pipes, since that will cause the
    /// read notifier to be set and the consumer to quickly discover the broken pipe.
    fn get_readable_byte_count(&self) -> io::Result<u32> {
        match self.pipe_conn.get_available_byte_count() {
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
            Err(e) => {
                error!("StreamChannel failed to get readable byte count: {}", e);
                Err(e)
            }
            Ok(byte_count) => Ok(byte_count),
        }
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        // SAFETY:
        // Safe because no partial reads are possible, and the underlying code bounds the
        // read by buf's size.
        let res = unsafe { self.pipe_conn.read(buf) };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in
                // the pipe, and no one else could read it out from under us.
                self.read_notify.signal().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.signal().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_write_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for Tube, which does not expect its transport to be mutable,
    /// even though io::Write requires it.
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than its \
                     send buffer size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.signal().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to signal write notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. U.B. will result if used in any other scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross-platform channel pair.
    /// On Windows the result is in the form (server, client).
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    /// NOTE: this will only work for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

impl io::Read for StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        (&self).as_raw_descriptor()
    }
}

#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }
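
    // Added sketch, not part of the original suite: `write_immutable` rejects
    // message-mode writes larger than the pair's send buffer size. The specific
    // buffer size of 16 bytes here is an assumption; the smallest size a named
    // pipe will actually honor is a Windows implementation detail.
    #[test]
    fn test_message_write_larger_than_buffer_fails() {
        let (sender, _receiver) = StreamChannel::pair_with_buffer_size(
            BlockingMode::Blocking,
            FramingMode::Message,
            16,
        )
        .unwrap();

        // A 32 byte message exceeds the 16 byte send buffer, so the write must
        // be refused before touching the pipe.
        let too_big = [0u8; 32];
        assert!(sender.write_immutable(&too_big).is_err());
    }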

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }
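
    // Added sketch, not part of the original suite: in `FramingMode::Message`,
    // each write should be delivered as a discrete message rather than a byte
    // stream. This assumes the underlying named pipe honors message-mode read
    // semantics when the read buffer is larger than the message.
    #[test]
    fn test_message_framing_preserves_write_boundaries() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message).unwrap();
        sender.write_all(&[1, 2, 3]).unwrap();
        sender.write_all(&[4, 5]).unwrap();

        // Each read returns exactly one message, even though the buffer could
        // hold both.
        let mut buf = [0u8; 16];
        assert_eq!(receiver.read(&mut buf).unwrap(), 3);
        assert_eq!(buf[..3], [1, 2, 3]);
        assert_eq!(receiver.read(&mut buf).unwrap(), 2);
        assert_eq!(buf[..2], [4, 5]);
    }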

    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // Smaller than what we sent, so we get multiple chunks.
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }
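
    // Added sketch, not part of the original suite: `flush_blocking` should
    // return once the reader has drained the pipe. Per its doc comment it only
    // works on server pipes, and `pair` returns (server, client) on Windows,
    // so we flush the first end.
    #[test]
    fn test_flush_blocking_returns_once_drained() {
        let (mut server, mut client) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        server.write_all(&[1, 2, 3, 4]).unwrap();

        // Flush on a background thread; it blocks until the pipe is empty.
        let flusher = std::thread::spawn(move || {
            server.flush_blocking().unwrap();
            server
        });

        // Drain the pipe so the flush can complete.
        let mut buf = [0u8; 4];
        client.read_exact(&mut buf).unwrap();
        assert_eq!(buf, [1, 2, 3, 4]);

        // The flushing thread must exit cleanly once the data is consumed.
        flusher.join().unwrap();
    }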

    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 0, 0]);

        // Further reads should encounter an error since there is no available data and this is a
        // non-blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
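
    // Added sketch, not part of the original suite: dropping one end signals
    // the close notifier, per the `Drop` impl above. Assumes the close event
    // can be polled through `EventContext` just like the read notifier.
    #[test]
    fn test_close_notifier_signaled_on_drop() {
        use crate::CloseNotifier;

        let (sender, receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_close_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();

        // Dropping the sender runs its Drop impl, which signals `pipe_closed`.
        drop(sender);
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
    }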
}