1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::pin::Pin;
6 use std::str::FromStr;
7
8 use anyhow::bail;
9 use anyhow::Context;
10 use base::AsRawDescriptor;
11 use base::RawDescriptor;
12 use cros_async::AsyncWrapper;
13 use cros_async::Executor;
14 use futures::Future;
15 use futures::FutureExt;
16 use vmm_vhost::connection::socket::Listener as SocketListener;
17 use vmm_vhost::connection::vfio::Device;
18 use vmm_vhost::connection::vfio::Listener as VfioListener;
19 use vmm_vhost::connection::Endpoint;
20 use vmm_vhost::connection::Listener;
21 use vmm_vhost::message::MasterReq;
22 use vmm_vhost::SlaveListener;
23 use vmm_vhost::VhostUserSlaveReqHandler;
24
25 use crate::virtio::vhost::user::device::handler::sys::unix::run_handler;
26 use crate::virtio::vhost::user::device::handler::sys::unix::VvuOps;
27 use crate::virtio::vhost::user::device::handler::VhostUserPlatformOps;
28 use crate::virtio::vhost::user::device::handler::VhostUserRegularOps;
29 use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
30 use crate::virtio::vhost::user::device::vvu::pci::VvuPciDevice;
31 use crate::virtio::vhost::user::device::vvu::VvuDevice;
32 use crate::virtio::VirtioDeviceType;
33 use crate::PciAddress;
34
/// On Unix we can listen to either a socket or a vhost-user device.
pub enum VhostUserListener {
    /// Listener backed by a regular vhost-user socket (`SocketListener`).
    Socket(SocketListener),
    /// Listener backed by a VVU device reached through VFIO, paired with the `VvuOps` that were
    /// built from the same PCI device before it was handed over to the listener.
    // We use a box here to avoid clippy warning about large size difference between variants.
    Vvu(Box<VfioListener<VvuDevice>>, VvuOps),
}
41
42 impl VhostUserListener {
43 /// Creates a new regular vhost-user listener, listening on `path`.
44 ///
45 /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
46 /// listener to keep working after forking.
new_socket( path: &str, keep_rds: Option<&mut Vec<RawDescriptor>>, ) -> anyhow::Result<Self>47 pub fn new_socket(
48 path: &str,
49 keep_rds: Option<&mut Vec<RawDescriptor>>,
50 ) -> anyhow::Result<Self> {
51 let listener = SocketListener::new(path, true)?;
52 if let Some(rds) = keep_rds {
53 rds.push(listener.as_raw_descriptor());
54 }
55
56 Ok(VhostUserListener::Socket(listener))
57 }
58
59 /// Creates a new VVU listener operating on device `pci_addr`. `max_num_queues` is the maximum
60 /// number of virtio queues the device could use.
61 ///
62 /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
63 /// listener to keep working after forking.
new_vvu( pci_addr: PciAddress, max_num_queues: usize, mut keep_rds: Option<&mut Vec<RawDescriptor>>, ) -> anyhow::Result<Self>64 pub fn new_vvu(
65 pci_addr: PciAddress,
66 max_num_queues: usize,
67 mut keep_rds: Option<&mut Vec<RawDescriptor>>,
68 ) -> anyhow::Result<Self> {
69 let mut pci_device = VvuPciDevice::new_from_address(pci_addr, max_num_queues)?;
70 if let Some(rds) = &mut keep_rds {
71 rds.extend(pci_device.irqs.iter().map(|e| e.as_raw_descriptor()));
72 rds.extend(
73 pci_device
74 .notification_evts
75 .iter()
76 .map(|e| e.as_raw_descriptor()),
77 );
78 rds.push(pci_device.vfio_dev.device_file().as_raw_descriptor());
79 }
80
81 // We create the ops now because they need the PCI device for building, and we won't have
82 // access to it anymore after creating the listener.
83 let ops = VvuOps::new(&mut pci_device);
84
85 let device = VvuDevice::new(pci_device);
86 if let Some(rds) = &mut keep_rds {
87 rds.push(device.event().as_raw_descriptor());
88 }
89
90 let listener = VfioListener::new(device)?;
91 Ok(VhostUserListener::Vvu(Box::new(listener), ops))
92 }
93
94 /// Helper for the `device` command, which separates the socket and vfio arguments.
95 ///
96 /// `socket` is a path to a socket to listen to.
97 /// `vfio` is a PCI address to a VVU device.
98 ///
99 /// Exactly one of `socket` or `vvu` must be provided, or an error is returned.
100 ///
101 /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
102 /// listener to keep working after forking.
new_from_socket_or_vfio( socket: &Option<String>, vfio: &Option<String>, max_num_queues: usize, keep_rds: Option<&mut Vec<RawDescriptor>>, ) -> anyhow::Result<Self>103 pub fn new_from_socket_or_vfio(
104 socket: &Option<String>,
105 vfio: &Option<String>,
106 max_num_queues: usize,
107 keep_rds: Option<&mut Vec<RawDescriptor>>,
108 ) -> anyhow::Result<Self> {
109 match (socket, vfio) {
110 (Some(socket), None) => Ok(Self::new_socket(socket, keep_rds)?),
111 (None, Some(vfio)) => Ok(Self::new_vvu(
112 PciAddress::from_str(vfio)?,
113 max_num_queues,
114 keep_rds,
115 )?),
116 _ => bail!("exactly one of `--socket` or `--vfio` is required"),
117 }
118 }
119
120 /// Returns the virtio transport type that will be used for the vhost string `path`.
get_virtio_transport_type(path: &str) -> VirtioDeviceType121 pub fn get_virtio_transport_type(path: &str) -> VirtioDeviceType {
122 match PciAddress::from_str(path) {
123 Ok(_) => VirtioDeviceType::Vvu,
124 Err(_) => VirtioDeviceType::VhostUser,
125 }
126 }
127 }
128
129 /// Attaches to an already bound socket via `listener` and handles incoming messages from the
130 /// VMM, which are dispatched to the device backend via the `VhostUserBackend` trait methods.
run_with_handler<L>( listener: L, handler: Box<dyn VhostUserSlaveReqHandler>, ex: &Executor, ) -> anyhow::Result<()> where L::Endpoint: Endpoint<MasterReq> + AsRawDescriptor, L: Listener + AsRawDescriptor,131 async fn run_with_handler<L>(
132 listener: L,
133 handler: Box<dyn VhostUserSlaveReqHandler>,
134 ex: &Executor,
135 ) -> anyhow::Result<()>
136 where
137 L::Endpoint: Endpoint<MasterReq> + AsRawDescriptor,
138 L: Listener + AsRawDescriptor,
139 {
140 let mut listener = SlaveListener::<L, _>::new(listener, handler)?;
141 listener.set_nonblocking(true)?;
142
143 loop {
144 // If the listener is not ready on the first call to `accept` and returns `None`, we
145 // temporarily convert it into an async I/O source and yield until it signals there is
146 // input data awaiting, before trying again.
147 match listener
148 .accept()
149 .context("failed to accept an incoming connection")?
150 {
151 Some(req_handler) => return run_handler(req_handler, ex).await,
152 None => {
153 // Nobody is on the other end yet, wait until we get a connection.
154 let async_waiter = ex
155 .async_from(AsyncWrapper::new(listener))
156 .context("failed to create async waiter")?;
157 async_waiter.wait_readable().await?;
158
159 // Retrieve the listener back so we can use it again.
160 listener = async_waiter.into_source().into_inner();
161 }
162 }
163 }
164 }
165 impl VhostUserListenerTrait for VhostUserListener {
166 /// Infers whether `path` is a PCI address or a socket path, and create the appropriate type
167 /// of listener.
168 ///
169 /// `keep_rds` can be specified to retrieve the raw descriptors that must be preserved for this
170 /// listener to keep working after forking.
new( path: &str, max_num_queues: usize, keep_rds: Option<&mut Vec<RawDescriptor>>, ) -> anyhow::Result<Self>171 fn new(
172 path: &str,
173 max_num_queues: usize,
174 keep_rds: Option<&mut Vec<RawDescriptor>>,
175 ) -> anyhow::Result<Self> {
176 // If the argument can be parsed as a PCI address, use VVU - otherwise assume it is a path
177 // to a socket.
178 Ok(match PciAddress::from_str(path) {
179 Ok(addr) => Self::new_vvu(addr, max_num_queues, keep_rds)?,
180 Err(_) => Self::new_socket(path, keep_rds)?,
181 })
182 }
183
184 /// Returns a future that runs a `VhostUserSlaveReqHandler` using this listener.
185 ///
186 /// The request handler is built from the `handler_builder` closure, which is passed the
187 /// `VhostUserPlatformOps` used by this listener. `ex` is the executor on which the request
188 /// handler can schedule its own tasks.
run_req_handler<'e, F>( self, handler_builder: F, ex: &'e Executor, ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'e>> where F: FnOnce(Box<dyn VhostUserPlatformOps>) -> Box<dyn VhostUserSlaveReqHandler> + 'e,189 fn run_req_handler<'e, F>(
190 self,
191 handler_builder: F,
192 ex: &'e Executor,
193 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'e>>
194 where
195 F: FnOnce(Box<dyn VhostUserPlatformOps>) -> Box<dyn VhostUserSlaveReqHandler> + 'e,
196 {
197 async {
198 match self {
199 VhostUserListener::Socket(listener) => {
200 let handler = handler_builder(Box::new(VhostUserRegularOps));
201 run_with_handler(listener, handler, ex).await
202 }
203 VhostUserListener::Vvu(listener, ops) => {
204 let handler = handler_builder(Box::new(ops));
205 run_with_handler(*listener, handler, ex).await
206 }
207 }
208 }
209 .boxed_local()
210 }
211
take_parent_process_resources(&mut self) -> Option<Box<dyn std::any::Any>>212 fn take_parent_process_resources(&mut self) -> Option<Box<dyn std::any::Any>> {
213 if let VhostUserListener::Socket(listener) = self {
214 listener.take_resources_for_parent()
215 } else {
216 None
217 }
218 }
219 }
220