• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Implement a struct that works as a `vmm_vhost`'s backend.
6 
7 use std::io::{IoSlice, IoSliceMut};
8 use std::mem;
9 use std::os::unix::io::RawFd;
10 use std::sync::mpsc::{channel, Receiver, Sender};
11 use std::sync::Arc;
12 use std::thread;
13 
14 use anyhow::{anyhow, bail, Context, Result};
15 use base::{error, Event};
16 use cros_async::{EventAsync, Executor};
17 use futures::{pin_mut, select, FutureExt};
18 use sync::Mutex;
19 use vmm_vhost::connection::vfio::{Device as VfioDeviceTrait, RecvIntoBufsError};
20 
21 use crate::vfio::VfioDevice;
22 use crate::virtio::vhost::user::device::vvu::{
23     pci::{QueueNotifier, QueueType, VvuPciDevice},
24     queue::UserQueue,
25 };
26 
process_rxq( evt: EventAsync, mut rxq: UserQueue, rxq_sender: Sender<Vec<u8>>, rxq_evt: Event, ) -> Result<()>27 async fn process_rxq(
28     evt: EventAsync,
29     mut rxq: UserQueue,
30     rxq_sender: Sender<Vec<u8>>,
31     rxq_evt: Event,
32 ) -> Result<()> {
33     loop {
34         if let Err(e) = evt.next_val().await {
35             error!("Failed to read the next queue event: {}", e);
36             continue;
37         }
38 
39         while let Some(slice) = rxq.read_data()? {
40             let mut buf = vec![0; slice.size()];
41             slice.copy_to(&mut buf);
42             rxq_sender.send(buf)?;
43             rxq_evt.write(1).context("process_rxq")?;
44         }
45     }
46 }
47 
process_txq(evt: EventAsync, txq: Arc<Mutex<UserQueue>>) -> Result<()>48 async fn process_txq(evt: EventAsync, txq: Arc<Mutex<UserQueue>>) -> Result<()> {
49     loop {
50         if let Err(e) = evt.next_val().await {
51             error!("Failed to read the next queue event: {}", e);
52             continue;
53         }
54 
55         txq.lock().ack_used()?;
56     }
57 }
58 
run_worker( ex: Executor, rx_queue: UserQueue, rx_irq: Event, rx_sender: Sender<Vec<u8>>, rx_evt: Event, tx_queue: Arc<Mutex<UserQueue>>, tx_irq: Event, ) -> Result<()>59 fn run_worker(
60     ex: Executor,
61     rx_queue: UserQueue,
62     rx_irq: Event,
63     rx_sender: Sender<Vec<u8>>,
64     rx_evt: Event,
65     tx_queue: Arc<Mutex<UserQueue>>,
66     tx_irq: Event,
67 ) -> Result<()> {
68     let rx_irq = EventAsync::new(rx_irq.0, &ex).context("failed to create async event")?;
69     let rxq = process_rxq(rx_irq, rx_queue, rx_sender, rx_evt);
70     pin_mut!(rxq);
71 
72     let tx_irq = EventAsync::new(tx_irq.0, &ex).context("failed to create async event")?;
73     let txq = process_txq(tx_irq, Arc::clone(&tx_queue));
74     pin_mut!(txq);
75 
76     let done = async {
77         select! {
78             res = rxq.fuse() => res.context("failed to handle rxq"),
79             res = txq.fuse() => res.context("failed to handle txq"),
80         }
81     };
82 
83     match ex.run_until(done) {
84         Ok(_) => Ok(()),
85         Err(e) => {
86             bail!("failed to process virtio-vhost-user queues: {}", e);
87         }
88     }
89 }
90 
91 enum DeviceState {
92     Initialized {
93         // TODO(keiichiw): Update `VfioDeviceTrait::start()` to take `VvuPciDevice` so that we can
94         // drop this field.
95         device: VvuPciDevice,
96     },
97     Running {
98         vfio: Arc<VfioDevice>,
99 
100         rxq_notifier: Arc<Mutex<QueueNotifier>>,
101         rxq_receiver: Receiver<Vec<u8>>,
102         /// Store data that was provided by rxq_receiver but not consumed yet.
103         rxq_buf: Vec<u8>,
104 
105         txq: Arc<Mutex<UserQueue>>,
106         txq_notifier: Arc<Mutex<QueueNotifier>>,
107     },
108 }
109 
110 pub struct VvuDevice {
111     state: DeviceState,
112     rxq_evt: Event,
113 }
114 
115 impl VvuDevice {
new(device: VvuPciDevice) -> Self116     pub fn new(device: VvuPciDevice) -> Self {
117         Self {
118             state: DeviceState::Initialized { device },
119             rxq_evt: Event::new().expect("failed to create VvuDevice's rxq_evt"),
120         }
121     }
122 }
123 
124 impl VfioDeviceTrait for VvuDevice {
event(&self) -> &Event125     fn event(&self) -> &Event {
126         &self.rxq_evt
127     }
128 
start(&mut self) -> Result<()>129     fn start(&mut self) -> Result<()> {
130         let device = match &mut self.state {
131             DeviceState::Initialized { device } => device,
132             DeviceState::Running { .. } => {
133                 bail!("VvuDevice has already started");
134             }
135         };
136         let ex = Executor::new().expect("Failed to create an executor");
137 
138         let mut irqs = mem::take(&mut device.irqs);
139         let mut queues = mem::take(&mut device.queues);
140         let mut queue_notifiers = mem::take(&mut device.queue_notifiers);
141         let vfio = Arc::clone(&device.vfio_dev);
142 
143         let rxq = queues.remove(0);
144         let rxq_irq = irqs.remove(0);
145         let rxq_notifier = Arc::new(Mutex::new(queue_notifiers.remove(0)));
146         // TODO: Can we use async channel instead so we don't need `rxq_evt`?
147         let (rxq_sender, rxq_receiver) = channel();
148         let rxq_evt = self.rxq_evt.try_clone().expect("rxq_evt clone");
149 
150         let txq = Arc::new(Mutex::new(queues.remove(0)));
151         let txq_cloned = Arc::clone(&txq);
152         let txq_irq = irqs.remove(0);
153         let txq_notifier = Arc::new(Mutex::new(queue_notifiers.remove(0)));
154 
155         let old_state = std::mem::replace(
156             &mut self.state,
157             DeviceState::Running {
158                 vfio,
159                 rxq_notifier,
160                 rxq_receiver,
161                 rxq_buf: vec![],
162                 txq,
163                 txq_notifier,
164             },
165         );
166 
167         let device = match old_state {
168             DeviceState::Initialized { device } => device,
169             _ => unreachable!(),
170         };
171 
172         thread::Builder::new()
173             .name("virtio-vhost-user driver".to_string())
174             .spawn(move || {
175                 device.start().expect("failed to start device");
176                 if let Err(e) =
177                     run_worker(ex, rxq, rxq_irq, rxq_sender, rxq_evt, txq_cloned, txq_irq)
178                 {
179                     error!("worker thread exited with error: {}", e);
180                 }
181             })?;
182 
183         Ok(())
184     }
185 
send_bufs(&mut self, iovs: &[IoSlice], fds: Option<&[RawFd]>) -> Result<usize>186     fn send_bufs(&mut self, iovs: &[IoSlice], fds: Option<&[RawFd]>) -> Result<usize> {
187         if fds.is_some() {
188             bail!("cannot send FDs");
189         }
190 
191         let (txq, txq_notifier, vfio) = match &mut self.state {
192             DeviceState::Initialized { .. } => {
193                 bail!("VvuDevice hasn't started yet");
194             }
195             DeviceState::Running {
196                 vfio,
197                 txq,
198                 txq_notifier,
199                 ..
200             } => (txq, txq_notifier, vfio),
201         };
202 
203         let size = iovs.iter().map(|v| v.len()).sum();
204         let data: Vec<u8> = iovs.iter().flat_map(|v| v.to_vec()).collect();
205 
206         txq.lock().write(&data).context("Failed to send data")?;
207         txq_notifier.lock().notify(vfio, QueueType::Tx as u16);
208 
209         Ok(size)
210     }
211 
recv_into_bufs(&mut self, bufs: &mut [IoSliceMut]) -> Result<usize, RecvIntoBufsError>212     fn recv_into_bufs(&mut self, bufs: &mut [IoSliceMut]) -> Result<usize, RecvIntoBufsError> {
213         let (rxq_receiver, rxq_notifier, rxq_buf, vfio) = match &mut self.state {
214             DeviceState::Initialized { .. } => {
215                 return Err(RecvIntoBufsError::Fatal(anyhow!(
216                     "VvuDevice hasn't started yet"
217                 )));
218             }
219             DeviceState::Running {
220                 rxq_receiver,
221                 rxq_notifier,
222                 rxq_buf,
223                 vfio,
224                 ..
225             } => (rxq_receiver, rxq_notifier, rxq_buf, vfio),
226         };
227 
228         let mut size = 0;
229         for buf in bufs {
230             let len = buf.len();
231 
232             while rxq_buf.len() < len {
233                 let mut data = rxq_receiver
234                     .recv()
235                     .context("failed to receive data")
236                     .map_err(RecvIntoBufsError::Fatal)?;
237                 rxq_buf.append(&mut data);
238             }
239 
240             buf.clone_from_slice(&rxq_buf[..len]);
241 
242             rxq_buf.drain(0..len);
243             size += len;
244 
245             rxq_notifier.lock().notify(vfio, QueueType::Rx as u16);
246         }
247 
248         if size == 0 {
249             // TODO(b/216407443): We should change `self.state` and exit gracefully.
250             return Err(RecvIntoBufsError::Disconnect);
251         }
252         Ok(size)
253     }
254 }
255