1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Implement a struct that works as a `vmm_vhost`'s backend.
6
7 use std::io::{IoSlice, IoSliceMut};
8 use std::mem;
9 use std::os::unix::io::RawFd;
10 use std::sync::mpsc::{channel, Receiver, Sender};
11 use std::sync::Arc;
12 use std::thread;
13
14 use anyhow::{anyhow, bail, Context, Result};
15 use base::{error, Event};
16 use cros_async::{EventAsync, Executor};
17 use futures::{pin_mut, select, FutureExt};
18 use sync::Mutex;
19 use vmm_vhost::connection::vfio::{Device as VfioDeviceTrait, RecvIntoBufsError};
20
21 use crate::vfio::VfioDevice;
22 use crate::virtio::vhost::user::device::vvu::{
23 pci::{QueueNotifier, QueueType, VvuPciDevice},
24 queue::UserQueue,
25 };
26
process_rxq( evt: EventAsync, mut rxq: UserQueue, rxq_sender: Sender<Vec<u8>>, rxq_evt: Event, ) -> Result<()>27 async fn process_rxq(
28 evt: EventAsync,
29 mut rxq: UserQueue,
30 rxq_sender: Sender<Vec<u8>>,
31 rxq_evt: Event,
32 ) -> Result<()> {
33 loop {
34 if let Err(e) = evt.next_val().await {
35 error!("Failed to read the next queue event: {}", e);
36 continue;
37 }
38
39 while let Some(slice) = rxq.read_data()? {
40 let mut buf = vec![0; slice.size()];
41 slice.copy_to(&mut buf);
42 rxq_sender.send(buf)?;
43 rxq_evt.write(1).context("process_rxq")?;
44 }
45 }
46 }
47
process_txq(evt: EventAsync, txq: Arc<Mutex<UserQueue>>) -> Result<()>48 async fn process_txq(evt: EventAsync, txq: Arc<Mutex<UserQueue>>) -> Result<()> {
49 loop {
50 if let Err(e) = evt.next_val().await {
51 error!("Failed to read the next queue event: {}", e);
52 continue;
53 }
54
55 txq.lock().ack_used()?;
56 }
57 }
58
run_worker( ex: Executor, rx_queue: UserQueue, rx_irq: Event, rx_sender: Sender<Vec<u8>>, rx_evt: Event, tx_queue: Arc<Mutex<UserQueue>>, tx_irq: Event, ) -> Result<()>59 fn run_worker(
60 ex: Executor,
61 rx_queue: UserQueue,
62 rx_irq: Event,
63 rx_sender: Sender<Vec<u8>>,
64 rx_evt: Event,
65 tx_queue: Arc<Mutex<UserQueue>>,
66 tx_irq: Event,
67 ) -> Result<()> {
68 let rx_irq = EventAsync::new(rx_irq.0, &ex).context("failed to create async event")?;
69 let rxq = process_rxq(rx_irq, rx_queue, rx_sender, rx_evt);
70 pin_mut!(rxq);
71
72 let tx_irq = EventAsync::new(tx_irq.0, &ex).context("failed to create async event")?;
73 let txq = process_txq(tx_irq, Arc::clone(&tx_queue));
74 pin_mut!(txq);
75
76 let done = async {
77 select! {
78 res = rxq.fuse() => res.context("failed to handle rxq"),
79 res = txq.fuse() => res.context("failed to handle txq"),
80 }
81 };
82
83 match ex.run_until(done) {
84 Ok(_) => Ok(()),
85 Err(e) => {
86 bail!("failed to process virtio-vhost-user queues: {}", e);
87 }
88 }
89 }
90
91 enum DeviceState {
92 Initialized {
93 // TODO(keiichiw): Update `VfioDeviceTrait::start()` to take `VvuPciDevice` so that we can
94 // drop this field.
95 device: VvuPciDevice,
96 },
97 Running {
98 vfio: Arc<VfioDevice>,
99
100 rxq_notifier: Arc<Mutex<QueueNotifier>>,
101 rxq_receiver: Receiver<Vec<u8>>,
102 /// Store data that was provided by rxq_receiver but not consumed yet.
103 rxq_buf: Vec<u8>,
104
105 txq: Arc<Mutex<UserQueue>>,
106 txq_notifier: Arc<Mutex<QueueNotifier>>,
107 },
108 }
109
110 pub struct VvuDevice {
111 state: DeviceState,
112 rxq_evt: Event,
113 }
114
115 impl VvuDevice {
new(device: VvuPciDevice) -> Self116 pub fn new(device: VvuPciDevice) -> Self {
117 Self {
118 state: DeviceState::Initialized { device },
119 rxq_evt: Event::new().expect("failed to create VvuDevice's rxq_evt"),
120 }
121 }
122 }
123
124 impl VfioDeviceTrait for VvuDevice {
event(&self) -> &Event125 fn event(&self) -> &Event {
126 &self.rxq_evt
127 }
128
start(&mut self) -> Result<()>129 fn start(&mut self) -> Result<()> {
130 let device = match &mut self.state {
131 DeviceState::Initialized { device } => device,
132 DeviceState::Running { .. } => {
133 bail!("VvuDevice has already started");
134 }
135 };
136 let ex = Executor::new().expect("Failed to create an executor");
137
138 let mut irqs = mem::take(&mut device.irqs);
139 let mut queues = mem::take(&mut device.queues);
140 let mut queue_notifiers = mem::take(&mut device.queue_notifiers);
141 let vfio = Arc::clone(&device.vfio_dev);
142
143 let rxq = queues.remove(0);
144 let rxq_irq = irqs.remove(0);
145 let rxq_notifier = Arc::new(Mutex::new(queue_notifiers.remove(0)));
146 // TODO: Can we use async channel instead so we don't need `rxq_evt`?
147 let (rxq_sender, rxq_receiver) = channel();
148 let rxq_evt = self.rxq_evt.try_clone().expect("rxq_evt clone");
149
150 let txq = Arc::new(Mutex::new(queues.remove(0)));
151 let txq_cloned = Arc::clone(&txq);
152 let txq_irq = irqs.remove(0);
153 let txq_notifier = Arc::new(Mutex::new(queue_notifiers.remove(0)));
154
155 let old_state = std::mem::replace(
156 &mut self.state,
157 DeviceState::Running {
158 vfio,
159 rxq_notifier,
160 rxq_receiver,
161 rxq_buf: vec![],
162 txq,
163 txq_notifier,
164 },
165 );
166
167 let device = match old_state {
168 DeviceState::Initialized { device } => device,
169 _ => unreachable!(),
170 };
171
172 thread::Builder::new()
173 .name("virtio-vhost-user driver".to_string())
174 .spawn(move || {
175 device.start().expect("failed to start device");
176 if let Err(e) =
177 run_worker(ex, rxq, rxq_irq, rxq_sender, rxq_evt, txq_cloned, txq_irq)
178 {
179 error!("worker thread exited with error: {}", e);
180 }
181 })?;
182
183 Ok(())
184 }
185
send_bufs(&mut self, iovs: &[IoSlice], fds: Option<&[RawFd]>) -> Result<usize>186 fn send_bufs(&mut self, iovs: &[IoSlice], fds: Option<&[RawFd]>) -> Result<usize> {
187 if fds.is_some() {
188 bail!("cannot send FDs");
189 }
190
191 let (txq, txq_notifier, vfio) = match &mut self.state {
192 DeviceState::Initialized { .. } => {
193 bail!("VvuDevice hasn't started yet");
194 }
195 DeviceState::Running {
196 vfio,
197 txq,
198 txq_notifier,
199 ..
200 } => (txq, txq_notifier, vfio),
201 };
202
203 let size = iovs.iter().map(|v| v.len()).sum();
204 let data: Vec<u8> = iovs.iter().flat_map(|v| v.to_vec()).collect();
205
206 txq.lock().write(&data).context("Failed to send data")?;
207 txq_notifier.lock().notify(vfio, QueueType::Tx as u16);
208
209 Ok(size)
210 }
211
recv_into_bufs(&mut self, bufs: &mut [IoSliceMut]) -> Result<usize, RecvIntoBufsError>212 fn recv_into_bufs(&mut self, bufs: &mut [IoSliceMut]) -> Result<usize, RecvIntoBufsError> {
213 let (rxq_receiver, rxq_notifier, rxq_buf, vfio) = match &mut self.state {
214 DeviceState::Initialized { .. } => {
215 return Err(RecvIntoBufsError::Fatal(anyhow!(
216 "VvuDevice hasn't started yet"
217 )));
218 }
219 DeviceState::Running {
220 rxq_receiver,
221 rxq_notifier,
222 rxq_buf,
223 vfio,
224 ..
225 } => (rxq_receiver, rxq_notifier, rxq_buf, vfio),
226 };
227
228 let mut size = 0;
229 for buf in bufs {
230 let len = buf.len();
231
232 while rxq_buf.len() < len {
233 let mut data = rxq_receiver
234 .recv()
235 .context("failed to receive data")
236 .map_err(RecvIntoBufsError::Fatal)?;
237 rxq_buf.append(&mut data);
238 }
239
240 buf.clone_from_slice(&rxq_buf[..len]);
241
242 rxq_buf.drain(0..len);
243 size += len;
244
245 rxq_notifier.lock().notify(vfio, QueueType::Rx as u16);
246 }
247
248 if size == 0 {
249 // TODO(b/216407443): We should change `self.state` and exit gracefully.
250 return Err(RecvIntoBufsError::Disconnect);
251 }
252 Ok(size)
253 }
254 }
255