1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
//! Asynchronous console device whose implementation can be shared by the VMM and vhost-user.
6
7 use std::collections::BTreeMap;
8 use std::collections::VecDeque;
9 use std::io;
10 use std::sync::Arc;
11
12 use anyhow::anyhow;
13 use anyhow::Context;
14 use base::error;
15 #[cfg(windows)]
16 use base::named_pipes;
17 use base::AsRawDescriptor;
18 use base::Descriptor;
19 use base::Event;
20 use base::FileSync;
21 use base::RawDescriptor;
22 use base::WorkerThread;
23 use cros_async::select2;
24 use cros_async::AsyncResult;
25 use cros_async::EventAsync;
26 use cros_async::Executor;
27 use cros_async::IntoAsync;
28 use cros_async::IoSource;
29 use futures::FutureExt;
30 use hypervisor::ProtectionType;
31 use sync::Mutex;
32 use vm_memory::GuestMemory;
33 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
34 use zerocopy::AsBytes;
35
36 use super::handle_input;
37 use super::process_transmit_queue;
38 use super::QUEUE_SIZES;
39 use crate::serial_device::SerialInput;
40 use crate::serial_device::SerialOptions;
41 use crate::virtio;
42 use crate::virtio::async_device::AsyncQueueState;
43 use crate::virtio::async_utils;
44 use crate::virtio::base_features;
45 use crate::virtio::console::multiport::ConsolePortInfo;
46 use crate::virtio::console::multiport::ControlPort;
47 use crate::virtio::console::virtio_console_config;
48 use crate::virtio::console::ConsoleError;
49 use crate::virtio::copy_config;
50 use crate::virtio::device_constants::console::VIRTIO_CONSOLE_F_MULTIPORT;
51 use crate::virtio::DeviceType;
52 use crate::virtio::Interrupt;
53 use crate::virtio::Queue;
54 use crate::virtio::VirtioDevice;
55 use crate::PciAddress;
56 use crate::SerialDevice;
57
58 /// Wrapper that makes any `SerialInput` usable as an async source by providing an implementation of
59 /// `IntoAsync`.
60 struct AsyncSerialInput(Box<dyn SerialInput>);
61 impl AsRawDescriptor for AsyncSerialInput {
as_raw_descriptor(&self) -> RawDescriptor62 fn as_raw_descriptor(&self) -> RawDescriptor {
63 self.0.get_read_notifier().as_raw_descriptor()
64 }
65 }
66 impl IntoAsync for AsyncSerialInput {}
67
run_tx_queue( queue: &Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, kick_evt: EventAsync, output: &mut Box<dyn io::Write + Send>, )68 async fn run_tx_queue(
69 queue: &Arc<Mutex<virtio::Queue>>,
70 doorbell: Interrupt,
71 kick_evt: EventAsync,
72 output: &mut Box<dyn io::Write + Send>,
73 ) {
74 loop {
75 if let Err(e) = kick_evt.next_val().await {
76 error!("Failed to read kick event for tx queue: {}", e);
77 break;
78 }
79 process_transmit_queue(&doorbell, queue, output.as_mut());
80 }
81 }
82
run_rx_queue( queue: &Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, kick_evt: EventAsync, input: &IoSource<AsyncSerialInput>, )83 async fn run_rx_queue(
84 queue: &Arc<Mutex<virtio::Queue>>,
85 doorbell: Interrupt,
86 kick_evt: EventAsync,
87 input: &IoSource<AsyncSerialInput>,
88 ) {
89 // Staging buffer, required because of `handle_input`'s API. We can probably remove this once
90 // the regular virtio device is switched to async.
91 let mut in_buffer = VecDeque::<u8>::new();
92 let mut rx_buf = vec![0u8; 4096];
93
94 loop {
95 match input.read_to_vec(None, rx_buf).await {
96 // Input source has closed.
97 Ok((0, _)) => break,
98 Ok((size, v)) => {
99 in_buffer.extend(&v[0..size]);
100 rx_buf = v;
101 }
102 Err(e) => {
103 error!("Failed to read console input: {}", e);
104 return;
105 }
106 }
107
108 // Submit all the data obtained during this read.
109 while !in_buffer.is_empty() {
110 match handle_input(&doorbell, &mut in_buffer, queue) {
111 Ok(()) => {}
112 Err(ConsoleError::RxDescriptorsExhausted) => {
113 // Wait until a descriptor becomes available and try again.
114 if let Err(e) = kick_evt.next_val().await {
115 error!("Failed to read kick event for rx queue: {}", e);
116 return;
117 }
118 }
119 }
120 }
121 }
122 }
123
124 pub struct ConsolePort {
125 input: Option<AsyncQueueState<AsyncSerialInput>>,
126 output: AsyncQueueState<Box<dyn io::Write + Send>>,
127 info: ConsolePortInfo,
128 }
129
130 impl SerialDevice for ConsolePort {
new( _protection_type: ProtectionType, _evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsolePort131 fn new(
132 _protection_type: ProtectionType,
133 _evt: Event,
134 input: Option<Box<dyn SerialInput>>,
135 output: Option<Box<dyn io::Write + Send>>,
136 _sync: Option<Box<dyn FileSync + Send>>,
137 options: SerialOptions,
138 _keep_rds: Vec<RawDescriptor>,
139 ) -> ConsolePort {
140 let input = input.map(AsyncSerialInput).map(AsyncQueueState::Stopped);
141 let output = AsyncQueueState::Stopped(output.unwrap_or_else(|| Box::new(io::sink())));
142 let info = ConsolePortInfo {
143 console: options.console,
144 name: options.name.unwrap_or_default(),
145 };
146
147 ConsolePort {
148 input,
149 output,
150 info,
151 }
152 }
153
154 #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsolePort155 fn new_with_pipe(
156 _protection_type: ProtectionType,
157 _interrupt_evt: Event,
158 _pipe_in: named_pipes::PipeConnection,
159 _pipe_out: named_pipes::PipeConnection,
160 _options: SerialOptions,
161 _keep_rds: Vec<RawDescriptor>,
162 ) -> ConsolePort {
163 unimplemented!("new_with_pipe unimplemented for ConsolePort");
164 }
165 }
166
167 impl ConsolePort {
start_receive_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>168 pub fn start_receive_queue(
169 &mut self,
170 ex: &Executor,
171 queue: Arc<Mutex<virtio::Queue>>,
172 doorbell: Interrupt,
173 ) -> anyhow::Result<()> {
174 let input_queue = match self.input.as_mut() {
175 Some(input_queue) => input_queue,
176 None => return Ok(()),
177 };
178
179 let kick_evt = queue
180 .lock()
181 .event()
182 .try_clone()
183 .context("Failed to clone queue event")?;
184 let kick_evt =
185 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
186
187 let closure_ex = ex.clone();
188 let rx_future = move |input, abort| {
189 let async_input = closure_ex
190 .async_from(input)
191 .context("failed to create async input")?;
192
193 Ok(async move {
194 select2(
195 run_rx_queue(&queue, doorbell, kick_evt, &async_input).boxed_local(),
196 abort,
197 )
198 .await;
199
200 async_input.into_source()
201 })
202 };
203
204 input_queue.start(ex, rx_future)
205 }
206
stop_receive_queue(&mut self) -> AsyncResult<bool>207 pub fn stop_receive_queue(&mut self) -> AsyncResult<bool> {
208 if let Some(queue) = self.input.as_mut() {
209 queue.stop()
210 } else {
211 Ok(false)
212 }
213 }
214
start_transmit_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>215 pub fn start_transmit_queue(
216 &mut self,
217 ex: &Executor,
218 queue: Arc<Mutex<virtio::Queue>>,
219 doorbell: Interrupt,
220 ) -> anyhow::Result<()> {
221 let kick_evt = queue
222 .lock()
223 .event()
224 .try_clone()
225 .context("Failed to clone queue event")?;
226 let kick_evt =
227 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
228
229 let tx_future = |mut output, abort| {
230 Ok(async move {
231 select2(
232 run_tx_queue(&queue, doorbell, kick_evt, &mut output).boxed_local(),
233 abort,
234 )
235 .await;
236
237 output
238 })
239 };
240
241 self.output.start(ex, tx_future)
242 }
243
stop_transmit_queue(&mut self) -> AsyncResult<bool>244 pub fn stop_transmit_queue(&mut self) -> AsyncResult<bool> {
245 self.output.stop()
246 }
247 }
248
249 /// Console device with an optional control port to support for multiport
250 pub struct ConsoleDevice {
251 avail_features: u64,
252 // Port 0 always exists.
253 port0: ConsolePort,
254 // Control port, if multiport is in use.
255 control_port: Option<ControlPort>,
256 // Port 1..n, if they exist.
257 extra_ports: Vec<ConsolePort>,
258 }
259
260 impl ConsoleDevice {
261 /// Create a console device with the multiport feature enabled
262 /// The multiport feature is referred to virtio spec.
new_multi_port( protection_type: ProtectionType, port0: ConsolePort, extra_ports: Vec<ConsolePort>, ) -> ConsoleDevice263 pub fn new_multi_port(
264 protection_type: ProtectionType,
265 port0: ConsolePort,
266 extra_ports: Vec<ConsolePort>,
267 ) -> ConsoleDevice {
268 let avail_features =
269 virtio::base_features(protection_type) | (1 << VIRTIO_CONSOLE_F_MULTIPORT);
270
271 let info = std::iter::once(&port0)
272 .chain(extra_ports.iter())
273 .map(|port| port.info.clone())
274 .collect::<Vec<_>>();
275
276 ConsoleDevice {
277 avail_features,
278 port0,
279 control_port: Some(ControlPort::new(info)),
280 extra_ports,
281 }
282 }
283
284 /// Return available features
avail_features(&self) -> u64285 pub fn avail_features(&self) -> u64 {
286 self.avail_features
287 }
288
289 /// Return whether current console device supports multiport feature
is_multi_port(&self) -> bool290 pub fn is_multi_port(&self) -> bool {
291 self.avail_features & (1 << VIRTIO_CONSOLE_F_MULTIPORT) != 0
292 }
293
294 /// Return the number of the port initiated by the console device
max_ports(&self) -> usize295 pub fn max_ports(&self) -> usize {
296 1 + self.extra_ports.len()
297 }
298
299 /// Returns the maximum number of queues supported by this device.
max_queues(&self) -> usize300 pub fn max_queues(&self) -> usize {
301 // The port 0 receive and transmit queues always exist;
302 // other queues only exist if VIRTIO_CONSOLE_F_MULTIPORT is set.
303 if self.is_multi_port() {
304 let port_num = self.max_ports();
305
306 // Extra 1 is for control port; each port has two queues (tx & rx)
307 (port_num + 1) * 2
308 } else {
309 2
310 }
311 }
312
313 /// Return the reference of the console port by port_id
get_console_port(&mut self, port_id: usize) -> anyhow::Result<&mut ConsolePort>314 fn get_console_port(&mut self, port_id: usize) -> anyhow::Result<&mut ConsolePort> {
315 match port_id {
316 0 => Ok(&mut self.port0),
317 port_id => self
318 .extra_ports
319 .get_mut(port_id - 1)
320 .with_context(|| format!("failed to get console port {}", port_id)),
321 }
322 }
323
324 /// Start the queue with the index `idx`
start_queue( &mut self, ex: &Executor, idx: usize, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>325 pub fn start_queue(
326 &mut self,
327 ex: &Executor,
328 idx: usize,
329 queue: Arc<Mutex<virtio::Queue>>,
330 doorbell: Interrupt,
331 ) -> anyhow::Result<()> {
332 match idx {
333 // rxq (port0)
334 0 => self.port0.start_receive_queue(ex, queue, doorbell),
335 // txq (port0)
336 1 => self.port0.start_transmit_queue(ex, queue, doorbell),
337 // control port rxq
338 2 => self
339 .control_port
340 .as_mut()
341 .unwrap()
342 .start_receive_queue(ex, queue, doorbell),
343 // control port txq
344 3 => self
345 .control_port
346 .as_mut()
347 .unwrap()
348 .start_transmit_queue(ex, queue, doorbell),
349 // {4, 5} -> port1 {rxq, txq} if exist
350 // {6, 7} -> port2 {rxq, txq} if exist
351 // ...
352 _ => {
353 let port_id = idx / 2 - 1;
354 let port = self.get_console_port(port_id)?;
355 match idx % 2 {
356 0 => port.start_receive_queue(ex, queue, doorbell),
357 1 => port.start_transmit_queue(ex, queue, doorbell),
358 _ => unreachable!(),
359 }
360 }
361 }
362 }
363
364 /// Stop the queue with the index `idx`
stop_queue(&mut self, idx: usize) -> anyhow::Result<bool>365 pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<bool> {
366 match idx {
367 0 => self
368 .port0
369 .stop_receive_queue()
370 .context("failed to stop rx queue"),
371 1 => self
372 .port0
373 .stop_transmit_queue()
374 .context("failed to stop tx queue"),
375 2 => self.control_port.as_mut().unwrap().stop_receive_queue(),
376 3 => self.control_port.as_mut().unwrap().stop_transmit_queue(),
377 _ => {
378 let port_id = idx / 2 - 1;
379 let port = self.get_console_port(port_id)?;
380 match idx % 2 {
381 0 => port.stop_receive_queue().context("failed to stop rx queue"),
382 1 => port
383 .stop_transmit_queue()
384 .context("failed to stop tx queue"),
385 _ => unreachable!(),
386 }
387 }
388 }
389 }
390 }
391
392 impl SerialDevice for ConsoleDevice {
393 /// Create a default console device, without multiport support
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice394 fn new(
395 protection_type: ProtectionType,
396 evt: Event,
397 input: Option<Box<dyn SerialInput>>,
398 output: Option<Box<dyn io::Write + Send>>,
399 sync: Option<Box<dyn FileSync + Send>>,
400 options: SerialOptions,
401 keep_rds: Vec<RawDescriptor>,
402 ) -> ConsoleDevice {
403 let avail_features =
404 virtio::base_features(protection_type) | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
405 let port0 = ConsolePort::new(protection_type, evt, input, output, sync, options, keep_rds);
406
407 ConsoleDevice {
408 avail_features,
409 port0,
410 control_port: None,
411 extra_ports: vec![],
412 }
413 }
414
415 #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice416 fn new_with_pipe(
417 _protection_type: ProtectionType,
418 _interrupt_evt: Event,
419 _pipe_in: named_pipes::PipeConnection,
420 _pipe_out: named_pipes::PipeConnection,
421 _options: SerialOptions,
422 _keep_rds: Vec<RawDescriptor>,
423 ) -> ConsoleDevice {
424 unimplemented!("new_with_pipe unimplemented for ConsoleDevice");
425 }
426 }
427
428 /// Virtio console device.
429 pub struct AsyncConsole {
430 console_device: Option<ConsoleDevice>,
431 worker_thread: Option<WorkerThread<anyhow::Result<ConsoleDevice>>>,
432 base_features: u64,
433 keep_descriptors: Vec<Descriptor>,
434 pci_address: Option<PciAddress>,
435 }
436
437 impl SerialDevice for AsyncConsole {
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole438 fn new(
439 protection_type: ProtectionType,
440 evt: Event,
441 input: Option<Box<dyn SerialInput>>,
442 output: Option<Box<dyn io::Write + Send>>,
443 sync: Option<Box<dyn FileSync + Send>>,
444 options: SerialOptions,
445 keep_rds: Vec<RawDescriptor>,
446 ) -> AsyncConsole {
447 let pci_address = options.pci_address;
448 AsyncConsole {
449 console_device: Some(ConsoleDevice::new(
450 protection_type,
451 evt,
452 input,
453 output,
454 sync,
455 options,
456 Default::default(),
457 )),
458 worker_thread: None,
459 base_features: base_features(protection_type),
460 keep_descriptors: keep_rds.iter().copied().map(Descriptor).collect(),
461 pci_address,
462 }
463 }
464
465 #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole466 fn new_with_pipe(
467 _protection_type: ProtectionType,
468 _interrupt_evt: Event,
469 _pipe_in: named_pipes::PipeConnection,
470 _pipe_out: named_pipes::PipeConnection,
471 _options: SerialOptions,
472 _keep_rds: Vec<RawDescriptor>,
473 ) -> AsyncConsole {
474 unimplemented!("new_with_pipe unimplemented for AsyncConsole");
475 }
476 }
477
478 impl VirtioDevice for AsyncConsole {
keep_rds(&self) -> Vec<RawDescriptor>479 fn keep_rds(&self) -> Vec<RawDescriptor> {
480 self.keep_descriptors
481 .iter()
482 .map(Descriptor::as_raw_descriptor)
483 .collect()
484 }
485
features(&self) -> u64486 fn features(&self) -> u64 {
487 self.base_features
488 }
489
device_type(&self) -> DeviceType490 fn device_type(&self) -> DeviceType {
491 DeviceType::Console
492 }
493
queue_max_sizes(&self) -> &[u16]494 fn queue_max_sizes(&self) -> &[u16] {
495 QUEUE_SIZES
496 }
497
read_config(&self, offset: u64, data: &mut [u8])498 fn read_config(&self, offset: u64, data: &mut [u8]) {
499 let config = virtio_console_config {
500 max_nr_ports: 1.into(),
501 ..Default::default()
502 };
503 copy_config(data, 0, config.as_bytes(), offset);
504 }
505
activate( &mut self, _mem: GuestMemory, interrupt: Interrupt, mut queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>506 fn activate(
507 &mut self,
508 _mem: GuestMemory,
509 interrupt: Interrupt,
510 mut queues: BTreeMap<usize, Queue>,
511 ) -> anyhow::Result<()> {
512 if queues.len() < 2 {
513 return Err(anyhow!("expected 2 queues, got {}", queues.len()));
514 }
515
516 let console = self.console_device.take().context("no console_device")?;
517
518 let ex = Executor::new().expect("failed to create an executor");
519 let receive_queue = queues.remove(&0).unwrap();
520 let transmit_queue = queues.remove(&1).unwrap();
521
522 self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
523 let mut console = console;
524 let receive_queue = Arc::new(Mutex::new(receive_queue));
525 let transmit_queue = Arc::new(Mutex::new(transmit_queue));
526
527 // Start transmit queue of port 0
528 console.start_queue(&ex, 0, receive_queue, interrupt.clone())?;
529 // Start receive queue of port 0
530 console.start_queue(&ex, 1, transmit_queue, interrupt.clone())?;
531
532 // Run until the kill event is signaled and cancel all tasks.
533 ex.run_until(async {
534 async_utils::await_and_exit(&ex, kill_evt).await?;
535 let port = &mut console.port0;
536 if let Some(input) = port.input.as_mut() {
537 input
538 .stop_async()
539 .await
540 .context("failed to stop rx queue")?;
541 }
542 port.output
543 .stop_async()
544 .await
545 .context("failed to stop tx queue")?;
546
547 Ok(console)
548 })?
549 }));
550
551 Ok(())
552 }
553
pci_address(&self) -> Option<PciAddress>554 fn pci_address(&self) -> Option<PciAddress> {
555 self.pci_address
556 }
557
reset(&mut self) -> anyhow::Result<()>558 fn reset(&mut self) -> anyhow::Result<()> {
559 if let Some(worker_thread) = self.worker_thread.take() {
560 let console = worker_thread.stop()?;
561 self.console_device = Some(console);
562 }
563 Ok(())
564 }
565 }
566