// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::super::{
    named_pipes::{self, PipeConnection},
    stream_channel::{BlockingMode, FramingMode},
    CloseNotifier, Event, MultiProcessMutex, RawDescriptor, ReadNotifier, Result,
};
use crate::descriptor::AsRawDescriptor;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use std::{cell::RefCell, io, sync::Arc};
use sync::Mutex;

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

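/// The default send buffer size, in bytes, for channels created via [`StreamChannel::pair`].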
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than the channel's send buffer size
/// (`DEFAULT_BUFFER_SIZE` unless the channel was created via `pair_with_buffer_size`)
/// are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
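///
/// # Example
///
/// A minimal usage sketch (illustrative only; imports and crate paths are assumed, and
/// error handling is elided):
///
/// ```ignore
/// use std::io::{Read, Write};
///
/// // Create a connected pair and send two bytes across it.
/// let (mut sender, mut receiver) =
///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
/// sender.write_all(b"hi").unwrap();
///
/// let mut buf = [0u8; 2];
/// receiver.read_exact(&mut buf).unwrap();
/// assert_eq!(&buf, b"hi");
/// ```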
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it was dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_cell")]
    is_channel_closed_on_drop: RefCell<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_cell() -> RefCell<bool> {
    RefCell::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.borrow_mut() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.borrow() {
            if let Err(e) = self.pipe_closed.write(0) {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    fn get_readable_byte_count(&self) -> io::Result<u32> {
        self.pipe_conn.get_available_byte_count().map_err(|e| {
            error!("StreamChannel failed to get readable byte count: {}", e);
            e
        })
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        let res = unsafe {
            // Safe because no partial reads are possible, and the underlying code bounds the
            // read by buf's size.
            self.pipe_conn.read(buf)
        };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in the
                // pipe, and no one else could read it out from under us.
                self.read_notify.write(0).map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright, which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.write(0).map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_write_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for Tube, which does not expect its transport to be mutable,
    /// even though io::Write requires it.
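    ///
    /// A minimal sketch of calling through a shared reference (illustrative; assumes a freshly
    /// created pair and elides error handling):
    ///
    /// ```ignore
    /// let (sender, _receiver) =
    ///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
    /// // Note: no `mut` binding is needed, unlike io::Write::write.
    /// let written = sender.write_immutable(b"ping").unwrap();
    /// assert_eq!(written, 4);
    /// ```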
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than its \
                     send buffer size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.write(0).map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to write to write notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. Undefined behavior will result if used in any other
    /// scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

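        // Note the cross-wiring below: each end's read_notify is the other end's write_notify,
        // so setting the write notifier on one side signals readability on the other side.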
        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
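    ///
    /// A minimal sketch (illustrative; error handling elided): create a message-mode pair whose
    /// ends accept messages of up to 1 MiB.
    ///
    /// ```ignore
    /// let (a, b) = StreamChannel::pair_with_buffer_size(
    ///     BlockingMode::Blocking,
    ///     FramingMode::Message,
    ///     1024 * 1024,
    /// )
    /// .unwrap();
    /// ```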
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross-platform channel pair.
    /// On Windows the result is in the form (server, client).
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    ///
    /// NOTE: this only works for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using PollContext.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

#[cfg(test)]
mod test {
    use super::{
        super::super::{EventContext, EventTrigger, PollToken, ReadNotifier},
        *,
    };
    use std::{
        io::{Read, Write},
        time::Duration,
    };

    #[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }
}