• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
//! Legacy console device that uses a polling thread. This is kept because it is still used by
//! Windows; outside of this use case, please use `asynchronous::AsyncConsole` instead.
7 
8 #[cfg(unix)]
9 pub mod asynchronous;
10 mod sys;
11 
12 use std::collections::VecDeque;
13 use std::io;
14 use std::io::Read;
15 use std::io::Write;
16 use std::ops::DerefMut;
17 use std::result;
18 use std::sync::Arc;
19 use std::thread;
20 
21 use anyhow::anyhow;
22 use anyhow::Context;
23 use base::error;
24 use base::AsRawDescriptor;
25 use base::Descriptor;
26 use base::Event;
27 use base::EventToken;
28 use base::RawDescriptor;
29 use base::WaitContext;
30 use base::WorkerThread;
31 use data_model::Le16;
32 use data_model::Le32;
33 use hypervisor::ProtectionType;
34 use remain::sorted;
35 use sync::Mutex;
36 use thiserror::Error as ThisError;
37 use vm_memory::GuestMemory;
38 use zerocopy::AsBytes;
39 use zerocopy::FromBytes;
40 
41 use crate::virtio::base_features;
42 use crate::virtio::copy_config;
43 use crate::virtio::DeviceType;
44 use crate::virtio::Interrupt;
45 use crate::virtio::Queue;
46 use crate::virtio::Reader;
47 use crate::virtio::SignalableInterrupt;
48 use crate::virtio::VirtioDevice;
49 use crate::virtio::Writer;
50 use crate::Suspendable;
51 
/// Maximum number of descriptors in each of the console's virtqueues.
pub(crate) const QUEUE_SIZE: u16 = 256;

