1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Asynchronous console device which implementation can be shared by VMM and vhost-user.
6
7 use std::collections::VecDeque;
8 use std::io;
9
10 use anyhow::anyhow;
11 use anyhow::Context;
12 use base::error;
13 use base::AsRawDescriptor;
14 use base::Event;
15 use base::FileSync;
16 use base::RawDescriptor;
17 use base::WorkerThread;
18 use cros_async::select2;
19 use cros_async::AsyncResult;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::IntoAsync;
23 use cros_async::IoSource;
24 use futures::FutureExt;
25 use hypervisor::ProtectionType;
26 use vm_memory::GuestMemory;
27 use vmm_vhost::message::VhostUserVirtioFeatures;
28 use zerocopy::AsBytes;
29
30 use super::handle_input;
31 use super::process_transmit_queue;
32 use super::QUEUE_SIZES;
33 use crate::serial_device::SerialInput;
34 use crate::virtio;
35 use crate::virtio::async_device::AsyncQueueState;
36 use crate::virtio::async_utils;
37 use crate::virtio::base_features;
38 use crate::virtio::copy_config;
39 use crate::virtio::virtio_console_config;
40 use crate::virtio::ConsoleError;
41 use crate::virtio::DeviceType;
42 use crate::virtio::Interrupt;
43 use crate::virtio::Queue;
44 use crate::virtio::SignalableInterrupt;
45 use crate::virtio::VirtioDevice;
46 use crate::SerialDevice;
47 use crate::Suspendable;
48
49 /// Wrapper that makes any `SerialInput` usable as an async source by providing an implementation of
50 /// `IntoAsync`.
51 struct AsyncSerialInput(Box<dyn SerialInput>);
52 impl AsRawDescriptor for AsyncSerialInput {
as_raw_descriptor(&self) -> RawDescriptor53 fn as_raw_descriptor(&self) -> RawDescriptor {
54 self.0.get_read_notifier().as_raw_descriptor()
55 }
56 }
57 impl IntoAsync for AsyncSerialInput {}
58
run_tx_queue<I: SignalableInterrupt>( mut queue: virtio::Queue, mem: GuestMemory, doorbell: I, kick_evt: EventAsync, output: &mut Box<dyn io::Write + Send>, )59 async fn run_tx_queue<I: SignalableInterrupt>(
60 mut queue: virtio::Queue,
61 mem: GuestMemory,
62 doorbell: I,
63 kick_evt: EventAsync,
64 output: &mut Box<dyn io::Write + Send>,
65 ) {
66 loop {
67 if let Err(e) = kick_evt.next_val().await {
68 error!("Failed to read kick event for tx queue: {}", e);
69 break;
70 }
71 process_transmit_queue(&mem, &doorbell, &mut queue, output.as_mut());
72 }
73 }
74
run_rx_queue<I: SignalableInterrupt>( mut queue: virtio::Queue, mem: GuestMemory, doorbell: I, kick_evt: EventAsync, input: &IoSource<AsyncSerialInput>, )75 async fn run_rx_queue<I: SignalableInterrupt>(
76 mut queue: virtio::Queue,
77 mem: GuestMemory,
78 doorbell: I,
79 kick_evt: EventAsync,
80 input: &IoSource<AsyncSerialInput>,
81 ) {
82 // Staging buffer, required because of `handle_input`'s API. We can probably remove this once
83 // the regular virtio device is switched to async.
84 let mut in_buffer = VecDeque::<u8>::new();
85 let mut rx_buf = vec![0u8; 4096];
86
87 loop {
88 match input.read_to_vec(None, rx_buf).await {
89 // Input source has closed.
90 Ok((0, _)) => break,
91 Ok((size, v)) => {
92 in_buffer.extend(&v[0..size]);
93 rx_buf = v;
94 }
95 Err(e) => {
96 error!("Failed to read console input: {}", e);
97 return;
98 }
99 }
100
101 // Submit all the data obtained during this read.
102 while !in_buffer.is_empty() {
103 match handle_input(&mem, &doorbell, &mut in_buffer, &mut queue) {
104 Ok(()) => {}
105 Err(ConsoleError::RxDescriptorsExhausted) => {
106 // Wait until a descriptor becomes available and try again.
107 if let Err(e) = kick_evt.next_val().await {
108 error!("Failed to read kick event for rx queue: {}", e);
109 return;
110 }
111 }
112 }
113 }
114 }
115 }
116
117 pub struct ConsoleDevice {
118 input: Option<AsyncQueueState<AsyncSerialInput>>,
119 output: AsyncQueueState<Box<dyn io::Write + Send>>,
120 avail_features: u64,
121 }
122
123 impl ConsoleDevice {
avail_features(&self) -> u64124 pub fn avail_features(&self) -> u64 {
125 self.avail_features
126 }
127
start_receive_queue<I: SignalableInterrupt + 'static>( &mut self, ex: &Executor, mem: GuestMemory, queue: virtio::Queue, doorbell: I, kick_evt: Event, ) -> anyhow::Result<()>128 pub fn start_receive_queue<I: SignalableInterrupt + 'static>(
129 &mut self,
130 ex: &Executor,
131 mem: GuestMemory,
132 queue: virtio::Queue,
133 doorbell: I,
134 kick_evt: Event,
135 ) -> anyhow::Result<()> {
136 let input_queue = match self.input.as_mut() {
137 Some(input_queue) => input_queue,
138 None => return Ok(()),
139 };
140
141 let kick_evt =
142 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
143
144 let closure_ex = ex.clone();
145 let rx_future = move |input, abort| {
146 let async_input = closure_ex
147 .async_from(input)
148 .context("failed to create async input")?;
149
150 Ok(async move {
151 select2(
152 run_rx_queue(queue, mem, doorbell, kick_evt, &async_input).boxed_local(),
153 abort,
154 )
155 .await;
156
157 async_input.into_source()
158 })
159 };
160
161 input_queue.start(ex, rx_future)
162 }
163
stop_receive_queue(&mut self) -> AsyncResult<bool>164 pub fn stop_receive_queue(&mut self) -> AsyncResult<bool> {
165 if let Some(queue) = self.input.as_mut() {
166 queue.stop()
167 } else {
168 Ok(false)
169 }
170 }
171
start_transmit_queue<I: SignalableInterrupt + 'static>( &mut self, ex: &Executor, mem: GuestMemory, queue: virtio::Queue, doorbell: I, kick_evt: Event, ) -> anyhow::Result<()>172 pub fn start_transmit_queue<I: SignalableInterrupt + 'static>(
173 &mut self,
174 ex: &Executor,
175 mem: GuestMemory,
176 queue: virtio::Queue,
177 doorbell: I,
178 kick_evt: Event,
179 ) -> anyhow::Result<()> {
180 let kick_evt =
181 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
182
183 let tx_future = |mut output, abort| {
184 Ok(async move {
185 select2(
186 run_tx_queue(queue, mem, doorbell, kick_evt, &mut output).boxed_local(),
187 abort,
188 )
189 .await;
190
191 output
192 })
193 };
194
195 self.output.start(ex, tx_future)
196 }
197
stop_transmit_queue(&mut self) -> AsyncResult<bool>198 pub fn stop_transmit_queue(&mut self) -> AsyncResult<bool> {
199 self.output.stop()
200 }
201 }
202
203 impl SerialDevice for ConsoleDevice {
new( protection_type: ProtectionType, _evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, _out_timestamp: bool, _keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice204 fn new(
205 protection_type: ProtectionType,
206 _evt: Event,
207 input: Option<Box<dyn SerialInput>>,
208 output: Option<Box<dyn io::Write + Send>>,
209 _sync: Option<Box<dyn FileSync + Send>>,
210 _out_timestamp: bool,
211 _keep_rds: Vec<RawDescriptor>,
212 ) -> ConsoleDevice {
213 let avail_features = virtio::base_features(protection_type)
214 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
215 ConsoleDevice {
216 input: input.map(AsyncSerialInput).map(AsyncQueueState::Stopped),
217 output: AsyncQueueState::Stopped(output.unwrap_or_else(|| Box::new(io::sink()))),
218 avail_features,
219 }
220 }
221 }
222
223 enum VirtioConsoleState {
224 Stopped(ConsoleDevice),
225 Running(WorkerThread<anyhow::Result<ConsoleDevice>>),
226 Broken,
227 }
228
229 /// Virtio console device.
230 pub struct AsyncConsole {
231 state: VirtioConsoleState,
232 base_features: u64,
233 keep_rds: Vec<RawDescriptor>,
234 }
235
236 impl SerialDevice for AsyncConsole {
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, out_timestamp: bool, keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole237 fn new(
238 protection_type: ProtectionType,
239 evt: Event,
240 input: Option<Box<dyn SerialInput>>,
241 output: Option<Box<dyn io::Write + Send>>,
242 sync: Option<Box<dyn FileSync + Send>>,
243 out_timestamp: bool,
244 keep_rds: Vec<RawDescriptor>,
245 ) -> AsyncConsole {
246 AsyncConsole {
247 state: VirtioConsoleState::Stopped(ConsoleDevice::new(
248 protection_type,
249 evt,
250 input,
251 output,
252 sync,
253 out_timestamp,
254 Default::default(),
255 )),
256 base_features: base_features(protection_type),
257 keep_rds,
258 }
259 }
260 }
261
262 impl VirtioDevice for AsyncConsole {
keep_rds(&self) -> Vec<RawDescriptor>263 fn keep_rds(&self) -> Vec<RawDescriptor> {
264 self.keep_rds.clone()
265 }
266
features(&self) -> u64267 fn features(&self) -> u64 {
268 self.base_features
269 }
270
device_type(&self) -> DeviceType271 fn device_type(&self) -> DeviceType {
272 DeviceType::Console
273 }
274
queue_max_sizes(&self) -> &[u16]275 fn queue_max_sizes(&self) -> &[u16] {
276 QUEUE_SIZES
277 }
278
read_config(&self, offset: u64, data: &mut [u8])279 fn read_config(&self, offset: u64, data: &mut [u8]) {
280 let config = virtio_console_config {
281 max_nr_ports: 1.into(),
282 ..Default::default()
283 };
284 copy_config(data, 0, config.as_bytes(), offset);
285 }
286
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>287 fn activate(
288 &mut self,
289 mem: GuestMemory,
290 interrupt: Interrupt,
291 mut queues: Vec<(Queue, Event)>,
292 ) -> anyhow::Result<()> {
293 if queues.len() < 2 {
294 return Err(anyhow!("expected 2 queues, got {}", queues.len()));
295 }
296
297 // Reset the device if it was already running.
298 if matches!(self.state, VirtioConsoleState::Running { .. }) {
299 self.reset();
300 }
301
302 let state = std::mem::replace(&mut self.state, VirtioConsoleState::Broken);
303 let console = match state {
304 VirtioConsoleState::Running { .. } => {
305 return Err(anyhow!("device should not be running here. This is a bug."));
306 }
307 VirtioConsoleState::Stopped(console) => console,
308 VirtioConsoleState::Broken => {
309 return Err(anyhow!("device is broken and cannot be activated"));
310 }
311 };
312
313 let ex = Executor::new().expect("failed to create an executor");
314 let (receive_queue, receive_evt) = queues.remove(0);
315 let (transmit_queue, transmit_evt) = queues.remove(0);
316
317 self.state =
318 VirtioConsoleState::Running(WorkerThread::start("v_console", move |kill_evt| {
319 let mut console = console;
320
321 console.start_receive_queue(
322 &ex,
323 mem.clone(),
324 receive_queue,
325 interrupt.clone(),
326 receive_evt,
327 )?;
328
329 console.start_transmit_queue(&ex, mem, transmit_queue, interrupt, transmit_evt)?;
330
331 // Run until the kill event is signaled and cancel all tasks.
332 ex.run_until(async {
333 async_utils::await_and_exit(&ex, kill_evt).await?;
334 if let Some(input) = console.input.as_mut() {
335 input.stop().context("failed to stop rx queue")?;
336 }
337 console.output.stop().context("failed to stop tx queue")?;
338
339 Ok(console)
340 })?
341 }));
342
343 Ok(())
344 }
345
reset(&mut self) -> bool346 fn reset(&mut self) -> bool {
347 match std::mem::replace(&mut self.state, VirtioConsoleState::Broken) {
348 // Stopped console is already in reset state.
349 state @ VirtioConsoleState::Stopped(_) => {
350 self.state = state;
351 true
352 }
353 // Stop the worker thread and go back to `Stopped` state.
354 VirtioConsoleState::Running(worker_thread) => {
355 let thread_res = worker_thread.stop();
356 match thread_res {
357 Ok(console) => {
358 self.state = VirtioConsoleState::Stopped(console);
359 true
360 }
361 Err(e) => {
362 error!("worker thread returned an error: {}", e);
363 false
364 }
365 }
366 }
367 // We are broken and cannot reset properly.
368 VirtioConsoleState::Broken => false,
369 }
370 }
371 }
372
373 impl Suspendable for AsyncConsole {}
374