• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::io::{self, Read, Write};
7 use std::ops::DerefMut;
8 use std::result;
9 use std::sync::Arc;
10 use std::thread;
11 
12 use base::{error, Event, FileSync, PollToken, RawDescriptor, WaitContext};
13 use data_model::{DataInit, Le16, Le32};
14 use hypervisor::ProtectionType;
15 use remain::sorted;
16 use sync::Mutex;
17 use thiserror::Error as ThisError;
18 use vm_memory::GuestMemory;
19 
20 use super::{
21     base_features, copy_config, Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice,
22     Writer, TYPE_CONSOLE,
23 };
24 use crate::SerialDevice;
25 
26 pub(crate) const QUEUE_SIZE: u16 = 256;
27 
28 // For now, just implement port 0 (receiveq and transmitq).
29 // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
30 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
31 
32 #[sorted]
33 #[derive(ThisError, Debug)]
34 pub enum ConsoleError {
35     /// There are no more available descriptors to receive into
36     #[error("no rx descriptors available")]
37     RxDescriptorsExhausted,
38 }
39 
40 #[derive(Copy, Clone, Debug, Default)]
41 #[repr(C)]
42 pub struct virtio_console_config {
43     pub cols: Le16,
44     pub rows: Le16,
45     pub max_nr_ports: Le32,
46     pub emerg_wr: Le32,
47 }
48 
49 // Safe because it only has data and has no implicit padding.
50 unsafe impl DataInit for virtio_console_config {}
51 
52 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
53 ///
54 /// # Arguments
55 ///
56 /// * `mem` - The GuestMemory to write the data into
57 /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used
58 /// * `buffer` - Ring buffer providing data to put into the guest
59 /// * `receive_queue` - The receive virtio Queue
handle_input<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, buffer: &mut VecDeque<u8>, receive_queue: &mut Queue, ) -> result::Result<(), ConsoleError>60 pub fn handle_input<I: SignalableInterrupt>(
61     mem: &GuestMemory,
62     interrupt: &I,
63     buffer: &mut VecDeque<u8>,
64     receive_queue: &mut Queue,
65 ) -> result::Result<(), ConsoleError> {
66     loop {
67         let desc = receive_queue
68             .peek(mem)
69             .ok_or(ConsoleError::RxDescriptorsExhausted)?;
70         let desc_index = desc.index;
71         // TODO(morg): Handle extra error cases as Err(ConsoleError) instead of just returning.
72         let mut writer = match Writer::new(mem.clone(), desc) {
73             Ok(w) => w,
74             Err(e) => {
75                 error!("console: failed to create Writer: {}", e);
76                 return Ok(());
77             }
78         };
79 
80         while writer.available_bytes() > 0 && !buffer.is_empty() {
81             let (buffer_front, buffer_back) = buffer.as_slices();
82             let buffer_chunk = if !buffer_front.is_empty() {
83                 buffer_front
84             } else {
85                 buffer_back
86             };
87             let written = writer.write(buffer_chunk).unwrap();
88             drop(buffer.drain(..written));
89         }
90 
91         let bytes_written = writer.bytes_written() as u32;
92 
93         if bytes_written > 0 {
94             receive_queue.pop_peeked(mem);
95             receive_queue.add_used(mem, desc_index, bytes_written);
96             receive_queue.trigger_interrupt(mem, interrupt);
97         }
98 
99         if bytes_written == 0 {
100             return Ok(());
101         }
102     }
103 }
104 
105 /// Processes the data taken from the given transmit queue into the output sink.
106 ///
107 /// # Arguments
108 ///
109 /// * `mem` - The GuestMemory to take the data from
110 /// * `interrupt` - SignalableInterrupt used to signal (if required) that the queue has been used
111 /// * `transmit_queue` - The transmit virtio Queue
112 /// * `output` - The output sink we are going to write the data into
process_transmit_queue<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, transmit_queue: &mut Queue, output: &mut dyn io::Write, )113 pub fn process_transmit_queue<I: SignalableInterrupt>(
114     mem: &GuestMemory,
115     interrupt: &I,
116     transmit_queue: &mut Queue,
117     output: &mut dyn io::Write,
118 ) {
119     let mut needs_interrupt = false;
120     while let Some(avail_desc) = transmit_queue.pop(mem) {
121         let desc_index = avail_desc.index;
122 
123         let reader = match Reader::new(mem.clone(), avail_desc) {
124             Ok(r) => r,
125             Err(e) => {
126                 error!("console: failed to create reader: {}", e);
127                 transmit_queue.add_used(mem, desc_index, 0);
128                 needs_interrupt = true;
129                 continue;
130             }
131         };
132 
133         let len = match process_transmit_request(reader, output) {
134             Ok(written) => written,
135             Err(e) => {
136                 error!("console: process_transmit_request failed: {}", e);
137                 0
138             }
139         };
140 
141         transmit_queue.add_used(mem, desc_index, len);
142         needs_interrupt = true;
143     }
144 
145     if needs_interrupt {
146         transmit_queue.trigger_interrupt(mem, interrupt);
147     }
148 }
149 
150 struct Worker {
151     mem: GuestMemory,
152     interrupt: Interrupt,
153     input: Option<Arc<Mutex<VecDeque<u8>>>>,
154     output: Box<dyn io::Write + Send>,
155     kill_evt: Event,
156     in_avail_evt: Event,
157     receive_queue: Queue,
158     receive_evt: Event,
159     transmit_queue: Queue,
160     transmit_evt: Event,
161 }
162 
write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()>163 fn write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()> {
164     output.write_all(data)?;
165     output.flush()
166 }
167 
168 /// Starts a thread that reads rx and sends the input back via the returned buffer.
169 ///
170 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
171 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
172 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
173 ///
174 /// # Arguments
175 ///
176 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
177 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: Box<dyn io::Read + Send>, in_avail_evt: &Event, ) -> Option<Arc<Mutex<VecDeque<u8>>>>178 pub fn spawn_input_thread(
179     mut rx: Box<dyn io::Read + Send>,
180     in_avail_evt: &Event,
181 ) -> Option<Arc<Mutex<VecDeque<u8>>>> {
182     let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new()));
183     let buffer_cloned = buffer.clone();
184 
185     let thread_in_avail_evt = match in_avail_evt.try_clone() {
186         Ok(evt) => evt,
187         Err(e) => {
188             error!("failed to clone in_avail_evt: {}", e);
189             return None;
190         }
191     };
192 
193     // The input thread runs in detached mode.
194     let res = thread::Builder::new()
195         .name("console_input".to_string())
196         .spawn(move || {
197             let mut rx_buf = [0u8; 1 << 12];
198             loop {
199                 match rx.read(&mut rx_buf) {
200                     Ok(0) => break, // Assume the stream of input has ended.
201                     Ok(size) => {
202                         buffer.lock().extend(&rx_buf[0..size]);
203                         thread_in_avail_evt.write(1).unwrap();
204                     }
205                     Err(e) => {
206                         // Being interrupted is not an error, but everything else is.
207                         if e.kind() != io::ErrorKind::Interrupted {
208                             error!(
209                                 "failed to read for bytes to queue into console device: {}",
210                                 e
211                             );
212                             break;
213                         }
214                     }
215                 }
216             }
217         });
218     if let Err(e) = res {
219         error!("failed to spawn input thread: {}", e);
220         return None;
221     }
222     Some(buffer_cloned)
223 }
224 
225 /// Writes the available data from the reader into the given output queue.
226 ///
227 /// # Arguments
228 ///
229 /// * `reader` - The Reader with the data we want to write.
230 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<u32>231 pub fn process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<u32> {
232     let len = reader.available_bytes();
233     let mut data = vec![0u8; len];
234     reader.read_exact(&mut data)?;
235     write_output(output, &data)?;
236     Ok(0)
237 }
238 
239 impl Worker {
run(&mut self)240     fn run(&mut self) {
241         #[derive(PollToken)]
242         enum Token {
243             ReceiveQueueAvailable,
244             TransmitQueueAvailable,
245             InputAvailable,
246             InterruptResample,
247             Kill,
248         }
249 
250         let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
251             (&self.transmit_evt, Token::TransmitQueueAvailable),
252             (&self.receive_evt, Token::ReceiveQueueAvailable),
253             (&self.in_avail_evt, Token::InputAvailable),
254             (&self.kill_evt, Token::Kill),
255         ]) {
256             Ok(pc) => pc,
257             Err(e) => {
258                 error!("failed creating WaitContext: {}", e);
259                 return;
260             }
261         };
262         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
263             if wait_ctx
264                 .add(resample_evt, Token::InterruptResample)
265                 .is_err()
266             {
267                 error!("failed adding resample event to WaitContext.");
268                 return;
269             }
270         }
271 
272         'wait: loop {
273             let events = match wait_ctx.wait() {
274                 Ok(v) => v,
275                 Err(e) => {
276                     error!("failed polling for events: {}", e);
277                     break;
278                 }
279             };
280 
281             for event in events.iter().filter(|e| e.is_readable) {
282                 match event.token {
283                     Token::TransmitQueueAvailable => {
284                         if let Err(e) = self.transmit_evt.read() {
285                             error!("failed reading transmit queue Event: {}", e);
286                             break 'wait;
287                         }
288                         process_transmit_queue(
289                             &self.mem,
290                             &self.interrupt,
291                             &mut self.transmit_queue,
292                             &mut self.output,
293                         );
294                     }
295                     Token::ReceiveQueueAvailable => {
296                         if let Err(e) = self.receive_evt.read() {
297                             error!("failed reading receive queue Event: {}", e);
298                             break 'wait;
299                         }
300                         if let Some(in_buf_ref) = self.input.as_ref() {
301                             match handle_input(
302                                 &self.mem,
303                                 &self.interrupt,
304                                 in_buf_ref.lock().deref_mut(),
305                                 &mut self.receive_queue,
306                             ) {
307                                 Ok(()) => {}
308                                 // Console errors are no-ops, so just continue.
309                                 Err(_) => {
310                                     continue;
311                                 }
312                             }
313                         }
314                     }
315                     Token::InputAvailable => {
316                         if let Err(e) = self.in_avail_evt.read() {
317                             error!("failed reading in_avail_evt: {}", e);
318                             break 'wait;
319                         }
320                         if let Some(in_buf_ref) = self.input.as_ref() {
321                             match handle_input(
322                                 &self.mem,
323                                 &self.interrupt,
324                                 in_buf_ref.lock().deref_mut(),
325                                 &mut self.receive_queue,
326                             ) {
327                                 Ok(()) => {}
328                                 // Console errors are no-ops, so just continue.
329                                 Err(_) => {
330                                     continue;
331                                 }
332                             }
333                         }
334                     }
335                     Token::InterruptResample => {
336                         self.interrupt.interrupt_resample();
337                     }
338                     Token::Kill => break 'wait,
339                 }
340             }
341         }
342     }
343 }
344 
345 enum ConsoleInput {
346     FromRead(Box<dyn io::Read + Send>),
347     FromThread(Arc<Mutex<VecDeque<u8>>>),
348 }
349 
350 /// Virtio console device.
351 pub struct Console {
352     base_features: u64,
353     kill_evt: Option<Event>,
354     in_avail_evt: Option<Event>,
355     worker_thread: Option<thread::JoinHandle<Worker>>,
356     input: Option<ConsoleInput>,
357     output: Option<Box<dyn io::Write + Send>>,
358     keep_rds: Vec<RawDescriptor>,
359 }
360 
361 impl SerialDevice for Console {
new( protected_vm: ProtectionType, _evt: Event, input: Option<Box<dyn io::Read + Send>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, _out_timestamp: bool, keep_rds: Vec<RawDescriptor>, ) -> Console362     fn new(
363         protected_vm: ProtectionType,
364         _evt: Event,
365         input: Option<Box<dyn io::Read + Send>>,
366         output: Option<Box<dyn io::Write + Send>>,
367         _sync: Option<Box<dyn FileSync + Send>>,
368         _out_timestamp: bool,
369         keep_rds: Vec<RawDescriptor>,
370     ) -> Console {
371         Console {
372             base_features: base_features(protected_vm),
373             in_avail_evt: None,
374             kill_evt: None,
375             worker_thread: None,
376             input: input.map(ConsoleInput::FromRead),
377             output,
378             keep_rds,
379         }
380     }
381 }
382 
383 impl Drop for Console {
drop(&mut self)384     fn drop(&mut self) {
385         if let Some(kill_evt) = self.kill_evt.take() {
386             // Ignore the result because there is nothing we can do about it.
387             let _ = kill_evt.write(1);
388         }
389 
390         if let Some(worker_thread) = self.worker_thread.take() {
391             let _ = worker_thread.join();
392         }
393     }
394 }
395 
396 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>397     fn keep_rds(&self) -> Vec<RawDescriptor> {
398         self.keep_rds.clone()
399     }
400 
features(&self) -> u64401     fn features(&self) -> u64 {
402         self.base_features
403     }
404 
device_type(&self) -> u32405     fn device_type(&self) -> u32 {
406         TYPE_CONSOLE
407     }
408 
queue_max_sizes(&self) -> &[u16]409     fn queue_max_sizes(&self) -> &[u16] {
410         QUEUE_SIZES
411     }
412 
read_config(&self, offset: u64, data: &mut [u8])413     fn read_config(&self, offset: u64, data: &mut [u8]) {
414         let config = virtio_console_config {
415             max_nr_ports: 1.into(),
416             ..Default::default()
417         };
418         copy_config(data, 0, config.as_slice(), offset);
419     }
420 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<Queue>, mut queue_evts: Vec<Event>, )421     fn activate(
422         &mut self,
423         mem: GuestMemory,
424         interrupt: Interrupt,
425         mut queues: Vec<Queue>,
426         mut queue_evts: Vec<Event>,
427     ) {
428         if queues.len() < 2 || queue_evts.len() < 2 {
429             return;
430         }
431 
432         let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
433             Ok(v) => v,
434             Err(e) => {
435                 error!("failed creating kill Event pair: {}", e);
436                 return;
437             }
438         };
439         self.kill_evt = Some(self_kill_evt);
440 
441         if self.in_avail_evt.is_none() {
442             self.in_avail_evt = match Event::new() {
443                 Ok(evt) => Some(evt),
444                 Err(e) => {
445                     error!("failed creating Event: {}", e);
446                     return;
447                 }
448             };
449         }
450         let in_avail_evt = match self.in_avail_evt.as_ref().unwrap().try_clone() {
451             Ok(v) => v,
452             Err(e) => {
453                 error!("failed creating input available Event pair: {}", e);
454                 return;
455             }
456         };
457 
458         // Spawn a separate thread to poll self.input.
459         // A thread is used because io::Read only provides a blocking interface, and there is no
460         // generic way to add an io::Read instance to a poll context (it may not be backed by a file
461         // descriptor).  Moving the blocking read call to a separate thread and sending data back to
462         // the main worker thread with an event for notification bridges this gap.
463         let input = match self.input.take() {
464             Some(ConsoleInput::FromRead(read)) => {
465                 let buffer = spawn_input_thread(read, self.in_avail_evt.as_ref().unwrap());
466                 if buffer.is_none() {
467                     error!("failed creating input thread");
468                 };
469                 buffer
470             }
471             Some(ConsoleInput::FromThread(buffer)) => Some(buffer),
472             None => None,
473         };
474         let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
475 
476         let worker_result = thread::Builder::new()
477             .name("virtio_console".to_string())
478             .spawn(move || {
479                 let mut worker = Worker {
480                     mem,
481                     interrupt,
482                     input,
483                     output,
484                     in_avail_evt,
485                     kill_evt,
486                     // Device -> driver
487                     receive_queue: queues.remove(0),
488                     receive_evt: queue_evts.remove(0),
489                     // Driver -> device
490                     transmit_queue: queues.remove(0),
491                     transmit_evt: queue_evts.remove(0),
492                 };
493                 worker.run();
494                 worker
495             });
496 
497         match worker_result {
498             Err(e) => {
499                 error!("failed to spawn virtio_console worker: {}", e);
500             }
501             Ok(join_handle) => {
502                 self.worker_thread = Some(join_handle);
503             }
504         }
505     }
506 
reset(&mut self) -> bool507     fn reset(&mut self) -> bool {
508         if let Some(kill_evt) = self.kill_evt.take() {
509             if kill_evt.write(1).is_err() {
510                 error!("{}: failed to notify the kill event", self.debug_label());
511                 return false;
512             }
513         }
514 
515         if let Some(worker_thread) = self.worker_thread.take() {
516             match worker_thread.join() {
517                 Err(_) => {
518                     error!("{}: failed to get back resources", self.debug_label());
519                     return false;
520                 }
521                 Ok(worker) => {
522                     self.input = worker.input.map(ConsoleInput::FromThread);
523                     self.output = Some(worker.output);
524                     return true;
525                 }
526             }
527         }
528         false
529     }
530 }
531