1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
//! Legacy console device that uses a polling thread. This is kept because it is still used by
//! Windows; outside of this use case, please use [`asynchronous::AsyncConsole`] instead.
7
8 #[cfg(unix)]
9 pub mod asynchronous;
10 mod sys;
11
12 use std::collections::VecDeque;
13 use std::io;
14 use std::io::Read;
15 use std::io::Write;
16 use std::ops::DerefMut;
17 use std::result;
18 use std::sync::Arc;
19 use std::thread;
20
21 use anyhow::anyhow;
22 use anyhow::Context;
23 use base::error;
24 use base::AsRawDescriptor;
25 use base::Descriptor;
26 use base::Event;
27 use base::EventToken;
28 use base::RawDescriptor;
29 use base::WaitContext;
30 use base::WorkerThread;
31 use data_model::Le16;
32 use data_model::Le32;
33 use hypervisor::ProtectionType;
34 use remain::sorted;
35 use sync::Mutex;
36 use thiserror::Error as ThisError;
37 use vm_memory::GuestMemory;
38 use zerocopy::AsBytes;
39 use zerocopy::FromBytes;
40
41 use crate::virtio::base_features;
42 use crate::virtio::copy_config;
43 use crate::virtio::DeviceType;
44 use crate::virtio::Interrupt;
45 use crate::virtio::Queue;
46 use crate::virtio::Reader;
47 use crate::virtio::SignalableInterrupt;
48 use crate::virtio::VirtioDevice;
49 use crate::virtio::Writer;
50 use crate::Suspendable;
51
/// Maximum size advertised for each of the console's virtqueues.
pub(crate) const QUEUE_SIZE: u16 = 256;

