// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::io;
use std::sync::Arc;

use log::error;
use log::warn;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use sync::Mutex;

use super::named_pipes;
use super::named_pipes::PipeConnection;
use super::MultiProcessMutex;
use super::RawDescriptor;
use super::Result;
use crate::descriptor::AsRawDescriptor;
use crate::CloseNotifier;
use crate::Event;
use crate::ReadNotifier;

#[derive(Copy, Clone)]
pub enum FramingMode {
    Message,
    Byte,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    Blocking,
    Nonblocking,
}

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than
/// `crate::windows::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
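///
/// # Example
///
/// A minimal usage sketch mirroring the unit tests at the bottom of this file (error handling
/// elided; the doctest is ignored because this module is Windows-only):
///
/// ```ignore
/// use std::io::{Read, Write};
///
/// let (mut sender, mut receiver) =
///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte)?;
/// sender.write_all(&[1, 2])?;
/// let mut buf = [0u8; 2];
/// receiver.read_exact(&mut buf)?;
/// assert_eq!(buf, [1, 2]);
/// ```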
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it was dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_cell")]
    is_channel_closed_on_drop: RefCell<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_cell() -> RefCell<bool> {
    RefCell::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.borrow_mut() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.borrow() {
            if let Err(e) = self.pipe_closed.signal() {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    /// Gets the readable byte count. Returns zero for broken pipes since that will cause the read
    /// notifier to be set, and for the consumer to quickly discover the broken pipe.
    fn get_readable_byte_count(&self) -> io::Result<u32> {
        match self.pipe_conn.get_available_byte_count() {
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
            Err(e) => {
                error!("StreamChannel failed to get readable byte count: {}", e);
                Err(e)
            }
            Ok(byte_count) => Ok(byte_count),
        }
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        // SAFETY:
        // Safe because no partial reads are possible, and the underlying code bounds the
        // read by buf's size.
        let res = unsafe { self.pipe_conn.read(buf) };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in
                // the pipe, and no one else could read it out from under us.
                self.read_notify.signal().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.signal().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for Tube which does not expect its transport to be mutable,
    /// even though io::Write requires it.
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than the \
                     default buffer size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.signal().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to write to read notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. U.B. will result if used in any other scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
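    ///
    /// A hedged usage sketch (the buffer size and payload length below are illustrative, not
    /// required values; the doctest is ignored because this module is Windows-only):
    ///
    /// ```ignore
    /// use std::io::Write;
    ///
    /// let (mut sender, _receiver) = StreamChannel::pair_with_buffer_size(
    ///     BlockingMode::Blocking,
    ///     FramingMode::Message,
    ///     100 * 1024, // must be at least as large as the biggest message to be sent
    /// )?;
    /// sender.write_all(&vec![0u8; 64 * 1024])?;
    /// ```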
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross platform channel pair.
    /// On Windows the result is in the form (server, client).
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    /// NOTE: this will only work for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
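    ///
    /// A minimal polling sketch mirroring the unit tests below (`receiver` is an existing
    /// StreamChannel end and `Token` is a caller-defined `EventToken` enum; the doctest is
    /// ignored because this module is Windows-only):
    ///
    /// ```ignore
    /// let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
    ///     receiver.get_read_notifier(),
    ///     Token::ReceivedData,
    /// )])?;
    /// let ready = event_ctx.wait_timeout(Duration::from_secs(10))?;
    /// ```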
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

impl io::Read for StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        (&self).as_raw_descriptor()
    }
}

#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }

    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // Smaller than what we sent so we get multiple chunks
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }

    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 00, 00]);

        // Further reads should encounter an error since there is no available data and this is a
        // non blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
}