• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Asynchronous console device which implementation can be shared by VMM and vhost-user.
6 
7 use std::collections::BTreeMap;
8 use std::collections::VecDeque;
9 use std::io;
10 use std::sync::Arc;
11 
12 use anyhow::anyhow;
13 use anyhow::Context;
14 use base::error;
15 #[cfg(windows)]
16 use base::named_pipes;
17 use base::AsRawDescriptor;
18 use base::Descriptor;
19 use base::Event;
20 use base::FileSync;
21 use base::RawDescriptor;
22 use base::WorkerThread;
23 use cros_async::select2;
24 use cros_async::AsyncResult;
25 use cros_async::EventAsync;
26 use cros_async::Executor;
27 use cros_async::IntoAsync;
28 use cros_async::IoSource;
29 use futures::FutureExt;
30 use hypervisor::ProtectionType;
31 use sync::Mutex;
32 use vm_memory::GuestMemory;
33 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
34 use zerocopy::AsBytes;
35 
36 use super::handle_input;
37 use super::process_transmit_queue;
38 use super::QUEUE_SIZES;
39 use crate::serial_device::SerialInput;
40 use crate::serial_device::SerialOptions;
41 use crate::virtio;
42 use crate::virtio::async_device::AsyncQueueState;
43 use crate::virtio::async_utils;
44 use crate::virtio::base_features;
45 use crate::virtio::console::multiport::ConsolePortInfo;
46 use crate::virtio::console::multiport::ControlPort;
47 use crate::virtio::console::virtio_console_config;
48 use crate::virtio::console::ConsoleError;
49 use crate::virtio::copy_config;
50 use crate::virtio::device_constants::console::VIRTIO_CONSOLE_F_MULTIPORT;
51 use crate::virtio::DeviceType;
52 use crate::virtio::Interrupt;
53 use crate::virtio::Queue;
54 use crate::virtio::VirtioDevice;
55 use crate::PciAddress;
56 use crate::SerialDevice;
57 
58 /// Wrapper that makes any `SerialInput` usable as an async source by providing an implementation of
59 /// `IntoAsync`.
60 struct AsyncSerialInput(Box<dyn SerialInput>);
61 impl AsRawDescriptor for AsyncSerialInput {
as_raw_descriptor(&self) -> RawDescriptor62     fn as_raw_descriptor(&self) -> RawDescriptor {
63         self.0.get_read_notifier().as_raw_descriptor()
64     }
65 }
66 impl IntoAsync for AsyncSerialInput {}
67 
run_tx_queue( queue: &Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, kick_evt: EventAsync, output: &mut Box<dyn io::Write + Send>, )68 async fn run_tx_queue(
69     queue: &Arc<Mutex<virtio::Queue>>,
70     doorbell: Interrupt,
71     kick_evt: EventAsync,
72     output: &mut Box<dyn io::Write + Send>,
73 ) {
74     loop {
75         if let Err(e) = kick_evt.next_val().await {
76             error!("Failed to read kick event for tx queue: {}", e);
77             break;
78         }
79         process_transmit_queue(&doorbell, queue, output.as_mut());
80     }
81 }
82 
run_rx_queue( queue: &Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, kick_evt: EventAsync, input: &IoSource<AsyncSerialInput>, )83 async fn run_rx_queue(
84     queue: &Arc<Mutex<virtio::Queue>>,
85     doorbell: Interrupt,
86     kick_evt: EventAsync,
87     input: &IoSource<AsyncSerialInput>,
88 ) {
89     // Staging buffer, required because of `handle_input`'s API. We can probably remove this once
90     // the regular virtio device is switched to async.
91     let mut in_buffer = VecDeque::<u8>::new();
92     let mut rx_buf = vec![0u8; 4096];
93 
94     loop {
95         match input.read_to_vec(None, rx_buf).await {
96             // Input source has closed.
97             Ok((0, _)) => break,
98             Ok((size, v)) => {
99                 in_buffer.extend(&v[0..size]);
100                 rx_buf = v;
101             }
102             Err(e) => {
103                 error!("Failed to read console input: {}", e);
104                 return;
105             }
106         }
107 
108         // Submit all the data obtained during this read.
109         while !in_buffer.is_empty() {
110             match handle_input(&doorbell, &mut in_buffer, queue) {
111                 Ok(()) => {}
112                 Err(ConsoleError::RxDescriptorsExhausted) => {
113                     // Wait until a descriptor becomes available and try again.
114                     if let Err(e) = kick_evt.next_val().await {
115                         error!("Failed to read kick event for rx queue: {}", e);
116                         return;
117                     }
118                 }
119             }
120         }
121     }
122 }
123 
124 pub struct ConsolePort {
125     input: Option<AsyncQueueState<AsyncSerialInput>>,
126     output: AsyncQueueState<Box<dyn io::Write + Send>>,
127     info: ConsolePortInfo,
128 }
129 
130 impl SerialDevice for ConsolePort {
new( _protection_type: ProtectionType, _evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsolePort131     fn new(
132         _protection_type: ProtectionType,
133         _evt: Event,
134         input: Option<Box<dyn SerialInput>>,
135         output: Option<Box<dyn io::Write + Send>>,
136         _sync: Option<Box<dyn FileSync + Send>>,
137         options: SerialOptions,
138         _keep_rds: Vec<RawDescriptor>,
139     ) -> ConsolePort {
140         let input = input.map(AsyncSerialInput).map(AsyncQueueState::Stopped);
141         let output = AsyncQueueState::Stopped(output.unwrap_or_else(|| Box::new(io::sink())));
142         let info = ConsolePortInfo {
143             console: options.console,
144             name: options.name.unwrap_or_default(),
145         };
146 
147         ConsolePort {
148             input,
149             output,
150             info,
151         }
152     }
153 
154     #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsolePort155     fn new_with_pipe(
156         _protection_type: ProtectionType,
157         _interrupt_evt: Event,
158         _pipe_in: named_pipes::PipeConnection,
159         _pipe_out: named_pipes::PipeConnection,
160         _options: SerialOptions,
161         _keep_rds: Vec<RawDescriptor>,
162     ) -> ConsolePort {
163         unimplemented!("new_with_pipe unimplemented for ConsolePort");
164     }
165 }
166 
167 impl ConsolePort {
start_receive_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>168     pub fn start_receive_queue(
169         &mut self,
170         ex: &Executor,
171         queue: Arc<Mutex<virtio::Queue>>,
172         doorbell: Interrupt,
173     ) -> anyhow::Result<()> {
174         let input_queue = match self.input.as_mut() {
175             Some(input_queue) => input_queue,
176             None => return Ok(()),
177         };
178 
179         let kick_evt = queue
180             .lock()
181             .event()
182             .try_clone()
183             .context("Failed to clone queue event")?;
184         let kick_evt =
185             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
186 
187         let closure_ex = ex.clone();
188         let rx_future = move |input, abort| {
189             let async_input = closure_ex
190                 .async_from(input)
191                 .context("failed to create async input")?;
192 
193             Ok(async move {
194                 select2(
195                     run_rx_queue(&queue, doorbell, kick_evt, &async_input).boxed_local(),
196                     abort,
197                 )
198                 .await;
199 
200                 async_input.into_source()
201             })
202         };
203 
204         input_queue.start(ex, rx_future)
205     }
206 
stop_receive_queue(&mut self) -> AsyncResult<bool>207     pub fn stop_receive_queue(&mut self) -> AsyncResult<bool> {
208         if let Some(queue) = self.input.as_mut() {
209             queue.stop()
210         } else {
211             Ok(false)
212         }
213     }
214 
start_transmit_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>215     pub fn start_transmit_queue(
216         &mut self,
217         ex: &Executor,
218         queue: Arc<Mutex<virtio::Queue>>,
219         doorbell: Interrupt,
220     ) -> anyhow::Result<()> {
221         let kick_evt = queue
222             .lock()
223             .event()
224             .try_clone()
225             .context("Failed to clone queue event")?;
226         let kick_evt =
227             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
228 
229         let tx_future = |mut output, abort| {
230             Ok(async move {
231                 select2(
232                     run_tx_queue(&queue, doorbell, kick_evt, &mut output).boxed_local(),
233                     abort,
234                 )
235                 .await;
236 
237                 output
238             })
239         };
240 
241         self.output.start(ex, tx_future)
242     }
243 
stop_transmit_queue(&mut self) -> AsyncResult<bool>244     pub fn stop_transmit_queue(&mut self) -> AsyncResult<bool> {
245         self.output.stop()
246     }
247 }
248 
249 /// Console device with an optional control port to support for multiport
250 pub struct ConsoleDevice {
251     avail_features: u64,
252     // Port 0 always exists.
253     port0: ConsolePort,
254     // Control port, if multiport is in use.
255     control_port: Option<ControlPort>,
256     // Port 1..n, if they exist.
257     extra_ports: Vec<ConsolePort>,
258 }
259 
260 impl ConsoleDevice {
261     /// Create a console device with the multiport feature enabled
262     /// The multiport feature is referred to virtio spec.
new_multi_port( protection_type: ProtectionType, port0: ConsolePort, extra_ports: Vec<ConsolePort>, ) -> ConsoleDevice263     pub fn new_multi_port(
264         protection_type: ProtectionType,
265         port0: ConsolePort,
266         extra_ports: Vec<ConsolePort>,
267     ) -> ConsoleDevice {
268         let avail_features =
269             virtio::base_features(protection_type) | (1 << VIRTIO_CONSOLE_F_MULTIPORT);
270 
271         let info = std::iter::once(&port0)
272             .chain(extra_ports.iter())
273             .map(|port| port.info.clone())
274             .collect::<Vec<_>>();
275 
276         ConsoleDevice {
277             avail_features,
278             port0,
279             control_port: Some(ControlPort::new(info)),
280             extra_ports,
281         }
282     }
283 
284     /// Return available features
avail_features(&self) -> u64285     pub fn avail_features(&self) -> u64 {
286         self.avail_features
287     }
288 
289     /// Return whether current console device supports multiport feature
is_multi_port(&self) -> bool290     pub fn is_multi_port(&self) -> bool {
291         self.avail_features & (1 << VIRTIO_CONSOLE_F_MULTIPORT) != 0
292     }
293 
294     /// Return the number of the port initiated by the console device
max_ports(&self) -> usize295     pub fn max_ports(&self) -> usize {
296         1 + self.extra_ports.len()
297     }
298 
299     /// Returns the maximum number of queues supported by this device.
max_queues(&self) -> usize300     pub fn max_queues(&self) -> usize {
301         // The port 0 receive and transmit queues always exist;
302         // other queues only exist if VIRTIO_CONSOLE_F_MULTIPORT is set.
303         if self.is_multi_port() {
304             let port_num = self.max_ports();
305 
306             // Extra 1 is for control port; each port has two queues (tx & rx)
307             (port_num + 1) * 2
308         } else {
309             2
310         }
311     }
312 
313     /// Return the reference of the console port by port_id
get_console_port(&mut self, port_id: usize) -> anyhow::Result<&mut ConsolePort>314     fn get_console_port(&mut self, port_id: usize) -> anyhow::Result<&mut ConsolePort> {
315         match port_id {
316             0 => Ok(&mut self.port0),
317             port_id => self
318                 .extra_ports
319                 .get_mut(port_id - 1)
320                 .with_context(|| format!("failed to get console port {}", port_id)),
321         }
322     }
323 
324     /// Start the queue with the index `idx`
start_queue( &mut self, ex: &Executor, idx: usize, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> anyhow::Result<()>325     pub fn start_queue(
326         &mut self,
327         ex: &Executor,
328         idx: usize,
329         queue: Arc<Mutex<virtio::Queue>>,
330         doorbell: Interrupt,
331     ) -> anyhow::Result<()> {
332         match idx {
333             // rxq (port0)
334             0 => self.port0.start_receive_queue(ex, queue, doorbell),
335             // txq (port0)
336             1 => self.port0.start_transmit_queue(ex, queue, doorbell),
337             // control port rxq
338             2 => self
339                 .control_port
340                 .as_mut()
341                 .unwrap()
342                 .start_receive_queue(ex, queue, doorbell),
343             // control port txq
344             3 => self
345                 .control_port
346                 .as_mut()
347                 .unwrap()
348                 .start_transmit_queue(ex, queue, doorbell),
349             // {4, 5} -> port1 {rxq, txq} if exist
350             // {6, 7} -> port2 {rxq, txq} if exist
351             // ...
352             _ => {
353                 let port_id = idx / 2 - 1;
354                 let port = self.get_console_port(port_id)?;
355                 match idx % 2 {
356                     0 => port.start_receive_queue(ex, queue, doorbell),
357                     1 => port.start_transmit_queue(ex, queue, doorbell),
358                     _ => unreachable!(),
359                 }
360             }
361         }
362     }
363 
364     /// Stop the queue with the index `idx`
stop_queue(&mut self, idx: usize) -> anyhow::Result<bool>365     pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<bool> {
366         match idx {
367             0 => self
368                 .port0
369                 .stop_receive_queue()
370                 .context("failed to stop rx queue"),
371             1 => self
372                 .port0
373                 .stop_transmit_queue()
374                 .context("failed to stop tx queue"),
375             2 => self.control_port.as_mut().unwrap().stop_receive_queue(),
376             3 => self.control_port.as_mut().unwrap().stop_transmit_queue(),
377             _ => {
378                 let port_id = idx / 2 - 1;
379                 let port = self.get_console_port(port_id)?;
380                 match idx % 2 {
381                     0 => port.stop_receive_queue().context("failed to stop rx queue"),
382                     1 => port
383                         .stop_transmit_queue()
384                         .context("failed to stop tx queue"),
385                     _ => unreachable!(),
386                 }
387             }
388         }
389     }
390 }
391 
392 impl SerialDevice for ConsoleDevice {
393     /// Create a default console device, without multiport support
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice394     fn new(
395         protection_type: ProtectionType,
396         evt: Event,
397         input: Option<Box<dyn SerialInput>>,
398         output: Option<Box<dyn io::Write + Send>>,
399         sync: Option<Box<dyn FileSync + Send>>,
400         options: SerialOptions,
401         keep_rds: Vec<RawDescriptor>,
402     ) -> ConsoleDevice {
403         let avail_features =
404             virtio::base_features(protection_type) | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
405         let port0 = ConsolePort::new(protection_type, evt, input, output, sync, options, keep_rds);
406 
407         ConsoleDevice {
408             avail_features,
409             port0,
410             control_port: None,
411             extra_ports: vec![],
412         }
413     }
414 
415     #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> ConsoleDevice416     fn new_with_pipe(
417         _protection_type: ProtectionType,
418         _interrupt_evt: Event,
419         _pipe_in: named_pipes::PipeConnection,
420         _pipe_out: named_pipes::PipeConnection,
421         _options: SerialOptions,
422         _keep_rds: Vec<RawDescriptor>,
423     ) -> ConsoleDevice {
424         unimplemented!("new_with_pipe unimplemented for ConsoleDevice");
425     }
426 }
427 
428 /// Virtio console device.
429 pub struct AsyncConsole {
430     console_device: Option<ConsoleDevice>,
431     worker_thread: Option<WorkerThread<anyhow::Result<ConsoleDevice>>>,
432     base_features: u64,
433     keep_descriptors: Vec<Descriptor>,
434     pci_address: Option<PciAddress>,
435 }
436 
437 impl SerialDevice for AsyncConsole {
new( protection_type: ProtectionType, evt: Event, input: Option<Box<dyn SerialInput>>, output: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole438     fn new(
439         protection_type: ProtectionType,
440         evt: Event,
441         input: Option<Box<dyn SerialInput>>,
442         output: Option<Box<dyn io::Write + Send>>,
443         sync: Option<Box<dyn FileSync + Send>>,
444         options: SerialOptions,
445         keep_rds: Vec<RawDescriptor>,
446     ) -> AsyncConsole {
447         let pci_address = options.pci_address;
448         AsyncConsole {
449             console_device: Some(ConsoleDevice::new(
450                 protection_type,
451                 evt,
452                 input,
453                 output,
454                 sync,
455                 options,
456                 Default::default(),
457             )),
458             worker_thread: None,
459             base_features: base_features(protection_type),
460             keep_descriptors: keep_rds.iter().copied().map(Descriptor).collect(),
461             pci_address,
462         }
463     }
464 
465     #[cfg(windows)]
new_with_pipe( _protection_type: ProtectionType, _interrupt_evt: Event, _pipe_in: named_pipes::PipeConnection, _pipe_out: named_pipes::PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> AsyncConsole466     fn new_with_pipe(
467         _protection_type: ProtectionType,
468         _interrupt_evt: Event,
469         _pipe_in: named_pipes::PipeConnection,
470         _pipe_out: named_pipes::PipeConnection,
471         _options: SerialOptions,
472         _keep_rds: Vec<RawDescriptor>,
473     ) -> AsyncConsole {
474         unimplemented!("new_with_pipe unimplemented for AsyncConsole");
475     }
476 }
477 
478 impl VirtioDevice for AsyncConsole {
keep_rds(&self) -> Vec<RawDescriptor>479     fn keep_rds(&self) -> Vec<RawDescriptor> {
480         self.keep_descriptors
481             .iter()
482             .map(Descriptor::as_raw_descriptor)
483             .collect()
484     }
485 
features(&self) -> u64486     fn features(&self) -> u64 {
487         self.base_features
488     }
489 
device_type(&self) -> DeviceType490     fn device_type(&self) -> DeviceType {
491         DeviceType::Console
492     }
493 
queue_max_sizes(&self) -> &[u16]494     fn queue_max_sizes(&self) -> &[u16] {
495         QUEUE_SIZES
496     }
497 
read_config(&self, offset: u64, data: &mut [u8])498     fn read_config(&self, offset: u64, data: &mut [u8]) {
499         let config = virtio_console_config {
500             max_nr_ports: 1.into(),
501             ..Default::default()
502         };
503         copy_config(data, 0, config.as_bytes(), offset);
504     }
505 
activate( &mut self, _mem: GuestMemory, interrupt: Interrupt, mut queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>506     fn activate(
507         &mut self,
508         _mem: GuestMemory,
509         interrupt: Interrupt,
510         mut queues: BTreeMap<usize, Queue>,
511     ) -> anyhow::Result<()> {
512         if queues.len() < 2 {
513             return Err(anyhow!("expected 2 queues, got {}", queues.len()));
514         }
515 
516         let console = self.console_device.take().context("no console_device")?;
517 
518         let ex = Executor::new().expect("failed to create an executor");
519         let receive_queue = queues.remove(&0).unwrap();
520         let transmit_queue = queues.remove(&1).unwrap();
521 
522         self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
523             let mut console = console;
524             let receive_queue = Arc::new(Mutex::new(receive_queue));
525             let transmit_queue = Arc::new(Mutex::new(transmit_queue));
526 
527             // Start transmit queue of port 0
528             console.start_queue(&ex, 0, receive_queue, interrupt.clone())?;
529             // Start receive queue of port 0
530             console.start_queue(&ex, 1, transmit_queue, interrupt.clone())?;
531 
532             // Run until the kill event is signaled and cancel all tasks.
533             ex.run_until(async {
534                 async_utils::await_and_exit(&ex, kill_evt).await?;
535                 let port = &mut console.port0;
536                 if let Some(input) = port.input.as_mut() {
537                     input
538                         .stop_async()
539                         .await
540                         .context("failed to stop rx queue")?;
541                 }
542                 port.output
543                     .stop_async()
544                     .await
545                     .context("failed to stop tx queue")?;
546 
547                 Ok(console)
548             })?
549         }));
550 
551         Ok(())
552     }
553 
pci_address(&self) -> Option<PciAddress>554     fn pci_address(&self) -> Option<PciAddress> {
555         self.pci_address
556     }
557 
reset(&mut self) -> anyhow::Result<()>558     fn reset(&mut self) -> anyhow::Result<()> {
559         if let Some(worker_thread) = self.worker_thread.take() {
560             let console = worker_thread.stop()?;
561             self.console_device = Some(console);
562         }
563         Ok(())
564     }
565 }
566