• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::io;
7 use std::sync::Arc;
8 use std::thread;
9 use std::time::Duration;
10 
11 use base::error;
12 use base::named_pipes;
13 use base::Event;
14 use base::FileSync;
15 use base::RawDescriptor;
16 use base::WorkerThread;
17 use sync::Mutex;
18 
19 use crate::serial_device::SerialInput;
20 use crate::serial_device::SerialOptions;
21 use crate::virtio::console::Console;
22 use crate::virtio::ProtectionType;
23 use crate::SerialDevice;
24 
25 impl SerialDevice for Console {
new( protection_type: ProtectionType, _event: Event, _input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> Console26     fn new(
27         protection_type: ProtectionType,
28         _event: Event,
29         _input: Option<Box<dyn SerialInput>>,
30         out: Option<Box<dyn io::Write + Send>>,
31         // TODO(b/171331752): connect filesync functionality.
32         _sync: Option<Box<dyn FileSync + Send>>,
33         options: SerialOptions,
34         keep_rds: Vec<RawDescriptor>,
35     ) -> Console {
36         Console::new(protection_type, None, out, keep_rds, options.pci_address)
37     }
38 
39     /// Constructs a console with named pipe as input/output connections.
new_with_pipe( protection_type: ProtectionType, _interrupt_evt: Event, pipe_in: named_pipes::PipeConnection, pipe_out: named_pipes::PipeConnection, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> Console40     fn new_with_pipe(
41         protection_type: ProtectionType,
42         _interrupt_evt: Event,
43         pipe_in: named_pipes::PipeConnection,
44         pipe_out: named_pipes::PipeConnection,
45         options: SerialOptions,
46         keep_rds: Vec<RawDescriptor>,
47     ) -> Console {
48         Console::new(
49             protection_type,
50             Some(Box::new(pipe_in)),
51             Some(Box::new(pipe_out)),
52             keep_rds,
53             options.pci_address,
54         )
55     }
56 }
57 
/// Pauses the reader thread briefly between reads of rx (platform-specific).
///
/// Blocking reads can't be issued here, and overlapped I/O is incompatible with the call site
/// that writes to this pipe, so a short wait is used instead to keep the reader from hogging the
/// CPU. A 20ms pause while typing doesn't seem to be noticeable.
fn read_delay_if_needed() {
    // Length of the pause inserted after every read attempt.
    const READ_DELAY: Duration = Duration::from_millis(20);

    thread::sleep(READ_DELAY);
}
67 
/// Returns `true` when `e` should stop the input reader, `false` when the read can be retried.
///
/// `Interrupted`, `Other` and `WouldBlock` are treated as non-fatal; every other error kind is
/// fatal.
fn is_a_fatal_input_error(e: &io::Error) -> bool {
    match e.kind() {
        // Being interrupted is not an error.
        io::ErrorKind::Interrupted => false,
        // Two kinds of errors are ignored on Windows:
        //   - ErrorKind::Other when reading a named pipe before a client connects.
        //   - ErrorKind::WouldBlock when reading a named pipe (we don't use non-blocking I/O).
        io::ErrorKind::Other | io::ErrorKind::WouldBlock => false,
        // Everything else is a fatal input error.
        _ => true,
    }
}
80 
81 /// Starts a thread that reads rx and sends the input back via the returned buffer.
82 ///
83 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
84 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
85 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
86 ///
87 /// # Arguments
88 ///
89 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
90 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: Box<named_pipes::PipeConnection>, in_avail_evt: &Event, input_buffer: VecDeque<u8>, ) -> ( Arc<Mutex<VecDeque<u8>>>, WorkerThread<Box<named_pipes::PipeConnection>>, )91 pub(in crate::virtio::console) fn spawn_input_thread(
92     mut rx: Box<named_pipes::PipeConnection>,
93     in_avail_evt: &Event,
94     input_buffer: VecDeque<u8>,
95 ) -> (
96     Arc<Mutex<VecDeque<u8>>>,
97     WorkerThread<Box<named_pipes::PipeConnection>>,
98 ) {
99     let buffer = Arc::new(Mutex::new(input_buffer));
100     let buffer_cloned = buffer.clone();
101 
102     let thread_in_avail_evt = in_avail_evt
103         .try_clone()
104         .expect("failed to clone in_avail_evt");
105 
106     let res = WorkerThread::start("v_console_input", move |kill_evt| {
107         // If there is already data, signal immediately.
108         if !buffer.lock().is_empty() {
109             thread_in_avail_evt.signal().unwrap();
110         }
111 
112         match rx.wait_for_client_connection_overlapped_blocking(&kill_evt) {
113             Err(e) if e.kind() == io::ErrorKind::Interrupted => return rx,
114             Err(e) => panic!("failed to wait for client: {}", e),
115             Ok(()) => (),
116         }
117 
118         read_input(&mut rx, &thread_in_avail_evt, buffer, kill_evt);
119         rx
120     });
121     (buffer_cloned, res)
122 }
123 
read_input( rx: &mut Box<named_pipes::PipeConnection>, thread_in_avail_evt: &Event, buffer: Arc<Mutex<VecDeque<u8>>>, kill_evt: Event, )124 pub(in crate::virtio::console) fn read_input(
125     rx: &mut Box<named_pipes::PipeConnection>,
126     thread_in_avail_evt: &Event,
127     buffer: Arc<Mutex<VecDeque<u8>>>,
128     kill_evt: Event,
129 ) {
130     let buffer_max_size = 1 << 12;
131     let mut rx_buf = Vec::with_capacity(buffer_max_size);
132 
133     let mut read_overlapped =
134         named_pipes::OverlappedWrapper::new(true).expect("failed to create OverlappedWrapper");
135     loop {
136         let size = rx
137             .get_available_byte_count()
138             .expect("failed to get available byte count") as usize;
139         // Clamp to [1, buffer capacity]. Need to read at least one byte so that the read call
140         // blocks until data is available.
141         let size = std::cmp::min(std::cmp::max(size, 1), buffer_max_size);
142         rx_buf.resize(size, Default::default());
143         let res = rx.read_overlapped_blocking(&mut rx_buf, &mut read_overlapped, &kill_evt);
144 
145         match res {
146             Ok(()) => {
147                 buffer.lock().extend(&rx_buf[..]);
148                 thread_in_avail_evt.signal().unwrap();
149             }
150             Err(e) if e.kind() == io::ErrorKind::Interrupted => {
151                 // Exit event triggered
152                 break;
153             }
154             Err(e) => {
155                 // Being interrupted is not an error, but everything else is.
156                 if is_a_fatal_input_error(&e) {
157                     error!(
158                         "failed to read for bytes to queue into console device: {}",
159                         e
160                     );
161                     break;
162                 }
163             }
164         }
165 
166         // Depending on the platform, a short sleep is needed here (ie. Windows).
167         read_delay_if_needed();
168     }
169 }
170