1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::collections::VecDeque;
6 use std::io::{self, Read, Write};
7 use std::ops::DerefMut;
8 use std::result;
9 use std::sync::Arc;
10 use std::thread;
11
12 use base::{error, Event, FileSync, PollToken, RawDescriptor, WaitContext};
13 use data_model::{DataInit, Le16, Le32};
14 use hypervisor::ProtectionType;
15 use remain::sorted;
16 use sync::Mutex;
17 use thiserror::Error as ThisError;
18 use vm_memory::GuestMemory;
19
20 use super::{
21 base_features, copy_config, Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice,
22 Writer, TYPE_CONSOLE,
23 };
24 use crate::SerialDevice;
25
26 pub(crate) const QUEUE_SIZE: u16 = 256;
27
28 // For now, just implement port 0 (receiveq and transmitq).
29 // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
30 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
31
32 #[sorted]
33 #[derive(ThisError, Debug)]
34 pub enum ConsoleError {
35 /// There are no more available descriptors to receive into
36 #[error("no rx descriptors available")]
37 RxDescriptorsExhausted,
38 }
39
40 #[derive(Copy, Clone, Debug, Default)]
41 #[repr(C)]
42 pub struct virtio_console_config {
43 pub cols: Le16,
44 pub rows: Le16,
45 pub max_nr_ports: Le32,
46 pub emerg_wr: Le32,
47 }
48
49 // Safe because it only has data and has no implicit padding.
50 unsafe impl DataInit for virtio_console_config {}
51
52 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
53 ///
54 /// # Arguments
55 ///
56 /// * `mem` - The GuestMemory to write the data into
57 /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used
58 /// * `buffer` - Ring buffer providing data to put into the guest
59 /// * `receive_queue` - The receive virtio Queue
handle_input<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, buffer: &mut VecDeque<u8>, receive_queue: &mut Queue, ) -> result::Result<(), ConsoleError>60 pub fn handle_input<I: SignalableInterrupt>(
61 mem: &GuestMemory,
62 interrupt: &I,
63 buffer: &mut VecDeque<u8>,
64 receive_queue: &mut Queue,
65 ) -> result::Result<(), ConsoleError> {
66 loop {
67 let desc = receive_queue
68 .peek(mem)
69 .ok_or(ConsoleError::RxDescriptorsExhausted)?;
70 let desc_index = desc.index;
71 // TODO(morg): Handle extra error cases as Err(ConsoleError) instead of just returning.
72 let mut writer = match Writer::new(mem.clone(), desc) {
73 Ok(w) => w,
74 Err(e) => {
75 error!("console: failed to create Writer: {}", e);
76 return Ok(());
77 }
78 };
79
80 while writer.available_bytes() > 0 && !buffer.is_empty() {
81 let (buffer_front, buffer_back) = buffer.as_slices();
82 let buffer_chunk = if !buffer_front.is_empty() {
83 buffer_front
84 } else {
85 buffer_back
86 };
87 let written = writer.write(buffer_chunk).unwrap();
88 drop(buffer.drain(..written));
89 }
90
91 let bytes_written = writer.bytes_written() as u32;
92
93 if bytes_written > 0 {
94 receive_queue.pop_peeked(mem);
95 receive_queue.add_used(mem, desc_index, bytes_written);
96 receive_queue.trigger_interrupt(mem, interrupt);
97 }
98
99 if bytes_written == 0 {
100 return Ok(());
101 }
102 }
103 }
104
105 /// Processes the data taken from the given transmit queue into the output sink.
106 ///
107 /// # Arguments
108 ///
109 /// * `mem` - The GuestMemory to take the data from
110 /// * `interrupt` - SignalableInterrupt used to signal (if required) that the queue has been used
111 /// * `transmit_queue` - The transmit virtio Queue
112 /// * `output` - The output sink we are going to write the data into
process_transmit_queue<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, transmit_queue: &mut Queue, output: &mut dyn io::Write, )113 pub fn process_transmit_queue<I: SignalableInterrupt>(
114 mem: &GuestMemory,
115 interrupt: &I,
116 transmit_queue: &mut Queue,
117 output: &mut dyn io::Write,
118 ) {
119 let mut needs_interrupt = false;
120 while let Some(avail_desc) = transmit_queue.pop(mem) {
121 let desc_index = avail_desc.index;
122
123 let reader = match Reader::new(mem.clone(), avail_desc) {
124 Ok(r) => r,
125 Err(e) => {
126 error!("console: failed to create reader: {}", e);
127 transmit_queue.add_used(mem, desc_index, 0);
128 needs_interrupt = true;
129 continue;
130 }
131 };
132
133 let len = match process_transmit_request(reader, output) {
134 Ok(written) => written,
135 Err(e) => {
136 error!("console: process_transmit_request failed: {}", e);
137 0
138 }
139 };
140
141 transmit_queue.add_used(mem, desc_index, len);
142 needs_interrupt = true;
143 }
144
145 if needs_interrupt {
146 transmit_queue.trigger_interrupt(mem, interrupt);
147 }
148 }
149
150 struct Worker {
151 mem: GuestMemory,
152 interrupt: Interrupt,
153 input: Option<Arc<Mutex<VecDeque<u8>>>>,
154 output: Box<dyn io::Write + Send>,
155 kill_evt: Event,
156 in_avail_evt: Event,
157 receive_queue: Queue,
158 receive_evt: Event,
159 transmit_queue: Queue,
160 transmit_evt: Event,
161 }
162
write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()>163 fn write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()> {
164 output.write_all(data)?;
165 output.flush()
166 }
167
168 /// Starts a thread that reads rx and sends the input back via the returned buffer.
169 ///
170 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
171 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
172 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
173 ///
174 /// # Arguments
175 ///
176 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
177 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: Box<dyn io::Read + Send>, in_avail_evt: &Event, ) -> Option<Arc<Mutex<VecDeque<u8>>>>178 pub fn spawn_input_thread(
179 mut rx: Box<dyn io::Read + Send>,
180 in_avail_evt: &Event,
181 ) -> Option<Arc<Mutex<VecDeque<u8>>>> {
182 let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new()));
183 let buffer_cloned = buffer.clone();
184
185 let thread_in_avail_evt = match in_avail_evt.try_clone() {
186 Ok(evt) => evt,
187 Err(e) => {
188 error!("failed to clone in_avail_evt: {}", e);
189 return None;
190 }
191 };
192
193 // The input thread runs in detached mode.
194 let res = thread::Builder::new()
195 .name("console_input".to_string())
196 .spawn(move || {
197 let mut rx_buf = [0u8; 1 << 12];
198 loop {
199 match rx.read(&mut rx_buf) {
200 Ok(0) => break, // Assume the stream of input has ended.
201 Ok(size) => {
202 buffer.lock().extend(&rx_buf[0..size]);
203 thread_in_avail_evt.write(1).unwrap();
204 }
205 Err(e) => {
206 // Being interrupted is not an error, but everything else is.
207 if e.kind() != io::ErrorKind::Interrupted {
208 error!(
209 "failed to read for bytes to queue into console device: {}",
210 e
211 );
212 break;
213 }
214 }
215 }
216 }
217 });
218 if let Err(e) = res {
219 error!("failed to spawn input thread: {}", e);
220 return None;
221 }
222 Some(buffer_cloned)
223 }
224
225 /// Writes the available data from the reader into the given output queue.
226 ///
227 /// # Arguments
228 ///
229 /// * `reader` - The Reader with the data we want to write.
230 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<u32>231 pub fn process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<u32> {
232 let len = reader.available_bytes();
233 let mut data = vec![0u8; len];
234 reader.read_exact(&mut data)?;
235 write_output(output, &data)?;
236 Ok(0)
237 }
238
239 impl Worker {
run(&mut self)240 fn run(&mut self) {
241 #[derive(PollToken)]
242 enum Token {
243 ReceiveQueueAvailable,
244 TransmitQueueAvailable,
245 InputAvailable,
246 InterruptResample,
247 Kill,
248 }
249
250 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
251 (&self.transmit_evt, Token::TransmitQueueAvailable),
252 (&self.receive_evt, Token::ReceiveQueueAvailable),
253 (&self.in_avail_evt, Token::InputAvailable),
254 (&self.kill_evt, Token::Kill),
255 ]) {
256 Ok(pc) => pc,
257 Err(e) => {
258 error!("failed creating WaitContext: {}", e);
259 return;
260 }
261 };
262 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
263 if wait_ctx
264 .add(resample_evt, Token::InterruptResample)
265 .is_err()
266 {
267 error!("failed adding resample event to WaitContext.");
268 return;
269 }
270 }
271
272 'wait: loop {
273 let events = match wait_ctx.wait() {
274 Ok(v) => v,
275 Err(e) => {
276 error!("failed polling for events: {}", e);
277 break;
278 }
279 };
280
281 for event in events.iter().filter(|e| e.is_readable) {
282 match event.token {
283 Token::TransmitQueueAvailable => {
284 if let Err(e) = self.transmit_evt.read() {
285 error!("failed reading transmit queue Event: {}", e);
286 break 'wait;
287 }
288 process_transmit_queue(
289 &self.mem,
290 &self.interrupt,
291 &mut self.transmit_queue,
292 &mut self.output,
293 );
294 }
295 Token::ReceiveQueueAvailable => {
296 if let Err(e) = self.receive_evt.read() {
297 error!("failed reading receive queue Event: {}", e);
298 break 'wait;
299 }
300 if let Some(in_buf_ref) = self.input.as_ref() {
301 match handle_input(
302 &self.mem,
303 &self.interrupt,
304 in_buf_ref.lock().deref_mut(),
305 &mut self.receive_queue,
306 ) {
307 Ok(()) => {}
308 // Console errors are no-ops, so just continue.
309 Err(_) => {
310 continue;
311 }
312 }
313 }
314 }
315 Token::InputAvailable => {
316 if let Err(e) = self.in_avail_evt.read() {
317 error!("failed reading in_avail_evt: {}", e);
318 break 'wait;
319 }
320 if let Some(in_buf_ref) = self.input.as_ref() {
321 match handle_input(
322 &self.mem,
323 &self.interrupt,
324 in_buf_ref.lock().deref_mut(),
325 &mut self.receive_queue,
326 ) {
327 Ok(()) => {}
328 // Console errors are no-ops, so just continue.
329 Err(_) => {
330 continue;
331 }
332 }
333 }
334 }
335 Token::InterruptResample => {
336 self.interrupt.interrupt_resample();
337 }
338 Token::Kill => break 'wait,
339 }
340 }
341 }
342 }
343 }
344
345 enum ConsoleInput {
346 FromRead(Box<dyn io::Read + Send>),
347 FromThread(Arc<Mutex<VecDeque<u8>>>),
348 }
349
350 /// Virtio console device.
351 pub struct Console {
352 base_features: u64,
353 kill_evt: Option<Event>,
354 in_avail_evt: Option<Event>,
355 worker_thread: Option<thread::JoinHandle<Worker>>,
356 input: Option<ConsoleInput>,
357 output: Option<Box<dyn io::Write + Send>>,
358 keep_rds: Vec<RawDescriptor>,
359 }
360
361 impl SerialDevice for Console {
new( protected_vm: ProtectionType, _evt: Event, input: Option<Box<dyn io::Read + Send>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, _out_timestamp: bool, keep_rds: Vec<RawDescriptor>, ) -> Console362 fn new(
363 protected_vm: ProtectionType,
364 _evt: Event,
365 input: Option<Box<dyn io::Read + Send>>,
366 output: Option<Box<dyn io::Write + Send>>,
367 _sync: Option<Box<dyn FileSync + Send>>,
368 _out_timestamp: bool,
369 keep_rds: Vec<RawDescriptor>,
370 ) -> Console {
371 Console {
372 base_features: base_features(protected_vm),
373 in_avail_evt: None,
374 kill_evt: None,
375 worker_thread: None,
376 input: input.map(ConsoleInput::FromRead),
377 output,
378 keep_rds,
379 }
380 }
381 }
382
383 impl Drop for Console {
drop(&mut self)384 fn drop(&mut self) {
385 if let Some(kill_evt) = self.kill_evt.take() {
386 // Ignore the result because there is nothing we can do about it.
387 let _ = kill_evt.write(1);
388 }
389
390 if let Some(worker_thread) = self.worker_thread.take() {
391 let _ = worker_thread.join();
392 }
393 }
394 }
395
396 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>397 fn keep_rds(&self) -> Vec<RawDescriptor> {
398 self.keep_rds.clone()
399 }
400
features(&self) -> u64401 fn features(&self) -> u64 {
402 self.base_features
403 }
404
device_type(&self) -> u32405 fn device_type(&self) -> u32 {
406 TYPE_CONSOLE
407 }
408
queue_max_sizes(&self) -> &[u16]409 fn queue_max_sizes(&self) -> &[u16] {
410 QUEUE_SIZES
411 }
412
read_config(&self, offset: u64, data: &mut [u8])413 fn read_config(&self, offset: u64, data: &mut [u8]) {
414 let config = virtio_console_config {
415 max_nr_ports: 1.into(),
416 ..Default::default()
417 };
418 copy_config(data, 0, config.as_slice(), offset);
419 }
420
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<Queue>, mut queue_evts: Vec<Event>, )421 fn activate(
422 &mut self,
423 mem: GuestMemory,
424 interrupt: Interrupt,
425 mut queues: Vec<Queue>,
426 mut queue_evts: Vec<Event>,
427 ) {
428 if queues.len() < 2 || queue_evts.len() < 2 {
429 return;
430 }
431
432 let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
433 Ok(v) => v,
434 Err(e) => {
435 error!("failed creating kill Event pair: {}", e);
436 return;
437 }
438 };
439 self.kill_evt = Some(self_kill_evt);
440
441 if self.in_avail_evt.is_none() {
442 self.in_avail_evt = match Event::new() {
443 Ok(evt) => Some(evt),
444 Err(e) => {
445 error!("failed creating Event: {}", e);
446 return;
447 }
448 };
449 }
450 let in_avail_evt = match self.in_avail_evt.as_ref().unwrap().try_clone() {
451 Ok(v) => v,
452 Err(e) => {
453 error!("failed creating input available Event pair: {}", e);
454 return;
455 }
456 };
457
458 // Spawn a separate thread to poll self.input.
459 // A thread is used because io::Read only provides a blocking interface, and there is no
460 // generic way to add an io::Read instance to a poll context (it may not be backed by a file
461 // descriptor). Moving the blocking read call to a separate thread and sending data back to
462 // the main worker thread with an event for notification bridges this gap.
463 let input = match self.input.take() {
464 Some(ConsoleInput::FromRead(read)) => {
465 let buffer = spawn_input_thread(read, self.in_avail_evt.as_ref().unwrap());
466 if buffer.is_none() {
467 error!("failed creating input thread");
468 };
469 buffer
470 }
471 Some(ConsoleInput::FromThread(buffer)) => Some(buffer),
472 None => None,
473 };
474 let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
475
476 let worker_result = thread::Builder::new()
477 .name("virtio_console".to_string())
478 .spawn(move || {
479 let mut worker = Worker {
480 mem,
481 interrupt,
482 input,
483 output,
484 in_avail_evt,
485 kill_evt,
486 // Device -> driver
487 receive_queue: queues.remove(0),
488 receive_evt: queue_evts.remove(0),
489 // Driver -> device
490 transmit_queue: queues.remove(0),
491 transmit_evt: queue_evts.remove(0),
492 };
493 worker.run();
494 worker
495 });
496
497 match worker_result {
498 Err(e) => {
499 error!("failed to spawn virtio_console worker: {}", e);
500 }
501 Ok(join_handle) => {
502 self.worker_thread = Some(join_handle);
503 }
504 }
505 }
506
reset(&mut self) -> bool507 fn reset(&mut self) -> bool {
508 if let Some(kill_evt) = self.kill_evt.take() {
509 if kill_evt.write(1).is_err() {
510 error!("{}: failed to notify the kill event", self.debug_label());
511 return false;
512 }
513 }
514
515 if let Some(worker_thread) = self.worker_thread.take() {
516 match worker_thread.join() {
517 Err(_) => {
518 error!("{}: failed to get back resources", self.debug_label());
519 return false;
520 }
521 Ok(worker) => {
522 self.input = worker.input.map(ConsoleInput::FromThread);
523 self.output = Some(worker.output);
524 return true;
525 }
526 }
527 }
528 false
529 }
530 }
531