// Only port 0 is implemented for now, which needs two queues (receiveq and transmitq).
// Implementing VIRTIO_CONSOLE_F_MULTIPORT would require more queues.
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; 2];
57 
58 #[sorted]
59 #[derive(ThisError, Debug)]
60 pub enum ConsoleError {
61     /// There are no more available descriptors to receive into
62     #[error("no rx descriptors available")]
63     RxDescriptorsExhausted,
64 }
65 
66 #[derive(Copy, Clone, Debug, Default, AsBytes, FromBytes)]
67 #[repr(C)]
68 pub struct virtio_console_config {
69     pub cols: Le16,
70     pub rows: Le16,
71     pub max_nr_ports: Le32,
72     pub emerg_wr: Le32,
73 }
74 
75 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
76 ///
77 /// # Arguments
78 ///
79 /// * `mem` - The GuestMemory to write the data into
80 /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used
81 /// * `buffer` - Ring buffer providing data to put into the guest
82 /// * `receive_queue` - The receive virtio Queue
handle_input<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, buffer: &mut VecDeque<u8>, receive_queue: &mut Queue, ) -> result::Result<(), ConsoleError>83 fn handle_input<I: SignalableInterrupt>(
84     mem: &GuestMemory,
85     interrupt: &I,
86     buffer: &mut VecDeque<u8>,
87     receive_queue: &mut Queue,
88 ) -> result::Result<(), ConsoleError> {
89     loop {
90         let desc = receive_queue
91             .peek(mem)
92             .ok_or(ConsoleError::RxDescriptorsExhausted)?;
93         let desc_index = desc.index;
94         // TODO(morg): Handle extra error cases as Err(ConsoleError) instead of just returning.
95         let mut writer = match Writer::new(mem.clone(), desc) {
96             Ok(w) => w,
97             Err(e) => {
98                 error!("console: failed to create Writer: {}", e);
99                 return Ok(());
100             }
101         };
102 
103         while writer.available_bytes() > 0 && !buffer.is_empty() {
104             let (buffer_front, buffer_back) = buffer.as_slices();
105             let buffer_chunk = if !buffer_front.is_empty() {
106                 buffer_front
107             } else {
108                 buffer_back
109             };
110             let written = writer.write(buffer_chunk).unwrap();
111             drop(buffer.drain(..written));
112         }
113 
114         let bytes_written = writer.bytes_written() as u32;
115 
116         if bytes_written > 0 {
117             receive_queue.pop_peeked(mem);
118             receive_queue.add_used(mem, desc_index, bytes_written);
119             receive_queue.trigger_interrupt(mem, interrupt);
120         }
121 
122         if bytes_written == 0 {
123             return Ok(());
124         }
125     }
126 }
127 
128 /// Writes the available data from the reader into the given output queue.
129 ///
130 /// # Arguments
131 ///
132 /// * `reader` - The Reader with the data we want to write.
133 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<()>134 fn process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<()> {
135     let len = reader.available_bytes();
136     let mut data = vec![0u8; len];
137     reader.read_exact(&mut data)?;
138     output.write_all(&data)?;
139     output.flush()?;
140     Ok(())
141 }
142 
143 /// Processes the data taken from the given transmit queue into the output sink.
144 ///
145 /// # Arguments
146 ///
147 /// * `mem` - The GuestMemory to take the data from
148 /// * `interrupt` - SignalableInterrupt used to signal (if required) that the queue has been used
149 /// * `transmit_queue` - The transmit virtio Queue
150 /// * `output` - The output sink we are going to write the data into
process_transmit_queue<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, transmit_queue: &mut Queue, output: &mut dyn io::Write, )151 fn process_transmit_queue<I: SignalableInterrupt>(
152     mem: &GuestMemory,
153     interrupt: &I,
154     transmit_queue: &mut Queue,
155     output: &mut dyn io::Write,
156 ) {
157     let mut needs_interrupt = false;
158     while let Some(avail_desc) = transmit_queue.pop(mem) {
159         let desc_index = avail_desc.index;
160 
161         match Reader::new(mem.clone(), avail_desc) {
162             Ok(reader) => process_transmit_request(reader, output)
163                 .unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e)),
164             Err(e) => {
165                 error!("console: failed to create reader: {}", e);
166             }
167         };
168 
169         transmit_queue.add_used(mem, desc_index, 0);
170         needs_interrupt = true;
171     }
172 
173     if needs_interrupt {
174         transmit_queue.trigger_interrupt(mem, interrupt);
175     }
176 }
177 
178 struct Worker {
179     mem: GuestMemory,
180     interrupt: Interrupt,
181     input: Option<Arc<Mutex<VecDeque<u8>>>>,
182     output: Box<dyn io::Write + Send>,
183     kill_evt: Event,
184     in_avail_evt: Event,
185     receive_queue: Queue,
186     receive_evt: Event,
187     transmit_queue: Queue,
188     transmit_evt: Event,
189 }
190 
191 /// Starts a thread that reads rx and sends the input back via the returned buffer.
192 ///
193 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
194 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
195 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
196 ///
197 /// # Arguments
198 ///
199 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
200 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: crate::serial::sys::InStreamType, in_avail_evt: &Event, ) -> Option<Arc<Mutex<VecDeque<u8>>>>201 fn spawn_input_thread(
202     mut rx: crate::serial::sys::InStreamType,
203     in_avail_evt: &Event,
204 ) -> Option<Arc<Mutex<VecDeque<u8>>>> {
205     let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new()));
206     let buffer_cloned = buffer.clone();
207 
208     let thread_in_avail_evt = match in_avail_evt.try_clone() {
209         Ok(evt) => evt,
210         Err(e) => {
211             error!("failed to clone in_avail_evt: {}", e);
212             return None;
213         }
214     };
215 
216     // The input thread runs in detached mode.
217     let res = thread::Builder::new()
218         .name("console_input".to_string())
219         .spawn(move || {
220             let mut rx_buf = [0u8; 1 << 12];
221             loop {
222                 match rx.read(&mut rx_buf) {
223                     Ok(0) => break, // Assume the stream of input has ended.
224                     Ok(size) => {
225                         buffer.lock().extend(&rx_buf[0..size]);
226                         thread_in_avail_evt.signal().unwrap();
227                     }
228                     Err(e) => {
229                         // Being interrupted is not an error, but everything else is.
230                         if sys::is_a_fatal_input_error(&e) {
231                             error!(
232                                 "failed to read for bytes to queue into console device: {}",
233                                 e
234                             );
235                             break;
236                         }
237                     }
238                 }
239 
240                 // Depending on the platform, a short sleep is needed here (ie. Windows).
241                 sys::read_delay_if_needed();
242             }
243         });
244     if let Err(e) = res {
245         error!("failed to spawn input thread: {}", e);
246         return None;
247     }
248     Some(buffer_cloned)
249 }
250 
251 impl Worker {
run(&mut self)252     fn run(&mut self) {
253         #[derive(EventToken)]
254         enum Token {
255             ReceiveQueueAvailable,
256             TransmitQueueAvailable,
257             InputAvailable,
258             InterruptResample,
259             Kill,
260         }
261 
262         let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
263             (&self.transmit_evt, Token::TransmitQueueAvailable),
264             (&self.receive_evt, Token::ReceiveQueueAvailable),
265             (&self.in_avail_evt, Token::InputAvailable),
266             (&self.kill_evt, Token::Kill),
267         ]) {
268             Ok(pc) => pc,
269             Err(e) => {
270                 error!("failed creating WaitContext: {}", e);
271                 return;
272             }
273         };
274         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
275             if wait_ctx
276                 .add(resample_evt, Token::InterruptResample)
277                 .is_err()
278             {
279                 error!("failed adding resample event to WaitContext.");
280                 return;
281             }
282         }
283 
284         'wait: loop {
285             let events = match wait_ctx.wait() {
286                 Ok(v) => v,
287                 Err(e) => {
288                     error!("failed polling for events: {}", e);
289                     break;
290                 }
291             };
292 
293             for event in events.iter().filter(|e| e.is_readable) {
294                 match event.token {
295                     Token::TransmitQueueAvailable => {
296                         if let Err(e) = self.transmit_evt.wait() {
297                             error!("failed reading transmit queue Event: {}", e);
298                             break 'wait;
299                         }
300                         process_transmit_queue(
301                             &self.mem,
302                             &self.interrupt,
303                             &mut self.transmit_queue,
304                             &mut self.output,
305                         );
306                     }
307                     Token::ReceiveQueueAvailable => {
308                         if let Err(e) = self.receive_evt.wait() {
309                             error!("failed reading receive queue Event: {}", e);
310                             break 'wait;
311                         }
312                         if let Some(in_buf_ref) = self.input.as_ref() {
313                             match handle_input(
314                                 &self.mem,
315                                 &self.interrupt,
316                                 in_buf_ref.lock().deref_mut(),
317                                 &mut self.receive_queue,
318                             ) {
319                                 Ok(()) => {}
320                                 // Console errors are no-ops, so just continue.
321                                 Err(_) => {
322                                     continue;
323                                 }
324                             }
325                         }
326                     }
327                     Token::InputAvailable => {
328                         if let Err(e) = self.in_avail_evt.wait() {
329                             error!("failed reading in_avail_evt: {}", e);
330                             break 'wait;
331                         }
332                         if let Some(in_buf_ref) = self.input.as_ref() {
333                             match handle_input(
334                                 &self.mem,
335                                 &self.interrupt,
336                                 in_buf_ref.lock().deref_mut(),
337                                 &mut self.receive_queue,
338                             ) {
339                                 Ok(()) => {}
340                                 // Console errors are no-ops, so just continue.
341                                 Err(_) => {
342                                     continue;
343                                 }
344                             }
345                         }
346                     }
347                     Token::InterruptResample => {
348                         self.interrupt.interrupt_resample();
349                     }
350                     Token::Kill => break 'wait,
351                 }
352             }
353         }
354     }
355 }
356 
357 enum ConsoleInput {
358     FromRead(crate::serial::sys::InStreamType),
359     FromThread(Arc<Mutex<VecDeque<u8>>>),
360 }
361 
362 /// Virtio console device.
363 pub struct Console {
364     base_features: u64,
365     in_avail_evt: Option<Event>,
366     worker_thread: Option<WorkerThread<Worker>>,
367     input: Option<ConsoleInput>,
368     output: Option<Box<dyn io::Write + Send>>,
369     keep_descriptors: Vec<Descriptor>,
370 }
371 
372 impl Console {
new( protection_type: ProtectionType, input: Option<ConsoleInput>, output: Option<Box<dyn io::Write + Send>>, keep_rds: Vec<RawDescriptor>, ) -> Console373     fn new(
374         protection_type: ProtectionType,
375         input: Option<ConsoleInput>,
376         output: Option<Box<dyn io::Write + Send>>,
377         keep_rds: Vec<RawDescriptor>,
378     ) -> Console {
379         Console {
380             base_features: base_features(protection_type),
381             in_avail_evt: None,
382             worker_thread: None,
383             input,
384             output,
385             keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
386         }
387     }
388 }
389 
390 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>391     fn keep_rds(&self) -> Vec<RawDescriptor> {
392         // return the raw descriptors as opposed to descriptor.
393         self.keep_descriptors
394             .iter()
395             .map(|descr| descr.as_raw_descriptor())
396             .collect()
397     }
398 
features(&self) -> u64399     fn features(&self) -> u64 {
400         self.base_features
401     }
402 
device_type(&self) -> DeviceType403     fn device_type(&self) -> DeviceType {
404         DeviceType::Console
405     }
406 
queue_max_sizes(&self) -> &[u16]407     fn queue_max_sizes(&self) -> &[u16] {
408         QUEUE_SIZES
409     }
410 
read_config(&self, offset: u64, data: &mut [u8])411     fn read_config(&self, offset: u64, data: &mut [u8]) {
412         let config = virtio_console_config {
413             max_nr_ports: 1.into(),
414             ..Default::default()
415         };
416         copy_config(data, 0, config.as_bytes(), offset);
417     }
418 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>419     fn activate(
420         &mut self,
421         mem: GuestMemory,
422         interrupt: Interrupt,
423         mut queues: Vec<(Queue, Event)>,
424     ) -> anyhow::Result<()> {
425         if queues.len() < 2 {
426             return Err(anyhow!("expected 2 queues, got {}", queues.len()));
427         }
428 
429         let (receive_queue, receive_evt) = queues.remove(0);
430         let (transmit_queue, transmit_evt) = queues.remove(0);
431 
432         if self.in_avail_evt.is_none() {
433             self.in_avail_evt = Some(Event::new().context("failed creating Event")?);
434         }
435         let in_avail_evt = self
436             .in_avail_evt
437             .as_ref()
438             .unwrap()
439             .try_clone()
440             .context("failed creating input available Event pair")?;
441 
442         // Spawn a separate thread to poll self.input.
443         // A thread is used because io::Read only provides a blocking interface, and there is no
444         // generic way to add an io::Read instance to a poll context (it may not be backed by a file
445         // descriptor).  Moving the blocking read call to a separate thread and sending data back to
446         // the main worker thread with an event for notification bridges this gap.
447         let input = match self.input.take() {
448             Some(ConsoleInput::FromRead(read)) => {
449                 let buffer = spawn_input_thread(read, self.in_avail_evt.as_ref().unwrap());
450                 if buffer.is_none() {
451                     return Err(anyhow!("failed creating input thread"));
452                 };
453                 buffer
454             }
455             Some(ConsoleInput::FromThread(buffer)) => Some(buffer),
456             None => None,
457         };
458         let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
459 
460         self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
461             let mut worker = Worker {
462                 mem,
463                 interrupt,
464                 input,
465                 output,
466                 in_avail_evt,
467                 kill_evt,
468                 // Device -> driver
469                 receive_queue,
470                 receive_evt,
471                 // Driver -> device
472                 transmit_queue,
473                 transmit_evt,
474             };
475             worker.run();
476             worker
477         }));
478         Ok(())
479     }
480 
reset(&mut self) -> bool481     fn reset(&mut self) -> bool {
482         if let Some(worker_thread) = self.worker_thread.take() {
483             let worker = worker_thread.stop();
484             self.input = worker.input.map(ConsoleInput::FromThread);
485             self.output = Some(worker.output);
486             return true;
487         }
488         false
489     }
490 }
491 
492 impl Suspendable for Console {}
493