// For now, just implement port 0 (receiveq and transmitq).
// If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
/// Maximum queue sizes reported by the device: one entry for the receive queue followed by one
/// for the transmit queue.
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
57
/// Errors reported while moving console input into the guest's receive queue.
#[sorted]
#[derive(ThisError, Debug)]
pub enum ConsoleError {
    /// There are no more available descriptors to receive into
    #[error("no rx descriptors available")]
    RxDescriptorsExhausted,
}
65
/// Device configuration space contents, served byte-for-byte to the guest by `read_config`.
///
/// NOTE(review): the field layout presumably mirrors `struct virtio_console_config` from the
/// virtio specification — confirm against the spec before changing it.
#[derive(Copy, Clone, Debug, Default, AsBytes, FromBytes)]
#[repr(C)]
pub struct virtio_console_config {
    // Presumably the console width in characters; left at its default (0) by this device.
    pub cols: Le16,
    // Presumably the console height in characters; left at its default (0) by this device.
    pub rows: Le16,
    // Number of ports advertised to the guest; this device always reports 1.
    pub max_nr_ports: Le32,
    // Presumably the emergency-write register; left at its default (0) by this device.
    pub emerg_wr: Le32,
}
74
75 /// Checks for input from `buffer` and transfers it to the receive queue, if any.
76 ///
77 /// # Arguments
78 ///
79 /// * `mem` - The GuestMemory to write the data into
80 /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used
81 /// * `buffer` - Ring buffer providing data to put into the guest
82 /// * `receive_queue` - The receive virtio Queue
handle_input<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, buffer: &mut VecDeque<u8>, receive_queue: &mut Queue, ) -> result::Result<(), ConsoleError>83 fn handle_input<I: SignalableInterrupt>(
84 mem: &GuestMemory,
85 interrupt: &I,
86 buffer: &mut VecDeque<u8>,
87 receive_queue: &mut Queue,
88 ) -> result::Result<(), ConsoleError> {
89 loop {
90 let desc = receive_queue
91 .peek(mem)
92 .ok_or(ConsoleError::RxDescriptorsExhausted)?;
93 let desc_index = desc.index;
94 // TODO(morg): Handle extra error cases as Err(ConsoleError) instead of just returning.
95 let mut writer = match Writer::new(mem.clone(), desc) {
96 Ok(w) => w,
97 Err(e) => {
98 error!("console: failed to create Writer: {}", e);
99 return Ok(());
100 }
101 };
102
103 while writer.available_bytes() > 0 && !buffer.is_empty() {
104 let (buffer_front, buffer_back) = buffer.as_slices();
105 let buffer_chunk = if !buffer_front.is_empty() {
106 buffer_front
107 } else {
108 buffer_back
109 };
110 let written = writer.write(buffer_chunk).unwrap();
111 drop(buffer.drain(..written));
112 }
113
114 let bytes_written = writer.bytes_written() as u32;
115
116 if bytes_written > 0 {
117 receive_queue.pop_peeked(mem);
118 receive_queue.add_used(mem, desc_index, bytes_written);
119 receive_queue.trigger_interrupt(mem, interrupt);
120 }
121
122 if bytes_written == 0 {
123 return Ok(());
124 }
125 }
126 }
127
128 /// Writes the available data from the reader into the given output queue.
129 ///
130 /// # Arguments
131 ///
132 /// * `reader` - The Reader with the data we want to write.
133 /// * `output` - The output sink we are going to write the data to.
process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<()>134 fn process_transmit_request(mut reader: Reader, output: &mut dyn io::Write) -> io::Result<()> {
135 let len = reader.available_bytes();
136 let mut data = vec![0u8; len];
137 reader.read_exact(&mut data)?;
138 output.write_all(&data)?;
139 output.flush()?;
140 Ok(())
141 }
142
143 /// Processes the data taken from the given transmit queue into the output sink.
144 ///
145 /// # Arguments
146 ///
147 /// * `mem` - The GuestMemory to take the data from
148 /// * `interrupt` - SignalableInterrupt used to signal (if required) that the queue has been used
149 /// * `transmit_queue` - The transmit virtio Queue
150 /// * `output` - The output sink we are going to write the data into
process_transmit_queue<I: SignalableInterrupt>( mem: &GuestMemory, interrupt: &I, transmit_queue: &mut Queue, output: &mut dyn io::Write, )151 fn process_transmit_queue<I: SignalableInterrupt>(
152 mem: &GuestMemory,
153 interrupt: &I,
154 transmit_queue: &mut Queue,
155 output: &mut dyn io::Write,
156 ) {
157 let mut needs_interrupt = false;
158 while let Some(avail_desc) = transmit_queue.pop(mem) {
159 let desc_index = avail_desc.index;
160
161 match Reader::new(mem.clone(), avail_desc) {
162 Ok(reader) => process_transmit_request(reader, output)
163 .unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e)),
164 Err(e) => {
165 error!("console: failed to create reader: {}", e);
166 }
167 };
168
169 transmit_queue.add_used(mem, desc_index, 0);
170 needs_interrupt = true;
171 }
172
173 if needs_interrupt {
174 transmit_queue.trigger_interrupt(mem, interrupt);
175 }
176 }
177
/// State owned by the console worker thread while the device is active.
struct Worker {
    // Guest memory the virtqueues live in.
    mem: GuestMemory,
    // Used to notify the guest about used descriptors and to handle interrupt resampling.
    interrupt: Interrupt,
    // Bytes waiting to be delivered to the guest; `None` when the console has no input source.
    input: Option<Arc<Mutex<VecDeque<u8>>>>,
    // Sink receiving everything the guest transmits.
    output: Box<dyn io::Write + Send>,
    // Signaled to make `run` return.
    kill_evt: Event,
    // Signaled when new bytes may be available in `input`.
    in_avail_evt: Event,
    // Device -> driver queue and the event signaled when the guest makes buffers available.
    receive_queue: Queue,
    receive_evt: Event,
    // Driver -> device queue and the event signaled when the guest queues data to send.
    transmit_queue: Queue,
    transmit_evt: Event,
}
190
191 /// Starts a thread that reads rx and sends the input back via the returned buffer.
192 ///
193 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
194 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
195 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
196 ///
197 /// # Arguments
198 ///
199 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
200 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: crate::serial::sys::InStreamType, in_avail_evt: &Event, ) -> Option<Arc<Mutex<VecDeque<u8>>>>201 fn spawn_input_thread(
202 mut rx: crate::serial::sys::InStreamType,
203 in_avail_evt: &Event,
204 ) -> Option<Arc<Mutex<VecDeque<u8>>>> {
205 let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new()));
206 let buffer_cloned = buffer.clone();
207
208 let thread_in_avail_evt = match in_avail_evt.try_clone() {
209 Ok(evt) => evt,
210 Err(e) => {
211 error!("failed to clone in_avail_evt: {}", e);
212 return None;
213 }
214 };
215
216 // The input thread runs in detached mode.
217 let res = thread::Builder::new()
218 .name("console_input".to_string())
219 .spawn(move || {
220 let mut rx_buf = [0u8; 1 << 12];
221 loop {
222 match rx.read(&mut rx_buf) {
223 Ok(0) => break, // Assume the stream of input has ended.
224 Ok(size) => {
225 buffer.lock().extend(&rx_buf[0..size]);
226 thread_in_avail_evt.signal().unwrap();
227 }
228 Err(e) => {
229 // Being interrupted is not an error, but everything else is.
230 if sys::is_a_fatal_input_error(&e) {
231 error!(
232 "failed to read for bytes to queue into console device: {}",
233 e
234 );
235 break;
236 }
237 }
238 }
239
240 // Depending on the platform, a short sleep is needed here (ie. Windows).
241 sys::read_delay_if_needed();
242 }
243 });
244 if let Err(e) = res {
245 error!("failed to spawn input thread: {}", e);
246 return None;
247 }
248 Some(buffer_cloned)
249 }
250
251 impl Worker {
run(&mut self)252 fn run(&mut self) {
253 #[derive(EventToken)]
254 enum Token {
255 ReceiveQueueAvailable,
256 TransmitQueueAvailable,
257 InputAvailable,
258 InterruptResample,
259 Kill,
260 }
261
262 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
263 (&self.transmit_evt, Token::TransmitQueueAvailable),
264 (&self.receive_evt, Token::ReceiveQueueAvailable),
265 (&self.in_avail_evt, Token::InputAvailable),
266 (&self.kill_evt, Token::Kill),
267 ]) {
268 Ok(pc) => pc,
269 Err(e) => {
270 error!("failed creating WaitContext: {}", e);
271 return;
272 }
273 };
274 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
275 if wait_ctx
276 .add(resample_evt, Token::InterruptResample)
277 .is_err()
278 {
279 error!("failed adding resample event to WaitContext.");
280 return;
281 }
282 }
283
284 'wait: loop {
285 let events = match wait_ctx.wait() {
286 Ok(v) => v,
287 Err(e) => {
288 error!("failed polling for events: {}", e);
289 break;
290 }
291 };
292
293 for event in events.iter().filter(|e| e.is_readable) {
294 match event.token {
295 Token::TransmitQueueAvailable => {
296 if let Err(e) = self.transmit_evt.wait() {
297 error!("failed reading transmit queue Event: {}", e);
298 break 'wait;
299 }
300 process_transmit_queue(
301 &self.mem,
302 &self.interrupt,
303 &mut self.transmit_queue,
304 &mut self.output,
305 );
306 }
307 Token::ReceiveQueueAvailable => {
308 if let Err(e) = self.receive_evt.wait() {
309 error!("failed reading receive queue Event: {}", e);
310 break 'wait;
311 }
312 if let Some(in_buf_ref) = self.input.as_ref() {
313 match handle_input(
314 &self.mem,
315 &self.interrupt,
316 in_buf_ref.lock().deref_mut(),
317 &mut self.receive_queue,
318 ) {
319 Ok(()) => {}
320 // Console errors are no-ops, so just continue.
321 Err(_) => {
322 continue;
323 }
324 }
325 }
326 }
327 Token::InputAvailable => {
328 if let Err(e) = self.in_avail_evt.wait() {
329 error!("failed reading in_avail_evt: {}", e);
330 break 'wait;
331 }
332 if let Some(in_buf_ref) = self.input.as_ref() {
333 match handle_input(
334 &self.mem,
335 &self.interrupt,
336 in_buf_ref.lock().deref_mut(),
337 &mut self.receive_queue,
338 ) {
339 Ok(()) => {}
340 // Console errors are no-ops, so just continue.
341 Err(_) => {
342 continue;
343 }
344 }
345 }
346 }
347 Token::InterruptResample => {
348 self.interrupt.interrupt_resample();
349 }
350 Token::Kill => break 'wait,
351 }
352 }
353 }
354 }
355 }
356
/// Source of the bytes delivered to the guest.
enum ConsoleInput {
    // A raw input stream; an input thread is spawned for it when the device is activated.
    FromRead(crate::serial::sys::InStreamType),
    // A buffer shared with an input thread that was already spawned (restored on reset).
    FromThread(Arc<Mutex<VecDeque<u8>>>),
}
361
/// Virtio console device.
pub struct Console {
    // Feature bits reported by `features()`.
    base_features: u64,
    // Event used to announce new input; created on first activation and reused afterwards.
    in_avail_evt: Option<Event>,
    // Handle to the running worker; `None` while the device is not active.
    worker_thread: Option<WorkerThread<Worker>>,
    // Input source; handed to the worker on activation and restored on reset.
    input: Option<ConsoleInput>,
    // Output sink; handed to the worker on activation and restored on reset.
    output: Option<Box<dyn io::Write + Send>>,
    // Descriptors reported through `keep_rds()`.
    keep_descriptors: Vec<Descriptor>,
}
371
372 impl Console {
new( protection_type: ProtectionType, input: Option<ConsoleInput>, output: Option<Box<dyn io::Write + Send>>, keep_rds: Vec<RawDescriptor>, ) -> Console373 fn new(
374 protection_type: ProtectionType,
375 input: Option<ConsoleInput>,
376 output: Option<Box<dyn io::Write + Send>>,
377 keep_rds: Vec<RawDescriptor>,
378 ) -> Console {
379 Console {
380 base_features: base_features(protection_type),
381 in_avail_evt: None,
382 worker_thread: None,
383 input,
384 output,
385 keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
386 }
387 }
388 }
389
390 impl VirtioDevice for Console {
keep_rds(&self) -> Vec<RawDescriptor>391 fn keep_rds(&self) -> Vec<RawDescriptor> {
392 // return the raw descriptors as opposed to descriptor.
393 self.keep_descriptors
394 .iter()
395 .map(|descr| descr.as_raw_descriptor())
396 .collect()
397 }
398
features(&self) -> u64399 fn features(&self) -> u64 {
400 self.base_features
401 }
402
device_type(&self) -> DeviceType403 fn device_type(&self) -> DeviceType {
404 DeviceType::Console
405 }
406
queue_max_sizes(&self) -> &[u16]407 fn queue_max_sizes(&self) -> &[u16] {
408 QUEUE_SIZES
409 }
410
read_config(&self, offset: u64, data: &mut [u8])411 fn read_config(&self, offset: u64, data: &mut [u8]) {
412 let config = virtio_console_config {
413 max_nr_ports: 1.into(),
414 ..Default::default()
415 };
416 copy_config(data, 0, config.as_bytes(), offset);
417 }
418
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>419 fn activate(
420 &mut self,
421 mem: GuestMemory,
422 interrupt: Interrupt,
423 mut queues: Vec<(Queue, Event)>,
424 ) -> anyhow::Result<()> {
425 if queues.len() < 2 {
426 return Err(anyhow!("expected 2 queues, got {}", queues.len()));
427 }
428
429 let (receive_queue, receive_evt) = queues.remove(0);
430 let (transmit_queue, transmit_evt) = queues.remove(0);
431
432 if self.in_avail_evt.is_none() {
433 self.in_avail_evt = Some(Event::new().context("failed creating Event")?);
434 }
435 let in_avail_evt = self
436 .in_avail_evt
437 .as_ref()
438 .unwrap()
439 .try_clone()
440 .context("failed creating input available Event pair")?;
441
442 // Spawn a separate thread to poll self.input.
443 // A thread is used because io::Read only provides a blocking interface, and there is no
444 // generic way to add an io::Read instance to a poll context (it may not be backed by a file
445 // descriptor). Moving the blocking read call to a separate thread and sending data back to
446 // the main worker thread with an event for notification bridges this gap.
447 let input = match self.input.take() {
448 Some(ConsoleInput::FromRead(read)) => {
449 let buffer = spawn_input_thread(read, self.in_avail_evt.as_ref().unwrap());
450 if buffer.is_none() {
451 return Err(anyhow!("failed creating input thread"));
452 };
453 buffer
454 }
455 Some(ConsoleInput::FromThread(buffer)) => Some(buffer),
456 None => None,
457 };
458 let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
459
460 self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
461 let mut worker = Worker {
462 mem,
463 interrupt,
464 input,
465 output,
466 in_avail_evt,
467 kill_evt,
468 // Device -> driver
469 receive_queue,
470 receive_evt,
471 // Driver -> device
472 transmit_queue,
473 transmit_evt,
474 };
475 worker.run();
476 worker
477 }));
478 Ok(())
479 }
480
reset(&mut self) -> bool481 fn reset(&mut self) -> bool {
482 if let Some(worker_thread) = self.worker_thread.take() {
483 let worker = worker_thread.stop();
484 self.input = worker.input.map(ConsoleInput::FromThread);
485 self.output = Some(worker.output);
486 return true;
487 }
488 false
489 }
490 }
491
// The console relies entirely on the default `Suspendable` method implementations; nothing is
// overridden here.
impl Suspendable for Console {}
493