• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::sync::Arc;
6 
7 use anyhow::bail;
8 use anyhow::Context;
9 use argh::FromArgs;
10 use base::error;
11 use base::info;
12 use base::named_pipes::OverlappedWrapper;
13 use base::named_pipes::PipeConnection;
14 use base::warn;
15 use base::Event;
16 use base::RawDescriptor;
17 use broker_ipc::common_child_setup;
18 use broker_ipc::CommonChildStartupArgs;
19 use cros_async::EventAsync;
20 use cros_async::Executor;
21 use cros_async::IntoAsync;
22 use cros_async::IoSource;
23 use futures::channel::oneshot;
24 use futures::future::AbortHandle;
25 use futures::future::Abortable;
26 use futures::pin_mut;
27 use futures::select_biased;
28 use futures::FutureExt;
29 use hypervisor::ProtectionType;
30 #[cfg(feature = "slirp")]
31 use net_util::Slirp;
32 use net_util::TapT;
33 #[cfg(feature = "slirp")]
34 use serde::Deserialize;
35 #[cfg(feature = "slirp")]
36 use serde::Serialize;
37 use sync::Mutex;
38 use tube_transporter::TubeToken;
39 use virtio_sys::virtio_net;
40 use vm_memory::GuestMemory;
41 use vmm_vhost::message::VhostUserProtocolFeatures;
42 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
43 
44 use crate::virtio;
45 use crate::virtio::base_features;
46 use crate::virtio::net::process_rx;
47 use crate::virtio::net::NetError;
48 #[cfg(feature = "slirp")]
49 use crate::virtio::net::MAX_BUFFER_SIZE;
50 use crate::virtio::vhost::user::device::handler::sys::windows::read_from_tube_transporter;
51 use crate::virtio::vhost::user::device::handler::sys::windows::run_handler;
52 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
53 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
54 use crate::virtio::vhost::user::device::handler::WorkerState;
55 use crate::virtio::vhost::user::device::net::run_ctrl_queue;
56 use crate::virtio::vhost::user::device::net::run_tx_queue;
57 use crate::virtio::vhost::user::device::net::NetBackend;
58 use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
59 use crate::virtio::Interrupt;
60 use crate::virtio::Queue;
61 
62 impl<T: 'static> NetBackend<T>
63 where
64     T: TapT + IntoAsync,
65 {
66     #[cfg(feature = "slirp")]
new_slirp( guest_pipe: PipeConnection, slirp_kill_event: Event, ) -> anyhow::Result<NetBackend<Slirp>>67     pub fn new_slirp(
68         guest_pipe: PipeConnection,
69         slirp_kill_event: Event,
70     ) -> anyhow::Result<NetBackend<Slirp>> {
71         let avail_features = base_features(ProtectionType::Unprotected)
72             | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
73             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
74         let slirp = Slirp::new_for_multi_process(guest_pipe).map_err(NetError::SlirpCreateError)?;
75 
76         Ok(NetBackend::<Slirp> {
77             tap: slirp,
78             avail_features,
79             acked_features: 0,
80             acked_protocol_features: VhostUserProtocolFeatures::empty(),
81             mtu: 1500,
82             slirp_kill_event,
83             workers: Default::default(),
84         })
85     }
86 }
87 
run_rx_queue<T: TapT>( mut queue: Queue, mut tap: IoSource<T>, call_evt: Interrupt, kick_evt: EventAsync, read_notifier: EventAsync, mut overlapped_wrapper: OverlappedWrapper, mut stop_rx: oneshot::Receiver<()>, ) -> Queue88 async fn run_rx_queue<T: TapT>(
89     mut queue: Queue,
90     mut tap: IoSource<T>,
91     call_evt: Interrupt,
92     kick_evt: EventAsync,
93     read_notifier: EventAsync,
94     mut overlapped_wrapper: OverlappedWrapper,
95     mut stop_rx: oneshot::Receiver<()>,
96 ) -> Queue {
97     let mut rx_buf = [0u8; MAX_BUFFER_SIZE];
98     let mut rx_count = 0;
99     let mut deferred_rx = false;
100 
101     // SAFETY: safe because rx_buf & overlapped_wrapper live until the
102     // overlapped operation completes and are not used in any other operations
103     // until that time.
104     unsafe {
105         tap.as_source_mut()
106             .read_overlapped(&mut rx_buf, &mut overlapped_wrapper)
107             .expect("read_overlapped failed")
108     };
109 
110     let read_notifier_future = read_notifier.next_val().fuse();
111     pin_mut!(read_notifier_future);
112     let kick_evt_future = kick_evt.next_val().fuse();
113     pin_mut!(kick_evt_future);
114 
115     loop {
116         // If we already have a packet from deferred RX, we don't need to wait for the slirp device.
117         if !deferred_rx {
118             select_biased! {
119                 read_notifier_res = read_notifier_future => {
120                     read_notifier_future.set(read_notifier.next_val().fuse());
121                     if let Err(e) = read_notifier_res {
122                         error!("Failed to wait for tap device to become readable: {}", e);
123                         break;
124                     }
125                 }
126                 _ = stop_rx => {
127                     break;
128                 }
129             }
130             if let Err(e) = read_notifier.next_val().await {
131                 error!("Failed to wait for tap device to become readable: {}", e);
132                 break;
133             }
134         }
135 
136         let needs_interrupt = process_rx(
137             &call_evt,
138             &mut queue,
139             tap.as_source_mut(),
140             &mut rx_buf,
141             &mut deferred_rx,
142             &mut rx_count,
143             &mut overlapped_wrapper,
144         );
145         if needs_interrupt {
146             call_evt.signal_used_queue(queue.vector());
147         }
148 
149         // There aren't any RX descriptors available for us to write packets to. Wait for the guest
150         // to consume some packets and make more descriptors available to us.
151         if deferred_rx {
152             select_biased! {
153                 kick = kick_evt_future => {
154                     kick_evt_future.set(kick_evt.next_val().fuse());
155                     if let Err(e) = kick {
156                         error!("Failed to read kick event for rx queue: {}", e);
157                         break;
158                     }
159                 }
160                 _ = stop_rx => {
161                     break;
162                 }
163             }
164         }
165     }
166 
167     queue
168 }
169 
170 /// Platform specific impl of VhostUserDevice::start_queue.
start_queue<T: 'static + IntoAsync + TapT>( backend: &mut NetBackend<T>, idx: usize, queue: virtio::Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>171 pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + IntoAsync + TapT>(
172     backend: &mut NetBackend<T>,
173     idx: usize,
174     queue: virtio::Queue,
175     _mem: GuestMemory,
176     doorbell: Interrupt,
177 ) -> anyhow::Result<()> {
178     if backend.workers.get(idx).is_some() {
179         warn!("Starting new queue handler without stopping old handler");
180         backend.stop_queue(idx);
181     }
182 
183     let overlapped_wrapper =
184         OverlappedWrapper::new(true).expect("Failed to create overlapped wrapper");
185 
186     super::super::NET_EXECUTOR.with(|ex| {
187         // Safe because the executor is initialized in main() below.
188         let ex = ex.get().expect("Executor not initialized");
189 
190         let kick_evt = queue
191             .event()
192             .try_clone()
193             .context("failed to clone queue event")?;
194         let kick_evt =
195             EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
196         let tap = backend
197             .tap
198             .try_clone()
199             .context("failed to clone tap device")?;
200         let worker_tuple = match idx {
201             0 => {
202                 let tap = ex
203                     .async_from(tap)
204                     .context("failed to create async tap device")?;
205                 let read_notifier = overlapped_wrapper
206                     .get_h_event_ref()
207                     .unwrap()
208                     .try_clone()
209                     .unwrap();
210                 let read_notifier = EventAsync::new_without_reset(read_notifier, ex)
211                     .context("failed to create async read notifier")?;
212 
213                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
214                 (
215                     ex.spawn_local(run_rx_queue(
216                         queue,
217                         tap,
218                         doorbell,
219                         kick_evt,
220                         read_notifier,
221                         overlapped_wrapper,
222                         stop_rx,
223                     )),
224                     stop_tx,
225                 )
226             }
227             1 => {
228                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
229                 (
230                     ex.spawn_local(run_tx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
231                     stop_tx,
232                 )
233             }
234             2 => {
235                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
236                 (
237                     ex.spawn_local(run_ctrl_queue(
238                         queue,
239                         tap,
240                         doorbell,
241                         kick_evt,
242                         backend.acked_features,
243                         1, /* vq_pairs */
244                         stop_rx,
245                     )),
246                     stop_tx,
247                 )
248             }
249             _ => bail!("attempted to start unknown queue: {}", idx),
250         };
251 
252         backend.workers[idx] = Some(worker_tuple);
253         Ok(())
254     })
255 }
256 
/// Signals `slirp_kill_event` when the backend is dropped.
#[cfg(feature = "slirp")]
impl<T> Drop for NetBackend<T>
where
    T: TapT + IntoAsync,
{
    fn drop(&mut self) {
        // Best effort: a failure to signal is deliberately ignored, since `drop` has no way
        // to report it.
        let _signal_result = self.slirp_kill_event.signal();
    }
}
266 
/// Configuration the broker sends over the bootstrap Tube to the Net backend process.
#[cfg(feature = "slirp")]
#[derive(Debug, Serialize, Deserialize)]
pub struct NetBackendConfig {
    /// Pipe passed to `NetBackend::new_slirp` as the guest side of the slirp connection.
    pub guest_pipe: PipeConnection,
    /// Event stored on the backend and signaled when the backend is dropped.
    pub slirp_kill_event: Event,
}
275 
276 #[derive(FromArgs, Debug)]
277 #[argh(subcommand, name = "net", description = "")]
278 pub struct Options {
279     #[argh(
280         option,
281         description = "pipe handle end for Tube Transporter",
282         arg_name = "HANDLE"
283     )]
284     bootstrap: usize,
285 }
286 
// Fail the build early: on Windows this device can only be constructed through the
// slirp-gated code paths in this file.
#[cfg(all(not(feature = "slirp"), windows))]
compile_error!("vhost-user net device requires slirp feature on Windows.");
289 
/// Entry point of the vhost-user net device process.
///
/// Reads the device's tubes from the handle in `opts`, receives the startup arguments, the
/// slirp configuration and the exit event over the bootstrap tube, builds the slirp-backed
/// backend, and then runs the vhost-user request handler on a fresh executor until it
/// finishes.
///
/// # Errors
///
/// Returns an error if any bootstrap step fails (reading the tubes, receiving a message,
/// common child setup, creating the backend or the executor) or if the handler run loop
/// reports an error.
#[cfg(feature = "slirp")]
pub fn start_device(opts: Options) -> anyhow::Result<()> {
    // Get the Tubes from the TubeTransporter. Then get the "Config" from the bootstrap_tube
    // which will contain slirp settings.
    let raw_transport_tube = opts.bootstrap as RawDescriptor;

    let mut tubes = read_from_tube_transporter(raw_transport_tube)
        .context("failed to read tubes from tube transporter")?;

    let vhost_user_tube = tubes
        .get_tube(TubeToken::VhostUser)
        .context("failed to get vhost-user tube")?;
    let bootstrap_tube = tubes
        .get_tube(TubeToken::Bootstrap)
        .context("failed to get bootstrap tube")?;

    // The messages below are received in this fixed order: startup args, backend config,
    // exit event.
    let startup_args = bootstrap_tube
        .recv::<CommonChildStartupArgs>()
        .context("failed to receive common child startup args")?;
    // Held until the function returns so the cleanup guard stays alive for the whole run.
    let _child_cleanup =
        common_child_setup(startup_args).context("failed to perform common child setup")?;

    let net_backend_config = bootstrap_tube
        .recv::<NetBackendConfig>()
        .context("failed to receive net backend config")?;

    let exit_event = bootstrap_tube
        .recv::<Event>()
        .context("failed to receive exit event")?;

    // We only have one net device for now.
    let dev = NetBackend::<net_util::Slirp>::new_slirp(
        net_backend_config.guest_pipe,
        net_backend_config.slirp_kill_event,
    )
    .context("failed to create slirp net backend")?;

    let handler = DeviceRequestHandler::new(dev);

    let ex = Executor::new().context("failed to create executor")?;

    // Publish the executor for `start_queue`. The result of `set` is ignored on purpose.
    NET_EXECUTOR.with(|net_ex| {
        let _ = net_ex.set(ex.clone());
    });

    // TODO(b/213170185): Uncomment once sandbox is upstreamed.
    // if sandbox::is_sandbox_target() {
    //     sandbox::TargetServices::get()
    //         .expect("failed to get target services")
    //         .unwrap()
    //         .lower_token();
    // }

    info!("vhost-user net device ready, starting run loop...");
    if let Err(e) = ex.run_until(run_handler(
        Box::new(handler),
        vhost_user_tube,
        exit_event,
        &ex,
    )) {
        bail!("error occurred: {}", e);
    }

    Ok(())
}
344