1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Legacy console device that uses a polling thread. This is kept because it is still used by
6 //! Windows ; outside of this use-case, please use [[asynchronous::AsyncConsole]] instead.
7
8 pub mod asynchronous;
9 mod multiport;
10 mod sys;
11
12 use std::collections::BTreeMap;
13 use std::collections::VecDeque;
14 use std::io;
15 use std::io::Read;
16 use std::io::Write;
17 use std::ops::DerefMut;
18 use std::result;
19 use std::sync::Arc;
20
21 use anyhow::anyhow;
22 use anyhow::Context;
23 use base::error;
24 use base::AsRawDescriptor;
25 use base::Descriptor;
26 use base::Event;
27 use base::EventToken;
28 use base::RawDescriptor;
29 #[cfg(windows)]
30 use base::ReadNotifier;
31 use base::WaitContext;
32 use base::WorkerThread;
33 use data_model::Le16;
34 use data_model::Le32;
35 use hypervisor::ProtectionType;
36 use remain::sorted;
37 use serde::Deserialize;
38 use serde::Serialize;
39 use sync::Mutex;
40 use thiserror::Error as ThisError;
41 use vm_memory::GuestMemory;
42 use zerocopy::AsBytes;
43 use zerocopy::FromBytes;
44 use zerocopy::FromZeroes;
45
46 use crate::serial::sys::InStreamType;
47 use crate::virtio::base_features;
48 use crate::virtio::copy_config;
49 use crate::virtio::DeviceType;
50 use crate::virtio::Interrupt;
51 use crate::virtio::Queue;
52 use crate::virtio::Reader;
53 use crate::virtio::VirtioDevice;
54 use crate::PciAddress;
55
56 pub(crate) const QUEUE_SIZE: u16 = 256;
57
58 // For now, just implement port 0 (receiveq and transmitq).
59 // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
60 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
61
62 #[sorted]
63 #[derive(ThisError, Debug)]
64 pub enum ConsoleError {
65 /// There are no more available descriptors to receive into
66 #[error("no rx descriptors available")]
67 RxDescriptorsExhausted,
68 }
69
70 #[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
71 #[repr(C)]
72 pub struct virtio_console_config {
73 pub cols: Le16,
74 pub rows: Le16,
75 pub max_nr_ports: Le32,
76 pub emerg_wr: Le32,
77 }
78
79 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
80 ///
81 /// # Arguments
82 ///
83 /// * `interrupt` - Interrupt used to signal that the queue has been used
84 /// * `buffer` - Ring buffer providing data to put into the guest
85 /// * `receive_queue` - The receive virtio Queue
handle_input( interrupt: &Interrupt, buffer: &mut VecDeque<u8>, receive_queue: &Arc<Mutex<Queue>>, ) -> result::Result<(), ConsoleError>86 fn handle_input(
87 interrupt: &Interrupt,
88 buffer: &mut VecDeque<u8>,
89 receive_queue: &Arc<Mutex<Queue>>,
90 ) -> result::Result<(), ConsoleError> {
91 let mut receive_queue = receive_queue
92 .try_lock()
93 .expect("Lock should not be unavailable");
94 loop {
95 let mut desc = receive_queue
96 .peek()
97 .ok_or(ConsoleError::RxDescriptorsExhausted)?;
98
99 let writer = &mut desc.writer;
100 while writer.available_bytes() > 0 && !buffer.is_empty() {
101 let (buffer_front, buffer_back) = buffer.as_slices();
102 let buffer_chunk = if !buffer_front.is_empty() {
103 buffer_front
104 } else {
105 buffer_back
106 };
107 let written = writer.write(buffer_chunk).unwrap();
108 drop(buffer.drain(..written));
109 }
110
111 let bytes_written = writer.bytes_written() as u32;
112
113 if bytes_written > 0 {
114 let desc = desc.pop();
115 receive_queue.add_used(desc, bytes_written);
116 receive_queue.trigger_interrupt(interrupt);
117 }
118
119 if bytes_written == 0 {
120 return Ok(());
121 }
122 }
123 }
124
125 /// Writes the available data from the reader into the given output queue.
126 ///
127 /// # Arguments
128 ///
129 /// * `reader` - The Reader with the data we want to write.
130 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()>131 fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()> {
132 let len = reader.available_bytes();
133 let mut data = vec![0u8; len];
134 reader.read_exact(&mut data)?;
135 output.write_all(&data)?;
136 output.flush()?;
137 Ok(())
138 }
139
140 /// Processes the data taken from the given transmit queue into the output sink.
141 ///
142 /// # Arguments
143 ///
144 /// * `interrupt` - Interrupt used to signal (if required) that the queue has been used
145 /// * `transmit_queue` - The transmit virtio Queue
146 /// * `output` - The output sink we are going to write the data into
process_transmit_queue( interrupt: &Interrupt, transmit_queue: &Arc<Mutex<Queue>>, output: &mut dyn io::Write, )147 fn process_transmit_queue(
148 interrupt: &Interrupt,
149 transmit_queue: &Arc<Mutex<Queue>>,
150 output: &mut dyn io::Write,
151 ) {
152 let mut needs_interrupt = false;
153 let mut transmit_queue = transmit_queue
154 .try_lock()
155 .expect("Lock should not be unavailable");
156 while let Some(mut avail_desc) = transmit_queue.pop() {
157 process_transmit_request(&mut avail_desc.reader, output)
158 .unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e));
159
160 transmit_queue.add_used(avail_desc, 0);
161 needs_interrupt = true;
162 }
163
164 if needs_interrupt {
165 transmit_queue.trigger_interrupt(interrupt);
166 }
167 }
168
169 struct Worker {
170 interrupt: Interrupt,
171 input: Option<Arc<Mutex<VecDeque<u8>>>>,
172 output: Box<dyn io::Write + Send>,
173 kill_evt: Event,
174 in_avail_evt: Event,
175 receive_queue: Arc<Mutex<Queue>>,
176 transmit_queue: Arc<Mutex<Queue>>,
177 }
178
179 impl Worker {
run(&mut self) -> anyhow::Result<()>180 fn run(&mut self) -> anyhow::Result<()> {
181 #[derive(EventToken)]
182 enum Token {
183 ReceiveQueueAvailable,
184 TransmitQueueAvailable,
185 InputAvailable,
186 InterruptResample,
187 Kill,
188 }
189
190 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
191 (
192 self.transmit_queue.lock().event(),
193 Token::TransmitQueueAvailable,
194 ),
195 (
196 self.receive_queue.lock().event(),
197 Token::ReceiveQueueAvailable,
198 ),
199 (&self.in_avail_evt, Token::InputAvailable),
200 (&self.kill_evt, Token::Kill),
201 ])?;
202 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
203 wait_ctx.add(resample_evt, Token::InterruptResample)?;
204 }
205
206 let mut running = true;
207 while running {
208 let events = wait_ctx.wait()?;
209
210 for event in events.iter().filter(|e| e.is_readable) {
211 match event.token {
212 Token::TransmitQueueAvailable => {
213 self.transmit_queue
214 .lock()
215 .event()
216 .wait()
217 .context("failed reading transmit queue Event")?;
218 process_transmit_queue(
219 &self.interrupt,
220 &self.transmit_queue,
221 &mut self.output,
222 );
223 }
224 Token::ReceiveQueueAvailable => {
225 self.receive_queue
226 .lock()
227 .event()
228 .wait()
229 .context("failed reading receive queue Event")?;
230 if let Some(in_buf_ref) = self.input.as_ref() {
231 let _ = handle_input(
232 &self.interrupt,
233 in_buf_ref.lock().deref_mut(),
234 &self.receive_queue,
235 );
236 }
237 }
238 Token::InputAvailable => {
239 self.in_avail_evt
240 .wait()
241 .context("failed reading in_avail_evt")?;
242 if let Some(in_buf_ref) = self.input.as_ref() {
243 let _ = handle_input(
244 &self.interrupt,
245 in_buf_ref.lock().deref_mut(),
246 &self.receive_queue,
247 );
248 }
249 }
250 Token::InterruptResample => {
251 self.interrupt.interrupt_resample();
252 }
253 Token::Kill => running = false,
254 }
255 }
256 }
257 Ok(())
258 }
259 }
260
261 /// Virtio console device.
262 pub struct Console {
263 base_features: u64,
264 in_avail_evt: Event,
265 worker_thread: Option<WorkerThread<Worker>>,
266 input: Option<InStreamType>,
267 output: Option<Box<dyn io::Write + Send>>,
268 keep_descriptors: Vec<Descriptor>,
269 input_thread: Option<WorkerThread<InStreamType>>,
270 // input_buffer is not continuously updated. It holds the state of the buffer when a snapshot
271 // happens, or when a restore is performed. On a fresh startup, it will be empty. On a restore,
272 // it will contain whatever data was remaining in the buffer in the snapshot.
273 input_buffer: VecDeque<u8>,
274 pci_address: Option<PciAddress>,
275 }
276
277 #[derive(Serialize, Deserialize)]
278 struct ConsoleSnapshot {
279 base_features: u64,
280 input_buffer: VecDeque<u8>,
281 }
282
283 impl Console {
new( protection_type: ProtectionType, input: Option<InStreamType>, output: Option<Box<dyn io::Write + Send>>, mut keep_rds: Vec<RawDescriptor>, pci_address: Option<PciAddress>, ) -> Console284 fn new(
285 protection_type: ProtectionType,
286 input: Option<InStreamType>,
287 output: Option<Box<dyn io::Write + Send>>,
288 mut keep_rds: Vec<RawDescriptor>,
289 pci_address: Option<PciAddress>,
290 ) -> Console {
291 let in_avail_evt = Event::new().expect("failed creating Event");
292 keep_rds.push(in_avail_evt.as_raw_descriptor());
293 Console {
294 base_features: base_features(protection_type),
295 in_avail_evt,
296 worker_thread: None,
297 input,
298 output,
299 keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
300 input_thread: None,
301 input_buffer: VecDeque::new(),
302 pci_address,
303 }
304 }
305 }
306
307 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>308 fn keep_rds(&self) -> Vec<RawDescriptor> {
309 // return the raw descriptors as opposed to descriptor.
310 self.keep_descriptors
311 .iter()
312 .map(|descr| descr.as_raw_descriptor())
313 .collect()
314 }
315
features(&self) -> u64316 fn features(&self) -> u64 {
317 self.base_features
318 }
319
device_type(&self) -> DeviceType320 fn device_type(&self) -> DeviceType {
321 DeviceType::Console
322 }
323
queue_max_sizes(&self) -> &[u16]324 fn queue_max_sizes(&self) -> &[u16] {
325 QUEUE_SIZES
326 }
327
read_config(&self, offset: u64, data: &mut [u8])328 fn read_config(&self, offset: u64, data: &mut [u8]) {
329 let config = virtio_console_config {
330 max_nr_ports: 1.into(),
331 ..Default::default()
332 };
333 copy_config(data, 0, config.as_bytes(), offset);
334 }
335
activate( &mut self, _mem: GuestMemory, interrupt: Interrupt, mut queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>336 fn activate(
337 &mut self,
338 _mem: GuestMemory,
339 interrupt: Interrupt,
340 mut queues: BTreeMap<usize, Queue>,
341 ) -> anyhow::Result<()> {
342 if queues.len() < 2 {
343 return Err(anyhow!("expected 2 queues, got {}", queues.len()));
344 }
345
346 let receive_queue = queues.remove(&0).unwrap();
347 let transmit_queue = queues.remove(&1).unwrap();
348
349 let in_avail_evt = self
350 .in_avail_evt
351 .try_clone()
352 .context("failed creating input available Event pair")?;
353
354 // Spawn a separate thread to poll self.input.
355 // A thread is used because io::Read only provides a blocking interface, and there is no
356 // generic way to add an io::Read instance to a poll context (it may not be backed by a file
357 // descriptor). Moving the blocking read call to a separate thread and sending data back to
358 // the main worker thread with an event for notification bridges this gap.
359 let input = match self.input.take() {
360 Some(read) => {
361 let (buffer, thread) = sys::spawn_input_thread(
362 read,
363 &self.in_avail_evt,
364 std::mem::take(&mut self.input_buffer),
365 );
366 self.input_thread = Some(thread);
367 Some(buffer)
368 }
369 None => None,
370 };
371 let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
372
373 self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
374 let mut worker = Worker {
375 interrupt,
376 input,
377 output,
378 in_avail_evt,
379 kill_evt,
380 // Device -> driver
381 receive_queue: Arc::new(Mutex::new(receive_queue)),
382 // Driver -> device
383 transmit_queue: Arc::new(Mutex::new(transmit_queue)),
384 };
385 if let Err(e) = worker.run() {
386 error!("console run failure: {:?}", e);
387 };
388 worker
389 }));
390 Ok(())
391 }
392
pci_address(&self) -> Option<PciAddress>393 fn pci_address(&self) -> Option<PciAddress> {
394 self.pci_address
395 }
396
reset(&mut self) -> anyhow::Result<()>397 fn reset(&mut self) -> anyhow::Result<()> {
398 if let Some(input_thread) = self.input_thread.take() {
399 self.input = Some(input_thread.stop());
400 }
401 if let Some(worker_thread) = self.worker_thread.take() {
402 let worker = worker_thread.stop();
403 // NOTE: Even though we are reseting the device, it still makes sense to preserve the
404 // pending input bytes that the host sent but the guest hasn't accepted yet.
405 self.input_buffer = worker
406 .input
407 .map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
408 self.output = Some(worker.output);
409 }
410 Ok(())
411 }
412
virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>>413 fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
414 if let Some(input_thread) = self.input_thread.take() {
415 self.input = Some(input_thread.stop());
416 }
417 if let Some(worker_thread) = self.worker_thread.take() {
418 let worker = worker_thread.stop();
419 self.input_buffer = worker
420 .input
421 .map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
422 self.output = Some(worker.output);
423 let receive_queue = match Arc::try_unwrap(worker.receive_queue) {
424 Ok(mutex) => mutex.into_inner(),
425 Err(_) => return Err(anyhow!("failed to retrieve receive queue to sleep device.")),
426 };
427 let transmit_queue = match Arc::try_unwrap(worker.transmit_queue) {
428 Ok(mutex) => mutex.into_inner(),
429 Err(_) => {
430 return Err(anyhow!(
431 "failed to retrieve transmit queue to sleep device."
432 ))
433 }
434 };
435 return Ok(Some(BTreeMap::from([
436 (0, receive_queue),
437 (1, transmit_queue),
438 ])));
439 }
440 Ok(None)
441 }
442
virtio_wake( &mut self, queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>, ) -> anyhow::Result<()>443 fn virtio_wake(
444 &mut self,
445 queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
446 ) -> anyhow::Result<()> {
447 match queues_state {
448 None => Ok(()),
449 Some((mem, interrupt, queues)) => {
450 // TODO(khei): activate is just what we want at the moment, but we should probably
451 // move it into a "start workers" function to make it obvious that
452 // it isn't strictly used for activate events.
453 self.activate(mem, interrupt, queues)?;
454 Ok(())
455 }
456 }
457 }
458
virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value>459 fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
460 if let Some(read) = self.input.as_mut() {
461 // If the device was not activated yet, we still read the input.
462 // It's fine to do so since the the data is not lost. It will get queued in the
463 // input_buffer and restored. When the device activates, the data will still be
464 // available, and if there's any new data, that new data will get appended.
465 let input_buffer = Arc::new(Mutex::new(std::mem::take(&mut self.input_buffer)));
466
467 let kill_evt = Event::new().unwrap();
468 let _ = kill_evt.signal();
469 sys::read_input(read, &self.in_avail_evt, input_buffer.clone(), kill_evt);
470 self.input_buffer = std::mem::take(&mut input_buffer.lock());
471 };
472 serde_json::to_value(ConsoleSnapshot {
473 // Snapshot base_features as a safeguard when restoring the console device. Saving this
474 // info allows us to validate that the proper config was used for the console.
475 base_features: self.base_features,
476 input_buffer: self.input_buffer.clone(),
477 })
478 .context("failed to snapshot virtio console")
479 }
480
virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>481 fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
482 let deser: ConsoleSnapshot =
483 serde_json::from_value(data).context("failed to deserialize virtio console")?;
484 anyhow::ensure!(
485 self.base_features == deser.base_features,
486 "Virtio console incorrect base features for restore:\n Expected: {}, Actual: {}",
487 self.base_features,
488 deser.base_features,
489 );
490 self.input_buffer = deser.input_buffer;
491 Ok(())
492 }
493 }
494
495 #[cfg(test)]
496 mod tests {
497 #[cfg(windows)]
498 use base::windows::named_pipes;
499 use tempfile::tempfile;
500 use vm_memory::GuestAddress;
501
502 use super::*;
503 use crate::suspendable_virtio_tests;
504
505 struct ConsoleContext {
506 #[cfg(windows)]
507 input_peer: named_pipes::PipeConnection,
508 }
509
modify_device(_context: &mut ConsoleContext, b: &mut Console)510 fn modify_device(_context: &mut ConsoleContext, b: &mut Console) {
511 b.input_buffer.push_back(0);
512 }
513
create_device() -> (ConsoleContext, Console)514 fn create_device() -> (ConsoleContext, Console) {
515 #[cfg(any(target_os = "android", target_os = "linux"))]
516 let (input, context) = (Box::new(tempfile().unwrap()), ConsoleContext {});
517 #[cfg(windows)]
518 let (input, context) = {
519 let (x, y) = named_pipes::pair(
520 &named_pipes::FramingMode::Byte,
521 &named_pipes::BlockingMode::NoWait,
522 0,
523 )
524 .unwrap();
525 (Box::new(x), ConsoleContext { input_peer: y })
526 };
527
528 let output = Box::new(tempfile().unwrap());
529 (
530 context,
531 Console::new(
532 hypervisor::ProtectionType::Unprotected,
533 Some(input),
534 Some(output),
535 Vec::new(),
536 None,
537 ),
538 )
539 }
540
541 suspendable_virtio_tests!(console, create_device, 2, modify_device);
542
543 #[test]
test_inactive_sleep_resume()544 fn test_inactive_sleep_resume() {
545 let (_ctx, device) = &mut create_device();
546 let sleep_result = device.virtio_sleep().expect("failed to sleep");
547 assert!(sleep_result.is_none());
548 device.virtio_snapshot().expect("failed to snapshot");
549 device.virtio_wake(None).expect("failed to wake");
550 // Make sure the input and output haven't been dropped.
551 assert!(device.input.is_some());
552 assert!(device.output.is_some());
553 }
554 }
555