• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
//! Asynchronous console device whose implementation can be shared by VMM and vhost-user.
6 
7 use std::collections::VecDeque;
8 use std::io;
9 
10 use anyhow::anyhow;
11 use anyhow::Context;
12 use base::error;
13 use base::AsRawDescriptor;
14 use base::Event;
15 use base::FileSync;
16 use base::RawDescriptor;
17 use base::WorkerThread;
18 use cros_async::select2;
19 use cros_async::AsyncResult;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::IntoAsync;
23 use cros_async::IoSource;
24 use futures::FutureExt;
25 use hypervisor::ProtectionType;
26 use vm_memory::GuestMemory;
27 use vmm_vhost::message::VhostUserVirtioFeatures;
28 use zerocopy::AsBytes;
29 
30 use super::handle_input;
31 use super::process_transmit_queue;
32 use super::QUEUE_SIZES;
33 use crate::serial_device::SerialInput;
34 use crate::virtio;
35 use crate::virtio::async_device::AsyncQueueState;
36 use crate::virtio::async_utils;
37 use crate::virtio::base_features;
38 use crate::virtio::copy_config;
39 use crate::virtio::virtio_console_config;
40 use crate::virtio::ConsoleError;
41 use crate::virtio::DeviceType;
42 use crate::virtio::Interrupt;
43 use crate::virtio::Queue;
44 use crate::virtio::SignalableInterrupt;
45 use crate::virtio::VirtioDevice;
46 use crate::SerialDevice;
47 use crate::Suspendable;
48 
49 /// Wrapper that makes any `SerialInput` usable as an async source by providing an implementation of
50 /// `IntoAsync`.
51 struct AsyncSerialInput(Box<dyn SerialInput>);
52 impl AsRawDescriptor for AsyncSerialInput {
as_raw_descriptor(&self) -> RawDescriptor53     fn as_raw_descriptor(&self) -> RawDescriptor {
54         self.0.get_read_notifier().as_raw_descriptor()
55     }
56 }
57 impl IntoAsync for AsyncSerialInput {}
58 
run_tx_queue<I: SignalableInterrupt>( mut queue: virtio::Queue, mem: GuestMemory, doorbell: I, kick_evt: EventAsync, output: &mut Box<dyn io::Write + Send>, )59 async fn run_tx_queue<I: SignalableInterrupt>(
60     mut queue: virtio::Queue,
61     mem: GuestMemory,
62     doorbell: I,
63     kick_evt: EventAsync,
64     output: &mut Box<dyn io::Write + Send>,
65 ) {
66     loop {
67         if let Err(e) = kick_evt.next_val().await {
68             error!("Failed to read kick event for tx queue: {}", e);
69             break;
70         }
71         process_transmit_queue(&mem, &doorbell, &mut queue, output.as_mut());
72     }
73 }
74 
run_rx_queue<I: SignalableInterrupt>( mut queue: virtio::Queue, mem: GuestMemory, doorbell: I, kick_evt: EventAsync, input: &IoSource<AsyncSerialInput>, )75 async fn run_rx_queue<I: SignalableInterrupt>(
76     mut queue: virtio::Queue,
77     mem: GuestMemory,
78     doorbell: I,
79     kick_evt: EventAsync,
80     input: &IoSource<AsyncSerialInput>,
81 ) {
82     // Staging buffer, required because of `handle_input`'s API. We can probably remove this once
83     // the regular virtio device is switched to async.
84     let mut in_buffer = VecDeque::<u8>::new();
85     let mut rx_buf = vec![0u8; 4096];
86 
87     loop {
88         match input.read_to_vec(None, rx_buf).await {
89             // Input source has closed.
90             Ok((0, _)) => break,
91             Ok((size, v)) => {
92                 in_buffer.extend(&v[0..size]);
93                 rx_buf = v;
94             }
95             Err(e) => {
96                 error!("Failed to read console input: {}", e);
97                 return;
98             }
99         }
100 
101         // Submit all the data obtained during this read.
102         while !in_buffer.is_empty() {
103             match handle_input(&mem, &doorbell, &mut in_buffer, &mut queue) {
104                 Ok(()) => {}
105                 Err(ConsoleError::RxDescriptorsExhausted) => {
106                     // Wait until a descriptor becomes available and try again.
107                     if let Err(e) = kick_evt.next_val().await {
108                         error!("Failed to read kick event for rx queue: {}", e);
109                         return;
110                     }
111                 }
112             }
113         }
114     }
115 }
116 
117 pub struct ConsoleDevice {
118     input: Option<AsyncQueueState<AsyncSerialInput>>,
119     output: AsyncQueueState<Box<dyn io::Write + Send>>,
120     avail_features: u64,
121 }
122 
123 impl ConsoleDevice {
avail_features(&self) -> u64124     pub fn avail_features(&self) -> u64 {
125         self.avail_features
126     }
127 
start_receive_queue<I: SignalableInterrupt + 'static>( &mut self, ex: &Executor, mem: GuestMemory, queue: virtio::Queue, doorbell: I, kick_evt: Event, ) -> anyhow::Result<()>128     pub fn start_receive_queue<I: SignalableInterrupt + 'static>(
129         &mut self,
130         ex: &Executor,
131         mem: GuestMemory,
132         queue: virtio::Queue,
133         doorbell: I,
134         kick_evt: Event,
135     ) -> anyhow::Result<()> {
136         let input_queue = match self.input.as_mut() {
137             Some(input_queue) => input_queue,
138             None => return Ok(()),
139         };
140 
141         let kick_evt =
142             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
143 
144         let closure_ex = ex.clone();
145         let rx_future = move |input, abort| {
146             let async_input = closure_ex
147                 .async_from(input)
148                 .context("failed to create async input")?;
149 
150             Ok(async move {
151                 select2(
152                     run_rx_queue(queue, mem, doorbell, kick_evt, &async_input).boxed_local(),
153                     abort,
154                 )
155                 .await;
156 
157                 async_input.into_source()
158             })
159         };
160 
161         input_queue.start(ex, rx_future)
162     }
163 
stop_receive_queue(&mut self) -> AsyncResult<bool>164     pub fn stop_receive_queue(&mut self) -> AsyncResult<bool> {
165         if let Some(queue) = self.input.as_mut() {
166             queue.stop()
167         } else {
168             Ok(false)
169         }
170     }
171 
start_transmit_queue<I: SignalableInterrupt + 'static>( &mut self, ex: &Executor, mem: GuestMemory, queue: virtio::Queue, doorbell: I, kick_evt: Event, ) -> anyhow::Result<()>172     pub fn start_transmit_queue<I: SignalableInterrupt + 'static>(
173         &mut self,
174         ex: &Executor,
175         mem: GuestMemory,
176         queue: virtio::Queue,
177         doorbell: I,
178         kick_evt: Event,
179     ) -> anyhow::Result<()> {
180         let kick_evt =
181             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
182 
183         let tx_future = |mut output, abort| {
184             Ok(async move {
185                 select2(
186                     run_tx_queue(queue, mem, doorbell, kick_evt, &mut output).boxed_local(),
187                     abort,
188                 )
189                 .await;
190 
191                 output
192             })
193         };
194 
195         self.output.start(ex, tx_future)
196     }
197 
stop_transmit_queue(&mut self) -> AsyncResult<bool>198     pub fn stop_transmit_queue(&mut self) -> AsyncResult<bool> {
199         self.output.stop()
200     }
201 }
202 
203 impl SerialDevice for ConsoleDevice {
new( protection_type: ProtectionType, _evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, _out_timestamp: bool, _keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice204     fn new(
205         protection_type: ProtectionType,
206         _evt: Event,
207         input: Option<Box<dyn SerialInput>>,
208         output: Option<Box<dyn io::Write + Send>>,
209         _sync: Option<Box<dyn FileSync + Send>>,
210         _out_timestamp: bool,
211         _keep_rds: Vec<RawDescriptor>,
212     ) -> ConsoleDevice {
213         let avail_features = virtio::base_features(protection_type)
214             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
215         ConsoleDevice {
216             input: input.map(AsyncSerialInput).map(AsyncQueueState::Stopped),
217             output: AsyncQueueState::Stopped(output.unwrap_or_else(|| Box::new(io::sink()))),
218             avail_features,
219         }
220     }
221 }
222 
223 enum VirtioConsoleState {
224     Stopped(ConsoleDevice),
225     Running(WorkerThread<anyhow::Result<ConsoleDevice>>),
226     Broken,
227 }
228 
229 /// Virtio console device.
230 pub struct AsyncConsole {
231     state: VirtioConsoleState,
232     base_features: u64,
233     keep_rds: Vec<RawDescriptor>,
234 }
235 
236 impl SerialDevice for AsyncConsole {
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, out_timestamp: bool, keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole237     fn new(
238         protection_type: ProtectionType,
239         evt: Event,
240         input: Option<Box<dyn SerialInput>>,
241         output: Option<Box<dyn io::Write + Send>>,
242         sync: Option<Box<dyn FileSync + Send>>,
243         out_timestamp: bool,
244         keep_rds: Vec<RawDescriptor>,
245     ) -> AsyncConsole {
246         AsyncConsole {
247             state: VirtioConsoleState::Stopped(ConsoleDevice::new(
248                 protection_type,
249                 evt,
250                 input,
251                 output,
252                 sync,
253                 out_timestamp,
254                 Default::default(),
255             )),
256             base_features: base_features(protection_type),
257             keep_rds,
258         }
259     }
260 }
261 
262 impl VirtioDevice for AsyncConsole {
keep_rds(&self) -> Vec<RawDescriptor>263     fn keep_rds(&self) -> Vec<RawDescriptor> {
264         self.keep_rds.clone()
265     }
266 
features(&self) -> u64267     fn features(&self) -> u64 {
268         self.base_features
269     }
270 
device_type(&self) -> DeviceType271     fn device_type(&self) -> DeviceType {
272         DeviceType::Console
273     }
274 
queue_max_sizes(&self) -> &[u16]275     fn queue_max_sizes(&self) -> &[u16] {
276         QUEUE_SIZES
277     }
278 
read_config(&self, offset: u64, data: &mut [u8])279     fn read_config(&self, offset: u64, data: &mut [u8]) {
280         let config = virtio_console_config {
281             max_nr_ports: 1.into(),
282             ..Default::default()
283         };
284         copy_config(data, 0, config.as_bytes(), offset);
285     }
286 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>287     fn activate(
288         &mut self,
289         mem: GuestMemory,
290         interrupt: Interrupt,
291         mut queues: Vec<(Queue, Event)>,
292     ) -> anyhow::Result<()> {
293         if queues.len() < 2 {
294             return Err(anyhow!("expected 2 queues, got {}", queues.len()));
295         }
296 
297         // Reset the device if it was already running.
298         if matches!(self.state, VirtioConsoleState::Running { .. }) {
299             self.reset();
300         }
301 
302         let state = std::mem::replace(&mut self.state, VirtioConsoleState::Broken);
303         let console = match state {
304             VirtioConsoleState::Running { .. } => {
305                 return Err(anyhow!("device should not be running here. This is a bug."));
306             }
307             VirtioConsoleState::Stopped(console) => console,
308             VirtioConsoleState::Broken => {
309                 return Err(anyhow!("device is broken and cannot be activated"));
310             }
311         };
312 
313         let ex = Executor::new().expect("failed to create an executor");
314         let (receive_queue, receive_evt) = queues.remove(0);
315         let (transmit_queue, transmit_evt) = queues.remove(0);
316 
317         self.state =
318             VirtioConsoleState::Running(WorkerThread::start("v_console", move |kill_evt| {
319                 let mut console = console;
320 
321                 console.start_receive_queue(
322                     &ex,
323                     mem.clone(),
324                     receive_queue,
325                     interrupt.clone(),
326                     receive_evt,
327                 )?;
328 
329                 console.start_transmit_queue(&ex, mem, transmit_queue, interrupt, transmit_evt)?;
330 
331                 // Run until the kill event is signaled and cancel all tasks.
332                 ex.run_until(async {
333                     async_utils::await_and_exit(&ex, kill_evt).await?;
334                     if let Some(input) = console.input.as_mut() {
335                         input.stop().context("failed to stop rx queue")?;
336                     }
337                     console.output.stop().context("failed to stop tx queue")?;
338 
339                     Ok(console)
340                 })?
341             }));
342 
343         Ok(())
344     }
345 
reset(&mut self) -> bool346     fn reset(&mut self) -> bool {
347         match std::mem::replace(&mut self.state, VirtioConsoleState::Broken) {
348             // Stopped console is already in reset state.
349             state @ VirtioConsoleState::Stopped(_) => {
350                 self.state = state;
351                 true
352             }
353             // Stop the worker thread and go back to `Stopped` state.
354             VirtioConsoleState::Running(worker_thread) => {
355                 let thread_res = worker_thread.stop();
356                 match thread_res {
357                     Ok(console) => {
358                         self.state = VirtioConsoleState::Stopped(console);
359                         true
360                     }
361                     Err(e) => {
362                         error!("worker thread returned an error: {}", e);
363                         false
364                     }
365                 }
366             }
367             // We are broken and cannot reset properly.
368             VirtioConsoleState::Broken => false,
369         }
370     }
371 }
372 
// Empty impl: no `Suspendable` methods are overridden for `AsyncConsole`, so it relies entirely
// on whatever the trait provides by default.
impl Suspendable for AsyncConsole {}
374