1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::cell::RefCell;
6 use std::collections::BTreeMap;
7 use std::path::PathBuf;
8 use std::rc::Rc;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::{Duration, Instant};
12
13 use anyhow::{anyhow, bail, Context};
14 use argh::FromArgs;
15 use base::{
16 clone_descriptor, error, warn, Event, FromRawDescriptor, SafeDescriptor, Tube, UnixSeqpacket,
17 UnixSeqpacketListener, UnlinkUnixSeqpacketListener,
18 };
19 use cros_async::{AsyncWrapper, EventAsync, Executor, IoSourceExt};
20 use futures::future::{AbortHandle, Abortable};
21 use hypervisor::ProtectionType;
22 use sync::Mutex;
23 use vm_memory::GuestMemory;
24 use vmm_vhost::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
25
26 use crate::virtio::vhost::user::device::handler::{
27 DeviceRequestHandler, Doorbell, VhostUserBackend,
28 };
29 use crate::virtio::{base_features, wl, Queue};
30
run_out_queue( mut queue: Queue, mem: GuestMemory, doorbell: Arc<Mutex<Doorbell>>, kick_evt: EventAsync, wlstate: Rc<RefCell<wl::WlState>>, )31 async fn run_out_queue(
32 mut queue: Queue,
33 mem: GuestMemory,
34 doorbell: Arc<Mutex<Doorbell>>,
35 kick_evt: EventAsync,
36 wlstate: Rc<RefCell<wl::WlState>>,
37 ) {
38 loop {
39 if let Err(e) = kick_evt.next_val().await {
40 error!("Failed to read kick event for out queue: {}", e);
41 break;
42 }
43
44 wl::process_out_queue(&doorbell, &mut queue, &mem, &mut wlstate.borrow_mut());
45 }
46 }
47
run_in_queue( mut queue: Queue, mem: GuestMemory, doorbell: Arc<Mutex<Doorbell>>, kick_evt: EventAsync, wlstate: Rc<RefCell<wl::WlState>>, wlstate_ctx: Box<dyn IoSourceExt<AsyncWrapper<SafeDescriptor>>>, )48 async fn run_in_queue(
49 mut queue: Queue,
50 mem: GuestMemory,
51 doorbell: Arc<Mutex<Doorbell>>,
52 kick_evt: EventAsync,
53 wlstate: Rc<RefCell<wl::WlState>>,
54 wlstate_ctx: Box<dyn IoSourceExt<AsyncWrapper<SafeDescriptor>>>,
55 ) {
56 loop {
57 if let Err(e) = wlstate_ctx.wait_readable().await {
58 error!(
59 "Failed to wait for inner WaitContext to become readable: {}",
60 e
61 );
62 break;
63 }
64
65 if let Err(wl::DescriptorsExhausted) =
66 wl::process_in_queue(&doorbell, &mut queue, &mem, &mut wlstate.borrow_mut())
67 {
68 if let Err(e) = kick_evt.next_val().await {
69 error!("Failed to read kick event for in queue: {}", e);
70 break;
71 }
72 }
73 }
74 }
75
76 struct WlBackend {
77 ex: Executor,
78 wayland_paths: Option<BTreeMap<String, PathBuf>>,
79 vm_socket: Option<Tube>,
80 resource_bridge: Option<Tube>,
81 use_transition_flags: bool,
82 use_send_vfd_v2: bool,
83 features: u64,
84 acked_features: u64,
85 wlstate: Option<Rc<RefCell<wl::WlState>>>,
86 workers: [Option<AbortHandle>; Self::MAX_QUEUE_NUM],
87 }
88
89 impl WlBackend {
new( ex: &Executor, wayland_paths: BTreeMap<String, PathBuf>, vm_socket: Tube, resource_bridge: Option<Tube>, ) -> WlBackend90 fn new(
91 ex: &Executor,
92 wayland_paths: BTreeMap<String, PathBuf>,
93 vm_socket: Tube,
94 resource_bridge: Option<Tube>,
95 ) -> WlBackend {
96 let features = base_features(ProtectionType::Unprotected)
97 | 1 << wl::VIRTIO_WL_F_TRANS_FLAGS
98 | 1 << wl::VIRTIO_WL_F_SEND_FENCES
99 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
100 WlBackend {
101 ex: ex.clone(),
102 wayland_paths: Some(wayland_paths),
103 vm_socket: Some(vm_socket),
104 resource_bridge,
105 use_transition_flags: false,
106 use_send_vfd_v2: false,
107 features,
108 acked_features: 0,
109 wlstate: None,
110 workers: Default::default(),
111 }
112 }
113 }
114
115 impl VhostUserBackend for WlBackend {
116 const MAX_QUEUE_NUM: usize = wl::QUEUE_SIZES.len();
117 const MAX_VRING_LEN: u16 = wl::QUEUE_SIZE;
118
119 type Error = anyhow::Error;
120
features(&self) -> u64121 fn features(&self) -> u64 {
122 self.features
123 }
124
ack_features(&mut self, value: u64) -> anyhow::Result<()>125 fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
126 let unrequested_features = value & !self.features();
127 if unrequested_features != 0 {
128 bail!("invalid features are given: {:#x}", unrequested_features);
129 }
130
131 self.acked_features |= value;
132
133 if value & (1 << wl::VIRTIO_WL_F_TRANS_FLAGS) != 0 {
134 self.use_transition_flags = true;
135 }
136 if value & (1 << wl::VIRTIO_WL_F_SEND_FENCES) != 0 {
137 self.use_send_vfd_v2 = true;
138 }
139
140 Ok(())
141 }
142
acked_features(&self) -> u64143 fn acked_features(&self) -> u64 {
144 self.acked_features
145 }
146
protocol_features(&self) -> VhostUserProtocolFeatures147 fn protocol_features(&self) -> VhostUserProtocolFeatures {
148 VhostUserProtocolFeatures::empty()
149 }
150
ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()>151 fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
152 if features != 0 {
153 Err(anyhow!("Unexpected protocol features: {:#x}", features))
154 } else {
155 Ok(())
156 }
157 }
158
acked_protocol_features(&self) -> u64159 fn acked_protocol_features(&self) -> u64 {
160 VhostUserProtocolFeatures::empty().bits()
161 }
162
read_config(&self, _offset: u64, _dst: &mut [u8])163 fn read_config(&self, _offset: u64, _dst: &mut [u8]) {}
164
start_queue( &mut self, idx: usize, mut queue: Queue, mem: GuestMemory, doorbell: Arc<Mutex<Doorbell>>, kick_evt: Event, ) -> anyhow::Result<()>165 fn start_queue(
166 &mut self,
167 idx: usize,
168 mut queue: Queue,
169 mem: GuestMemory,
170 doorbell: Arc<Mutex<Doorbell>>,
171 kick_evt: Event,
172 ) -> anyhow::Result<()> {
173 if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
174 warn!("Starting new queue handler without stopping old handler");
175 handle.abort();
176 }
177
178 // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
179 queue.ack_features(self.acked_features);
180
181 let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
182 .context("failed to create EventAsync for kick_evt")?;
183
184 // We use this de-structuring let binding to separate borrows so that the compiler doesn't
185 // think we're borrowing all of `self` in the closure below.
186 let WlBackend {
187 ref mut wayland_paths,
188 ref mut vm_socket,
189 ref mut resource_bridge,
190 ref use_transition_flags,
191 ref use_send_vfd_v2,
192 ..
193 } = self;
194 let wlstate = self
195 .wlstate
196 .get_or_insert_with(|| {
197 Rc::new(RefCell::new(wl::WlState::new(
198 wayland_paths.take().expect("WlState already initialized"),
199 vm_socket.take().expect("WlState already initialized"),
200 *use_transition_flags,
201 *use_send_vfd_v2,
202 resource_bridge.take(),
203 )))
204 })
205 .clone();
206 let (handle, registration) = AbortHandle::new_pair();
207 match idx {
208 0 => {
209 let wlstate_ctx = clone_descriptor(wlstate.borrow().wait_ctx())
210 .map(|fd| {
211 // Safe because we just created this fd.
212 AsyncWrapper::new(unsafe { SafeDescriptor::from_raw_descriptor(fd) })
213 })
214 .context("failed to clone inner WaitContext for WlState")
215 .and_then(|ctx| {
216 self.ex
217 .async_from(ctx)
218 .context("failed to create async WaitContext")
219 })?;
220
221 self.ex
222 .spawn_local(Abortable::new(
223 run_in_queue(queue, mem, doorbell, kick_evt, wlstate, wlstate_ctx),
224 registration,
225 ))
226 .detach();
227 }
228 1 => {
229 self.ex
230 .spawn_local(Abortable::new(
231 run_out_queue(queue, mem, doorbell, kick_evt, wlstate),
232 registration,
233 ))
234 .detach();
235 }
236 _ => bail!("attempted to start unknown queue: {}", idx),
237 }
238 self.workers[idx] = Some(handle);
239 Ok(())
240 }
241
stop_queue(&mut self, idx: usize)242 fn stop_queue(&mut self, idx: usize) {
243 if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
244 handle.abort();
245 }
246 }
reset(&mut self)247 fn reset(&mut self) {
248 for handle in self.workers.iter_mut().filter_map(Option::take) {
249 handle.abort();
250 }
251 }
252 }
253
/// Parses a Wayland socket argument of the form `PATH[,name=NAME]`.
///
/// Returns `(name, path)`. The name is empty when no `name=` option is given;
/// when `name=` appears more than once, the last occurrence wins.
///
/// # Errors
///
/// Returns a message describing the problem when the path component is
/// empty, when an option is not of the form `kind=value`, or when the option
/// kind is anything other than `name`.
pub(crate) fn parse_wayland_sock(value: &str) -> Result<(String, PathBuf), String> {
    let mut components = value.split(',');

    // `split` always yields a first item, so a missing path shows up as an
    // empty string rather than `None`; treat both as "missing".
    let path = match components.next() {
        Some(p) if !p.is_empty() => PathBuf::from(p),
        _ => return Err("missing socket path".to_string()),
    };

    let mut name = "";
    for c in components {
        let mut kv = c.splitn(2, '=');
        let (kind, value) = match (kv.next(), kv.next()) {
            (Some(kind), Some(value)) => (kind, value),
            _ => return Err(format!("option must be of the form `kind=value`: {}", c)),
        };
        match kind {
            "name" => name = value,
            _ => return Err(format!("unrecognized option: {}", kind)),
        }
    }

    Ok((name.to_string(), path))
}
275
276 #[derive(FromArgs)]
277 #[argh(description = "")]
278 struct Options {
279 #[argh(
280 option,
281 description = "path to bind a listening vhost-user socket",
282 arg_name = "PATH"
283 )]
284 socket: String,
285 #[argh(
286 option,
287 description = "path to a socket for wayland-specific messages",
288 arg_name = "PATH"
289 )]
290 vm_socket: String,
291 #[argh(
292 option,
293 description = "path to one or more Wayland sockets. The unnamed socket is used for\
294 displaying virtual screens while the named ones are used for IPC",
295 from_str_fn(parse_wayland_sock),
296 arg_name = "PATH[,name=NAME]"
297 )]
298 wayland_sock: Vec<(String, PathBuf)>,
299 #[argh(
300 option,
301 description = "path to the GPU resource bridge",
302 arg_name = "PATH"
303 )]
304 resource_bridge: Option<String>,
305 }
306
307 /// Starts a vhost-user wayland device.
308 /// Returns an error if the given `args` is invalid or the device fails to run.
run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()>309 pub fn run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> {
310 let Options {
311 vm_socket,
312 wayland_sock,
313 socket,
314 resource_bridge,
315 } = match Options::from_args(&[program_name], args) {
316 Ok(opts) => opts,
317 Err(e) => {
318 if e.status.is_err() {
319 bail!(e.output);
320 } else {
321 println!("{}", e.output);
322 }
323 return Ok(());
324 }
325 };
326
327 let wayland_paths: BTreeMap<_, _> = wayland_sock.into_iter().collect();
328
329 let resource_bridge = resource_bridge
330 .map(|p| -> anyhow::Result<Tube> {
331 let deadline = Instant::now() + Duration::from_secs(5);
332 loop {
333 match UnixSeqpacket::connect(&p) {
334 Ok(s) => return Ok(Tube::new(s)),
335 Err(e) => {
336 if Instant::now() < deadline {
337 thread::sleep(Duration::from_millis(50));
338 } else {
339 return Err(anyhow::Error::new(e));
340 }
341 }
342 }
343 }
344 })
345 .transpose()
346 .context("failed to connect to resource bridge socket")?;
347
348 let ex = Executor::new().context("failed to create executor")?;
349
350 // We can safely `unwrap()` this because it is a required option.
351 let vm_listener = UnixSeqpacketListener::bind(vm_socket)
352 .map(UnlinkUnixSeqpacketListener)
353 .context("failed to create listening socket")?;
354 let vm_socket = vm_listener
355 .accept()
356 .map(Tube::new)
357 .context("failed to accept vm socket connection")?;
358 let handler = DeviceRequestHandler::new(WlBackend::new(
359 &ex,
360 wayland_paths,
361 vm_socket,
362 resource_bridge,
363 ));
364
365 // run_until() returns an Result<Result<..>> which the ? operator lets us flatten.
366 ex.run_until(handler.run(socket, &ex))?
367 }
368