• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::RefCell;
6 use std::io;
7 use std::sync::Arc;
8 
9 use log::error;
10 use log::warn;
11 use serde::ser::SerializeStruct;
12 use serde::Deserialize;
13 use serde::Serialize;
14 use serde::Serializer;
15 use sync::Mutex;
16 
17 use super::named_pipes;
18 use super::named_pipes::PipeConnection;
19 use super::MultiProcessMutex;
20 use super::RawDescriptor;
21 use super::Result;
22 use crate::descriptor::AsRawDescriptor;
23 use crate::CloseNotifier;
24 use crate::Event;
25 use crate::ReadNotifier;
26 
27 #[derive(Copy, Clone)]
28 pub enum FramingMode {
29     Message,
30     Byte,
31 }
32 
33 #[derive(Copy, Clone, PartialEq, Eq)]
34 pub enum BlockingMode {
35     Blocking,
36     Nonblocking,
37 }
38 
39 impl From<FramingMode> for named_pipes::FramingMode {
from(framing_mode: FramingMode) -> Self40     fn from(framing_mode: FramingMode) -> Self {
41         match framing_mode {
42             FramingMode::Message => named_pipes::FramingMode::Message,
43             FramingMode::Byte => named_pipes::FramingMode::Byte,
44         }
45     }
46 }
47 
48 impl From<BlockingMode> for named_pipes::BlockingMode {
from(blocking_mode: BlockingMode) -> Self49     fn from(blocking_mode: BlockingMode) -> Self {
50         match blocking_mode {
51             BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
52             BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
53         }
54     }
55 }
56 
57 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
58 
59 /// An abstraction over named pipes and unix socketpairs.
60 ///
61 /// The ReadNotifier will return an event handle that is set when data is in the channel.
62 ///
63 /// In message mode, single writes larger than
64 /// `crate::platform::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
65 ///
66 /// # Notes for maintainers
67 /// 1. This struct contains extremely subtle thread safety considerations.
68 /// 2. Serialization is not derived! New fields need to be added manually.
69 #[derive(Deserialize, Debug)]
70 pub struct StreamChannel {
71     pipe_conn: named_pipes::PipeConnection,
72     write_notify: Event,
73     read_notify: Event,
74     pipe_closed: Event,
75 
76     // Held when reading on this end, to prevent additional writes from corrupting notification
77     // state.
78     remote_write_lock: MultiProcessMutex,
79 
80     // Held when a write is made on this end, so that if the remote end is reading, we wait to
81     // write to avoid corrupting notification state.
82     local_write_lock: MultiProcessMutex,
83 
84     // Held for the entire duration of a read. This enables the StreamChannel to be sync,
85     // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
86     //
87     // In practice, there is no use-case for multiple threads actually contending over
88     // reading from a single pipe through StreamChannel, so this is mostly to provide a
89     // compiler guarantee while passing the StreamChannel to/from background executors.
90     //
91     // Note that this mutex does not work across processes, so the same StreamChannel end should
92     // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
93     // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
94     // which is not well defined behavior.)
95     #[serde(skip)]
96     #[serde(default = "create_read_lock")]
97     read_lock: Arc<Mutex<()>>,
98 
99     // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
100     // channel end has been serialized. Once serialized, we know that the current end MUST NOT
101     // signal the channel has been closed when it was dropped, because a copy of it was sent to
102     // another process. It is the copy's responsibility to close the pipe.
103     #[serde(skip)]
104     #[serde(default = "create_true_cell")]
105     is_channel_closed_on_drop: RefCell<bool>,
106 
107     // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
108     // up to that size.
109     send_buffer_size: usize,
110 }
111 
create_read_lock() -> Arc<Mutex<()>>112 fn create_read_lock() -> Arc<Mutex<()>> {
113     Arc::new(Mutex::new(()))
114 }
115 
create_true_cell() -> RefCell<bool>116 fn create_true_cell() -> RefCell<bool> {
117     RefCell::new(true)
118 }
119 
120 /// Serialize is manually implemented because we need to tell the local copy that a remote copy
121 /// exists, and to not send the close event. Our serialization is otherwise identical to what
122 /// derive would have generated.
123 impl Serialize for StreamChannel {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,124     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
125     where
126         S: Serializer,
127     {
128         let mut s = serializer.serialize_struct("StreamChannel", 7)?;
129         s.serialize_field("pipe_conn", &self.pipe_conn)?;
130         s.serialize_field("write_notify", &self.write_notify)?;
131         s.serialize_field("read_notify", &self.read_notify)?;
132         s.serialize_field("pipe_closed", &self.pipe_closed)?;
133         s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
134         s.serialize_field("local_write_lock", &self.local_write_lock)?;
135         s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
136         let ret = s.end();
137 
138         // Because this end has been serialized, the serialized copy is now responsible for setting
139         // the close event.
140         if ret.is_ok() {
141             *self.is_channel_closed_on_drop.borrow_mut() = false;
142         }
143 
144         ret
145     }
146 }
147 
148 impl Drop for StreamChannel {
drop(&mut self)149     fn drop(&mut self) {
150         if *self.is_channel_closed_on_drop.borrow() {
151             if let Err(e) = self.pipe_closed.signal() {
152                 warn!("failed to notify on channel drop: {}", e);
153             }
154         }
155     }
156 }
157 
158 impl StreamChannel {
set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()>159     pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
160         // Safe because the pipe is open.
161         if nonblocking {
162             self.pipe_conn
163                 .set_blocking(&named_pipes::BlockingMode::NoWait)
164         } else {
165             self.pipe_conn
166                 .set_blocking(&named_pipes::BlockingMode::Wait)
167         }
168     }
169 
170     // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
171     // > 1 reader per end is not defined.
try_clone(&self) -> io::Result<Self>172     pub fn try_clone(&self) -> io::Result<Self> {
173         Ok(StreamChannel {
174             pipe_conn: self.pipe_conn.try_clone()?,
175             write_notify: self.write_notify.try_clone()?,
176             read_notify: self.read_notify.try_clone()?,
177             pipe_closed: self.pipe_closed.try_clone()?,
178             remote_write_lock: self.remote_write_lock.try_clone()?,
179             local_write_lock: self.local_write_lock.try_clone()?,
180             read_lock: self.read_lock.clone(),
181             is_channel_closed_on_drop: create_true_cell(),
182             send_buffer_size: self.send_buffer_size,
183         })
184     }
185 
get_readable_byte_count(&self) -> io::Result<u32>186     fn get_readable_byte_count(&self) -> io::Result<u32> {
187         self.pipe_conn.get_available_byte_count().map_err(|e| {
188             error!("StreamChannel failed to get readable byte count: {}", e);
189             e
190         })
191     }
192 
inner_read(&self, buf: &mut [u8]) -> io::Result<usize>193     pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
194         // We ensure concurrent read safety by holding a lock for the duration of the method.
195         // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
196         // that the notifier should be set, leading to an invalid notified/readable state which
197         // could stall readers.)
198         let _read_lock = self.read_lock.lock();
199 
200         let res = unsafe {
201             // Safe because no partial reads are possible, and the underlying code bounds the
202             // read by buf's size.
203             self.pipe_conn.read(buf)
204         };
205 
206         // The entire goal of this complex section is to avoid the need for shared memory between
207         // each channel end to synchronize the notification state. It is very subtle, modify with
208         // care.
209         loop {
210             // No other thread is reading, so we can find out, without the write lock, whether or
211             // not we need to clear the read notifier. If we don't, then we don't even have to try
212             // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
213             // side blocks on a writing with the lock held. If it looks like we do need to clear
214             // the notifier though, then we have to be sure, so we'll proceed to the next section.
215             let byte_count = self.get_readable_byte_count()?;
216             if byte_count > 0 {
217                 // It's always safe to set the read notifier here because we know there is data in the
218                 // pipe, and no one else could read it out from under us.
219                 self.read_notify.signal().map_err(|e| {
220                     io::Error::new(
221                         io::ErrorKind::Other,
222                         format!("failed to write to read notifier: {:?}", e),
223                     )
224                 })?;
225 
226                 // Notifier state has been safely synced.
227                 return res;
228             }
229 
230             // At this point, there *may* be no data in the pipe, meaning we may want to clear the
231             // notifier. Instead of just trying to acquire the lock outright which could deadlock
232             // with the writing side, we'll try with a timeout. If it fails, we know that the other
233             // side is in the middle of a write, so there either will be data in the pipe soon (a),
234             // or there won't be and we have to clear a spurious notification (b).
235             //
236             // For (a), we can safely return from the read without needing the lock, so we just come
237             // around in the loop to check again, and the loop will exit quickly.
238             //
239             // For (b) we'll return to this point and acquire the lock, as we're just waiting for
240             // the spurious notification to arrive so we can clear it (that code path is very fast),
241             // and the loop will exit.
242             //
243             // If we successfully acquire the lock though, then we can go ahead and clear the
244             // notifier if the pipe is indeed empty, because we are assured that no writes are
245             // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
246             // that's a decent balance between avoiding an unnecessary iteration, and minimizing
247             // latency.
248             if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
249                 let byte_count = self.get_readable_byte_count()?;
250                 if byte_count > 0 {
251                     // Safe because no one else can be reading from the pipe.
252                     self.read_notify.signal().map_err(|e| {
253                         io::Error::new(
254                             io::ErrorKind::Other,
255                             format!("failed to write to read notifier: {:?}", e),
256                         )
257                     })?;
258                 } else {
259                     // Safe because no other writes can be happening (_lock is held).
260                     self.read_notify.reset().map_err(|e| {
261                         io::Error::new(
262                             io::ErrorKind::Other,
263                             format!("failed to reset read notifier: {:?}", e),
264                         )
265                     })?;
266                 }
267 
268                 // Notifier state has been safely synced.
269                 return res;
270             }
271         }
272     }
273 
274     /// Exists as a workaround for Tube which does not expect its transport to be mutable,
275     /// even though io::Write requires it.
write_immutable(&self, buf: &[u8]) -> io::Result<usize>276     pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
277         if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
278             && buf.len() > self.send_buffer_size
279         {
280             return Err(io::Error::new(
281                 io::ErrorKind::Other,
282                 format!(
283                     "StreamChannel forbids message mode writes larger than the \
284                      default buffer size of {}.",
285                     self.send_buffer_size,
286                 ),
287             ));
288         }
289 
290         let _lock = self.local_write_lock.lock();
291         let res = self.pipe_conn.write(buf);
292 
293         // We can always set the write notifier because we know that the reader is in one of the
294         // following states:
295         //      1) a read is running, and it consumes these bytes, so the notification is
296         //         unnecessary. That's fine, because the reader will resync the notifier state once
297         //         it finishes reading.
298         //      2) a read has completed and is blocked on the lock. The notification state is
299         //         already correct, and the read's resync won't change that.
300         if res.is_ok() {
301             self.write_notify.signal().map_err(|e| {
302                 io::Error::new(
303                     io::ErrorKind::Other,
304                     format!("failed to write to read notifier: {:?}", e),
305                 )
306             })?;
307         }
308 
309         res
310     }
311 
312     /// This only works with empty pipes. U.B. will result if used in any other scenario.
from_pipes( pipe_a: PipeConnection, pipe_b: PipeConnection, send_buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>313     pub fn from_pipes(
314         pipe_a: PipeConnection,
315         pipe_b: PipeConnection,
316         send_buffer_size: usize,
317     ) -> Result<(StreamChannel, StreamChannel)> {
318         let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
319         let pipe_closed = Event::new()?;
320 
321         let write_lock_a = MultiProcessMutex::new()?;
322         let write_lock_b = MultiProcessMutex::new()?;
323 
324         let sock_a = StreamChannel {
325             pipe_conn: pipe_a,
326             write_notify: notify_a_write.try_clone()?,
327             read_notify: notify_b_write.try_clone()?,
328             read_lock: Arc::new(Mutex::new(())),
329             local_write_lock: write_lock_a.try_clone()?,
330             remote_write_lock: write_lock_b.try_clone()?,
331             pipe_closed: pipe_closed.try_clone()?,
332             is_channel_closed_on_drop: create_true_cell(),
333             send_buffer_size,
334         };
335         let sock_b = StreamChannel {
336             pipe_conn: pipe_b,
337             write_notify: notify_b_write,
338             read_notify: notify_a_write,
339             read_lock: Arc::new(Mutex::new(())),
340             local_write_lock: write_lock_b,
341             remote_write_lock: write_lock_a,
342             pipe_closed,
343             is_channel_closed_on_drop: create_true_cell(),
344             send_buffer_size,
345         };
346         Ok((sock_a, sock_b))
347     }
348 
349     /// Create a pair with a specific buffer size. Note that this is the only way to send messages
350     /// larger than the default named pipe buffer size.
pair_with_buffer_size( blocking_mode: BlockingMode, framing_mode: FramingMode, buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>351     pub fn pair_with_buffer_size(
352         blocking_mode: BlockingMode,
353         framing_mode: FramingMode,
354         buffer_size: usize,
355     ) -> Result<(StreamChannel, StreamChannel)> {
356         let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
357             &named_pipes::FramingMode::from(framing_mode),
358             &named_pipes::BlockingMode::from(blocking_mode),
359             0,
360             buffer_size,
361             false,
362         )?;
363         Self::from_pipes(pipe_a, pipe_b, buffer_size)
364     }
365     /// Creates a cross platform channel pair.
366     /// On Windows the result is in the form (server, client).
pair( blocking_mode: BlockingMode, framing_mode: FramingMode, ) -> Result<(StreamChannel, StreamChannel)>367     pub fn pair(
368         blocking_mode: BlockingMode,
369         framing_mode: FramingMode,
370     ) -> Result<(StreamChannel, StreamChannel)> {
371         let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
372             &named_pipes::FramingMode::from(framing_mode),
373             &named_pipes::BlockingMode::from(blocking_mode),
374             0,
375             DEFAULT_BUFFER_SIZE,
376             false,
377         )?;
378         Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
379     }
380 
381     /// Blocks until the pipe buffer is empty.
382     /// NOTE: that this will only work for server pipes on Windows.
flush_blocking(&self) -> io::Result<()>383     pub fn flush_blocking(&self) -> io::Result<()> {
384         self.pipe_conn.flush_data_blocking()
385     }
386 }
387 
388 impl io::Write for StreamChannel {
write(&mut self, buf: &[u8]) -> io::Result<usize>389     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
390         self.write_immutable(buf)
391     }
flush(&mut self) -> io::Result<()>392     fn flush(&mut self) -> io::Result<()> {
393         // There is no userspace buffering inside crosvm to flush for named pipes. We write
394         // directly to the named pipe using WriteFile.
395         Ok(())
396     }
397 }
398 
399 impl AsRawDescriptor for &StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor400     fn as_raw_descriptor(&self) -> RawDescriptor {
401         self.pipe_conn.as_raw_descriptor()
402     }
403 }
404 
405 impl ReadNotifier for StreamChannel {
406     /// Returns a RawDescriptor that can be polled for reads using WaitContext.
get_read_notifier(&self) -> &dyn AsRawDescriptor407     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
408         &self.read_notify
409     }
410 }
411 
412 impl CloseNotifier for StreamChannel {
get_close_notifier(&self) -> &dyn AsRawDescriptor413     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
414         &self.pipe_closed
415     }
416 }
417 
418 impl io::Read for StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>419     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
420         self.inner_read(buf)
421     }
422 }
423 
424 impl io::Read for &StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>425     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
426         self.inner_read(buf)
427     }
428 }
429 
430 impl AsRawDescriptor for StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor431     fn as_raw_descriptor(&self) -> RawDescriptor {
432         (&self).as_raw_descriptor()
433     }
434 }
435 
436 #[cfg(test)]
437 mod test {
438     use std::io::Read;
439     use std::io::Write;
440     use std::time::Duration;
441 
442     use super::super::EventContext;
443     use super::super::EventTrigger;
444     use super::*;
445     use crate::EventToken;
446     use crate::ReadNotifier;
447 
448     #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
449     enum Token {
450         ReceivedData,
451     }
452 
453     const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);
454 
455     #[test]
test_read_notifies_multiple_writes()456     fn test_read_notifies_multiple_writes() {
457         let (mut sender, mut receiver) =
458             StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
459         sender.write_all(&[1, 2]).unwrap();
460 
461         // Wait for the write to arrive.
462         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
463             receiver.get_read_notifier(),
464             Token::ReceivedData,
465         )])
466         .unwrap();
467         assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
468 
469         // Read just one byte. This leaves another byte in the pipe.
470         let mut recv_buffer = [0u8; 1];
471         let size = receiver.read(&mut recv_buffer).unwrap();
472         assert_eq!(size, 1);
473         assert_eq!(recv_buffer[0], 1);
474 
475         // The notifier should still be set, because the pipe has data.
476         assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
477         let size = receiver.read(&mut recv_buffer).unwrap();
478         assert_eq!(size, 1);
479         assert_eq!(recv_buffer[0], 2);
480     }
481 
482     #[test]
test_blocked_writer_wont_deadlock()483     fn test_blocked_writer_wont_deadlock() {
484         let (mut writer, mut reader) =
485             StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
486                 .unwrap();
487         const NUM_OPS: usize = 100;
488 
489         // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
490         // 100x before we run into a blocking write, so that's what we do here. This makes sense
491         // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
492         // is supposed to be handled by the kernel.
493         let writer = std::thread::spawn(move || {
494             let buf = [0u8; 100];
495             for _ in 0..NUM_OPS {
496                 assert_eq!(writer.write(&buf).unwrap(), buf.len());
497             }
498             writer
499         });
500 
501         // The test passes if the reader can read (this used to deadlock).
502         let mut buf = [0u8; 100];
503         for _ in 0..NUM_OPS {
504             assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
505         }
506 
507         // Writer must exit cleanly.
508         writer.join().unwrap();
509     }
510 
511     #[test]
test_non_blocking_pair()512     fn test_non_blocking_pair() {
513         let (mut sender, mut receiver) =
514             StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
515 
516         sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();
517 
518         // Wait for the data to arrive.
519         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
520             receiver.get_read_notifier(),
521             Token::ReceivedData,
522         )])
523         .unwrap();
524         let events = event_ctx.wait().unwrap();
525         let tokens: Vec<Token> = events
526             .iter()
527             .filter(|e| e.is_readable)
528             .map(|e| e.token)
529             .collect();
530         assert_eq!(tokens, vec! {Token::ReceivedData});
531 
532         // Smaller than what we sent so we get multiple chunks
533         let mut recv_buffer: [u8; 4] = [0; 4];
534 
535         let mut size = receiver.read(&mut recv_buffer).unwrap();
536         assert_eq!(size, 4);
537         assert_eq!(recv_buffer, [75, 77, 54, 82]);
538 
539         size = receiver.read(&mut recv_buffer).unwrap();
540         assert_eq!(size, 2);
541         assert_eq!(recv_buffer[0..2], [76, 65]);
542 
543         // Now that we've polled for & received all data, polling again should show no events.
544         assert_eq!(
545             event_ctx
546                 .wait_timeout(std::time::Duration::new(0, 0))
547                 .unwrap()
548                 .len(),
549             0
550         );
551     }
552 
553     #[test]
test_non_blocking_pair_error_no_data()554     fn test_non_blocking_pair_error_no_data() {
555         let (mut sender, mut receiver) =
556             StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
557         receiver
558             .set_nonblocking(true)
559             .expect("Failed to set receiver to nonblocking mode.");
560 
561         sender.write_all(&[75, 77]).unwrap();
562 
563         // Wait for the data to arrive.
564         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
565             receiver.get_read_notifier(),
566             Token::ReceivedData,
567         )])
568         .unwrap();
569         let events = event_ctx.wait().unwrap();
570         let tokens: Vec<Token> = events
571             .iter()
572             .filter(|e| e.is_readable)
573             .map(|e| e.token)
574             .collect();
575         assert_eq!(tokens, vec! {Token::ReceivedData});
576 
577         // We only read 2 bytes, even though we requested 4 bytes.
578         let mut recv_buffer: [u8; 4] = [0; 4];
579         let size = receiver.read(&mut recv_buffer).unwrap();
580         assert_eq!(size, 2);
581         assert_eq!(recv_buffer, [75, 77, 00, 00]);
582 
583         // Further reads should encounter an error since there is no available data and this is a
584         // non blocking pipe.
585         assert!(receiver.read(&mut recv_buffer).is_err());
586     }
587 }
588