• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Legacy console device that uses a polling thread. This is kept because it is still used by
6 //! Windows ; outside of this use-case, please use [[asynchronous::AsyncConsole]] instead.
7 
8 pub mod asynchronous;
9 mod multiport;
10 mod sys;
11 
12 use std::collections::BTreeMap;
13 use std::collections::VecDeque;
14 use std::io;
15 use std::io::Read;
16 use std::io::Write;
17 use std::ops::DerefMut;
18 use std::result;
19 use std::sync::Arc;
20 
21 use anyhow::anyhow;
22 use anyhow::Context;
23 use base::error;
24 use base::AsRawDescriptor;
25 use base::Descriptor;
26 use base::Event;
27 use base::EventToken;
28 use base::RawDescriptor;
29 #[cfg(windows)]
30 use base::ReadNotifier;
31 use base::WaitContext;
32 use base::WorkerThread;
33 use data_model::Le16;
34 use data_model::Le32;
35 use hypervisor::ProtectionType;
36 use remain::sorted;
37 use serde::Deserialize;
38 use serde::Serialize;
39 use sync::Mutex;
40 use thiserror::Error as ThisError;
41 use vm_memory::GuestMemory;
42 use zerocopy::AsBytes;
43 use zerocopy::FromBytes;
44 use zerocopy::FromZeroes;
45 
46 use crate::serial::sys::InStreamType;
47 use crate::virtio::base_features;
48 use crate::virtio::copy_config;
49 use crate::virtio::DeviceType;
50 use crate::virtio::Interrupt;
51 use crate::virtio::Queue;
52 use crate::virtio::Reader;
53 use crate::virtio::VirtioDevice;
54 use crate::PciAddress;
55 
56 pub(crate) const QUEUE_SIZE: u16 = 256;
57 
58 // For now, just implement port 0 (receiveq and transmitq).
59 // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
60 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
61 
62 #[sorted]
63 #[derive(ThisError, Debug)]
64 pub enum ConsoleError {
65     /// There are no more available descriptors to receive into
66     #[error("no rx descriptors available")]
67     RxDescriptorsExhausted,
68 }
69 
70 #[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
71 #[repr(C)]
72 pub struct virtio_console_config {
73     pub cols: Le16,
74     pub rows: Le16,
75     pub max_nr_ports: Le32,
76     pub emerg_wr: Le32,
77 }
78 
79 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
80 ///
81 /// # Arguments
82 ///
83 /// * `interrupt` - Interrupt used to signal that the queue has been used
84 /// * `buffer` - Ring buffer providing data to put into the guest
85 /// * `receive_queue` - The receive virtio Queue
handle_input( interrupt: &Interrupt, buffer: &mut VecDeque<u8>, receive_queue: &Arc<Mutex<Queue>>, ) -> result::Result<(), ConsoleError>86 fn handle_input(
87     interrupt: &Interrupt,
88     buffer: &mut VecDeque<u8>,
89     receive_queue: &Arc<Mutex<Queue>>,
90 ) -> result::Result<(), ConsoleError> {
91     let mut receive_queue = receive_queue
92         .try_lock()
93         .expect("Lock should not be unavailable");
94     loop {
95         let mut desc = receive_queue
96             .peek()
97             .ok_or(ConsoleError::RxDescriptorsExhausted)?;
98 
99         let writer = &mut desc.writer;
100         while writer.available_bytes() > 0 && !buffer.is_empty() {
101             let (buffer_front, buffer_back) = buffer.as_slices();
102             let buffer_chunk = if !buffer_front.is_empty() {
103                 buffer_front
104             } else {
105                 buffer_back
106             };
107             let written = writer.write(buffer_chunk).unwrap();
108             drop(buffer.drain(..written));
109         }
110 
111         let bytes_written = writer.bytes_written() as u32;
112 
113         if bytes_written > 0 {
114             let desc = desc.pop();
115             receive_queue.add_used(desc, bytes_written);
116             receive_queue.trigger_interrupt(interrupt);
117         }
118 
119         if bytes_written == 0 {
120             return Ok(());
121         }
122     }
123 }
124 
125 /// Writes the available data from the reader into the given output queue.
126 ///
127 /// # Arguments
128 ///
129 /// * `reader` - The Reader with the data we want to write.
130 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()>131 fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()> {
132     let len = reader.available_bytes();
133     let mut data = vec![0u8; len];
134     reader.read_exact(&mut data)?;
135     output.write_all(&data)?;
136     output.flush()?;
137     Ok(())
138 }
139 
140 /// Processes the data taken from the given transmit queue into the output sink.
141 ///
142 /// # Arguments
143 ///
144 /// * `interrupt` - Interrupt used to signal (if required) that the queue has been used
145 /// * `transmit_queue` - The transmit virtio Queue
146 /// * `output` - The output sink we are going to write the data into
process_transmit_queue( interrupt: &Interrupt, transmit_queue: &Arc<Mutex<Queue>>, output: &mut dyn io::Write, )147 fn process_transmit_queue(
148     interrupt: &Interrupt,
149     transmit_queue: &Arc<Mutex<Queue>>,
150     output: &mut dyn io::Write,
151 ) {
152     let mut needs_interrupt = false;
153     let mut transmit_queue = transmit_queue
154         .try_lock()
155         .expect("Lock should not be unavailable");
156     while let Some(mut avail_desc) = transmit_queue.pop() {
157         process_transmit_request(&mut avail_desc.reader, output)
158             .unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e));
159 
160         transmit_queue.add_used(avail_desc, 0);
161         needs_interrupt = true;
162     }
163 
164     if needs_interrupt {
165         transmit_queue.trigger_interrupt(interrupt);
166     }
167 }
168 
169 struct Worker {
170     interrupt: Interrupt,
171     input: Option<Arc<Mutex<VecDeque<u8>>>>,
172     output: Box<dyn io::Write + Send>,
173     kill_evt: Event,
174     in_avail_evt: Event,
175     receive_queue: Arc<Mutex<Queue>>,
176     transmit_queue: Arc<Mutex<Queue>>,
177 }
178 
179 impl Worker {
run(&mut self) -> anyhow::Result<()>180     fn run(&mut self) -> anyhow::Result<()> {
181         #[derive(EventToken)]
182         enum Token {
183             ReceiveQueueAvailable,
184             TransmitQueueAvailable,
185             InputAvailable,
186             InterruptResample,
187             Kill,
188         }
189 
190         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
191             (
192                 self.transmit_queue.lock().event(),
193                 Token::TransmitQueueAvailable,
194             ),
195             (
196                 self.receive_queue.lock().event(),
197                 Token::ReceiveQueueAvailable,
198             ),
199             (&self.in_avail_evt, Token::InputAvailable),
200             (&self.kill_evt, Token::Kill),
201         ])?;
202         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
203             wait_ctx.add(resample_evt, Token::InterruptResample)?;
204         }
205 
206         let mut running = true;
207         while running {
208             let events = wait_ctx.wait()?;
209 
210             for event in events.iter().filter(|e| e.is_readable) {
211                 match event.token {
212                     Token::TransmitQueueAvailable => {
213                         self.transmit_queue
214                             .lock()
215                             .event()
216                             .wait()
217                             .context("failed reading transmit queue Event")?;
218                         process_transmit_queue(
219                             &self.interrupt,
220                             &self.transmit_queue,
221                             &mut self.output,
222                         );
223                     }
224                     Token::ReceiveQueueAvailable => {
225                         self.receive_queue
226                             .lock()
227                             .event()
228                             .wait()
229                             .context("failed reading receive queue Event")?;
230                         if let Some(in_buf_ref) = self.input.as_ref() {
231                             let _ = handle_input(
232                                 &self.interrupt,
233                                 in_buf_ref.lock().deref_mut(),
234                                 &self.receive_queue,
235                             );
236                         }
237                     }
238                     Token::InputAvailable => {
239                         self.in_avail_evt
240                             .wait()
241                             .context("failed reading in_avail_evt")?;
242                         if let Some(in_buf_ref) = self.input.as_ref() {
243                             let _ = handle_input(
244                                 &self.interrupt,
245                                 in_buf_ref.lock().deref_mut(),
246                                 &self.receive_queue,
247                             );
248                         }
249                     }
250                     Token::InterruptResample => {
251                         self.interrupt.interrupt_resample();
252                     }
253                     Token::Kill => running = false,
254                 }
255             }
256         }
257         Ok(())
258     }
259 }
260 
261 /// Virtio console device.
262 pub struct Console {
263     base_features: u64,
264     in_avail_evt: Event,
265     worker_thread: Option<WorkerThread<Worker>>,
266     input: Option<InStreamType>,
267     output: Option<Box<dyn io::Write + Send>>,
268     keep_descriptors: Vec<Descriptor>,
269     input_thread: Option<WorkerThread<InStreamType>>,
270     // input_buffer is not continuously updated. It holds the state of the buffer when a snapshot
271     // happens, or when a restore is performed. On a fresh startup, it will be empty. On a restore,
272     // it will contain whatever data was remaining in the buffer in the snapshot.
273     input_buffer: VecDeque<u8>,
274     pci_address: Option<PciAddress>,
275 }
276 
277 #[derive(Serialize, Deserialize)]
278 struct ConsoleSnapshot {
279     base_features: u64,
280     input_buffer: VecDeque<u8>,
281 }
282 
283 impl Console {
new( protection_type: ProtectionType, input: Option<InStreamType>, output: Option<Box<dyn io::Write + Send>>, mut keep_rds: Vec<RawDescriptor>, pci_address: Option<PciAddress>, ) -> Console284     fn new(
285         protection_type: ProtectionType,
286         input: Option<InStreamType>,
287         output: Option<Box<dyn io::Write + Send>>,
288         mut keep_rds: Vec<RawDescriptor>,
289         pci_address: Option<PciAddress>,
290     ) -> Console {
291         let in_avail_evt = Event::new().expect("failed creating Event");
292         keep_rds.push(in_avail_evt.as_raw_descriptor());
293         Console {
294             base_features: base_features(protection_type),
295             in_avail_evt,
296             worker_thread: None,
297             input,
298             output,
299             keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
300             input_thread: None,
301             input_buffer: VecDeque::new(),
302             pci_address,
303         }
304     }
305 }
306 
307 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>308     fn keep_rds(&self) -> Vec<RawDescriptor> {
309         // return the raw descriptors as opposed to descriptor.
310         self.keep_descriptors
311             .iter()
312             .map(|descr| descr.as_raw_descriptor())
313             .collect()
314     }
315 
features(&self) -> u64316     fn features(&self) -> u64 {
317         self.base_features
318     }
319 
device_type(&self) -> DeviceType320     fn device_type(&self) -> DeviceType {
321         DeviceType::Console
322     }
323 
queue_max_sizes(&self) -> &[u16]324     fn queue_max_sizes(&self) -> &[u16] {
325         QUEUE_SIZES
326     }
327 
read_config(&self, offset: u64, data: &mut [u8])328     fn read_config(&self, offset: u64, data: &mut [u8]) {
329         let config = virtio_console_config {
330             max_nr_ports: 1.into(),
331             ..Default::default()
332         };
333         copy_config(data, 0, config.as_bytes(), offset);
334     }
335 
activate( &mut self, _mem: GuestMemory, interrupt: Interrupt, mut queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>336     fn activate(
337         &mut self,
338         _mem: GuestMemory,
339         interrupt: Interrupt,
340         mut queues: BTreeMap<usize, Queue>,
341     ) -> anyhow::Result<()> {
342         if queues.len() < 2 {
343             return Err(anyhow!("expected 2 queues, got {}", queues.len()));
344         }
345 
346         let receive_queue = queues.remove(&0).unwrap();
347         let transmit_queue = queues.remove(&1).unwrap();
348 
349         let in_avail_evt = self
350             .in_avail_evt
351             .try_clone()
352             .context("failed creating input available Event pair")?;
353 
354         // Spawn a separate thread to poll self.input.
355         // A thread is used because io::Read only provides a blocking interface, and there is no
356         // generic way to add an io::Read instance to a poll context (it may not be backed by a file
357         // descriptor).  Moving the blocking read call to a separate thread and sending data back to
358         // the main worker thread with an event for notification bridges this gap.
359         let input = match self.input.take() {
360             Some(read) => {
361                 let (buffer, thread) = sys::spawn_input_thread(
362                     read,
363                     &self.in_avail_evt,
364                     std::mem::take(&mut self.input_buffer),
365                 );
366                 self.input_thread = Some(thread);
367                 Some(buffer)
368             }
369             None => None,
370         };
371         let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
372 
373         self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
374             let mut worker = Worker {
375                 interrupt,
376                 input,
377                 output,
378                 in_avail_evt,
379                 kill_evt,
380                 // Device -> driver
381                 receive_queue: Arc::new(Mutex::new(receive_queue)),
382                 // Driver -> device
383                 transmit_queue: Arc::new(Mutex::new(transmit_queue)),
384             };
385             if let Err(e) = worker.run() {
386                 error!("console run failure: {:?}", e);
387             };
388             worker
389         }));
390         Ok(())
391     }
392 
pci_address(&self) -> Option<PciAddress>393     fn pci_address(&self) -> Option<PciAddress> {
394         self.pci_address
395     }
396 
reset(&mut self) -> anyhow::Result<()>397     fn reset(&mut self) -> anyhow::Result<()> {
398         if let Some(input_thread) = self.input_thread.take() {
399             self.input = Some(input_thread.stop());
400         }
401         if let Some(worker_thread) = self.worker_thread.take() {
402             let worker = worker_thread.stop();
403             // NOTE: Even though we are reseting the device, it still makes sense to preserve the
404             // pending input bytes that the host sent but the guest hasn't accepted yet.
405             self.input_buffer = worker
406                 .input
407                 .map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
408             self.output = Some(worker.output);
409         }
410         Ok(())
411     }
412 
virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>>413     fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
414         if let Some(input_thread) = self.input_thread.take() {
415             self.input = Some(input_thread.stop());
416         }
417         if let Some(worker_thread) = self.worker_thread.take() {
418             let worker = worker_thread.stop();
419             self.input_buffer = worker
420                 .input
421                 .map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
422             self.output = Some(worker.output);
423             let receive_queue = match Arc::try_unwrap(worker.receive_queue) {
424                 Ok(mutex) => mutex.into_inner(),
425                 Err(_) => return Err(anyhow!("failed to retrieve receive queue to sleep device.")),
426             };
427             let transmit_queue = match Arc::try_unwrap(worker.transmit_queue) {
428                 Ok(mutex) => mutex.into_inner(),
429                 Err(_) => {
430                     return Err(anyhow!(
431                         "failed to retrieve transmit queue to sleep device."
432                     ))
433                 }
434             };
435             return Ok(Some(BTreeMap::from([
436                 (0, receive_queue),
437                 (1, transmit_queue),
438             ])));
439         }
440         Ok(None)
441     }
442 
virtio_wake( &mut self, queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>, ) -> anyhow::Result<()>443     fn virtio_wake(
444         &mut self,
445         queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
446     ) -> anyhow::Result<()> {
447         match queues_state {
448             None => Ok(()),
449             Some((mem, interrupt, queues)) => {
450                 // TODO(khei): activate is just what we want at the moment, but we should probably
451                 // move it into a "start workers" function to make it obvious that
452                 // it isn't strictly used for activate events.
453                 self.activate(mem, interrupt, queues)?;
454                 Ok(())
455             }
456         }
457     }
458 
virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value>459     fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
460         if let Some(read) = self.input.as_mut() {
461             // If the device was not activated yet, we still read the input.
462             // It's fine to do so since the the data is not lost. It will get queued in the
463             // input_buffer and restored. When the device activates, the data will still be
464             // available, and if there's any new data, that new data will get appended.
465             let input_buffer = Arc::new(Mutex::new(std::mem::take(&mut self.input_buffer)));
466 
467             let kill_evt = Event::new().unwrap();
468             let _ = kill_evt.signal();
469             sys::read_input(read, &self.in_avail_evt, input_buffer.clone(), kill_evt);
470             self.input_buffer = std::mem::take(&mut input_buffer.lock());
471         };
472         serde_json::to_value(ConsoleSnapshot {
473             // Snapshot base_features as a safeguard when restoring the console device. Saving this
474             // info allows us to validate that the proper config was used for the console.
475             base_features: self.base_features,
476             input_buffer: self.input_buffer.clone(),
477         })
478         .context("failed to snapshot virtio console")
479     }
480 
virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>481     fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
482         let deser: ConsoleSnapshot =
483             serde_json::from_value(data).context("failed to deserialize virtio console")?;
484         anyhow::ensure!(
485             self.base_features == deser.base_features,
486             "Virtio console incorrect base features for restore:\n Expected: {}, Actual: {}",
487             self.base_features,
488             deser.base_features,
489         );
490         self.input_buffer = deser.input_buffer;
491         Ok(())
492     }
493 }
494 
495 #[cfg(test)]
496 mod tests {
497     #[cfg(windows)]
498     use base::windows::named_pipes;
499     use tempfile::tempfile;
500     use vm_memory::GuestAddress;
501 
502     use super::*;
503     use crate::suspendable_virtio_tests;
504 
505     struct ConsoleContext {
506         #[cfg(windows)]
507         input_peer: named_pipes::PipeConnection,
508     }
509 
modify_device(_context: &mut ConsoleContext, b: &mut Console)510     fn modify_device(_context: &mut ConsoleContext, b: &mut Console) {
511         b.input_buffer.push_back(0);
512     }
513 
create_device() -> (ConsoleContext, Console)514     fn create_device() -> (ConsoleContext, Console) {
515         #[cfg(any(target_os = "android", target_os = "linux"))]
516         let (input, context) = (Box::new(tempfile().unwrap()), ConsoleContext {});
517         #[cfg(windows)]
518         let (input, context) = {
519             let (x, y) = named_pipes::pair(
520                 &named_pipes::FramingMode::Byte,
521                 &named_pipes::BlockingMode::NoWait,
522                 0,
523             )
524             .unwrap();
525             (Box::new(x), ConsoleContext { input_peer: y })
526         };
527 
528         let output = Box::new(tempfile().unwrap());
529         (
530             context,
531             Console::new(
532                 hypervisor::ProtectionType::Unprotected,
533                 Some(input),
534                 Some(output),
535                 Vec::new(),
536                 None,
537             ),
538         )
539     }
540 
541     suspendable_virtio_tests!(console, create_device, 2, modify_device);
542 
543     #[test]
test_inactive_sleep_resume()544     fn test_inactive_sleep_resume() {
545         let (_ctx, device) = &mut create_device();
546         let sleep_result = device.virtio_sleep().expect("failed to sleep");
547         assert!(sleep_result.is_none());
548         device.virtio_snapshot().expect("failed to snapshot");
549         device.virtio_wake(None).expect("failed to wake");
550         // Make sure the input and output haven't been dropped.
551         assert!(device.input.is_some());
552         assert!(device.output.is_some());
553     }
554 }
555