// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::io;
use std::sync::Arc;

use log::error;
use log::warn;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use sync::Mutex;

use super::named_pipes;
use super::named_pipes::PipeConnection;
use super::MultiProcessMutex;
use super::RawDescriptor;
use super::Result;
use crate::descriptor::AsRawDescriptor;
use crate::CloseNotifier;
use crate::Event;
use crate::ReadNotifier;
#[derive(Copy, Clone)]
pub enum FramingMode {
    Message,
    Byte,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    Blocking,
    Nonblocking,
}

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than the channel's send buffer size
/// ([`DEFAULT_BUFFER_SIZE`] unless the channel was created via `pair_with_buffer_size`) are not
/// permitted.
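///
/// Use [`StreamChannel::pair`] (or [`StreamChannel::pair_with_buffer_size`]) to create a
/// connected pair of channel ends.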
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be Sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be used concurrently across process boundaries. (Odds are, if you want to do this, it's
    // not what you want: it would mean two readers on the *same end* of the pipe, which is not
    // well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it was dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_cell")]
    is_channel_closed_on_drop: RefCell<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_cell() -> RefCell<bool> {
    RefCell::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.borrow_mut() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.borrow() {
            if let Err(e) = self.pipe_closed.signal() {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
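    /// Switches this end of the channel between blocking and nonblocking modes.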
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    fn get_readable_byte_count(&self) -> io::Result<u32> {
        self.pipe_conn.get_available_byte_count().map_err(|e| {
            error!("StreamChannel failed to get readable byte count: {}", e);
            e
        })
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        let res = unsafe {
            // Safe because no partial reads are possible, and the underlying code bounds the
            // read by buf's size.
            self.pipe_conn.read(buf)
        };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in the
                // pipe, and no one else could read it out from under us.
                self.read_notify.signal().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.signal().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for Tube which does not expect its transport to be mutable,
    /// even though io::Write requires it.
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than its send buffer \
                     size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.signal().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to signal write notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. U.B. will result if used in any other scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_cell(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
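    ///
    /// A minimal sketch of the intended use (illustrative only, not compiled as a doctest;
    /// imports and error handling are simplified):
    ///
    /// ```ignore
    /// use std::io::{Read, Write};
    ///
    /// // Message-mode channel sized so a single 128 KiB message fits in one write; a
    /// // default-size channel would reject a message-mode write this large.
    /// let (mut sender, mut receiver) = StreamChannel::pair_with_buffer_size(
    ///     BlockingMode::Blocking,
    ///     FramingMode::Message,
    ///     128 * 1024,
    /// )
    /// .unwrap();
    ///
    /// let payload = vec![0u8; 128 * 1024];
    /// sender.write_all(&payload).unwrap();
    ///
    /// let mut received = vec![0u8; payload.len()];
    /// receiver.read_exact(&mut received).unwrap();
    /// ```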
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross-platform channel pair.
    /// On Windows the result is in the form (server, client).
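    ///
    /// Minimal usage sketch (illustrative only, not compiled as a doctest; it mirrors the unit
    /// tests at the bottom of this file):
    ///
    /// ```ignore
    /// use std::io::{Read, Write};
    ///
    /// let (mut sender, mut receiver) =
    ///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
    /// sender.write_all(b"hello").unwrap();
    ///
    /// // The read notifier stays signaled while unread data remains in the channel, so it can
    /// // also be polled (e.g. with an EventContext) instead of reading directly.
    /// let mut buf = [0u8; 5];
    /// receiver.read_exact(&mut buf).unwrap();
    /// assert_eq!(&buf, b"hello");
    /// ```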
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    /// NOTE: this will only work for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

impl io::Read for StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        (&self).as_raw_descriptor()
    }
}

#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }

    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // Smaller than what we sent, so we get multiple chunks.
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }

    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 0, 0]);

        // Further reads should encounter an error since there is no available data and this is a
        // non-blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
}