• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::net::Ipv4Addr;
6 use std::str::FromStr;
7 use std::thread;
8 
9 use anyhow::anyhow;
10 use anyhow::bail;
11 use anyhow::Context;
12 use argh::FromArgs;
13 use base::error;
14 use base::info;
15 use base::validate_raw_descriptor;
16 use base::warn;
17 use base::RawDescriptor;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use cros_async::IntoAsync;
21 use cros_async::IoSource;
22 use futures::channel::oneshot;
23 use futures::select_biased;
24 use futures::FutureExt;
25 use hypervisor::ProtectionType;
26 use net_util::sys::linux::Tap;
27 use net_util::MacAddress;
28 use net_util::TapT;
29 use virtio_sys::virtio_net;
30 use vm_memory::GuestMemory;
31 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
32 
33 use crate::virtio;
34 use crate::virtio::net::process_rx;
35 use crate::virtio::net::validate_and_configure_tap;
36 use crate::virtio::net::NetError;
37 use crate::virtio::vhost::user::device::connection::sys::VhostUserListener;
38 use crate::virtio::vhost::user::device::connection::VhostUserConnectionTrait;
39 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
40 use crate::virtio::vhost::user::device::net::run_ctrl_queue;
41 use crate::virtio::vhost::user::device::net::run_tx_queue;
42 use crate::virtio::vhost::user::device::net::NetBackend;
43 use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
44 use crate::virtio::Queue;
45 
46 struct TapConfig {
47     host_ip: Ipv4Addr,
48     netmask: Ipv4Addr,
49     mac: MacAddress,
50 }
51 
52 impl FromStr for TapConfig {
53     type Err = anyhow::Error;
54 
from_str(arg: &str) -> Result<Self, Self::Err>55     fn from_str(arg: &str) -> Result<Self, Self::Err> {
56         let args: Vec<&str> = arg.split(',').collect();
57         if args.len() != 3 {
58             bail!("TAP config must consist of 3 parts but {}", args.len());
59         }
60 
61         let host_ip: Ipv4Addr = args[0]
62             .parse()
63             .map_err(|e| anyhow!("invalid IP address: {}", e))?;
64         let netmask: Ipv4Addr = args[1]
65             .parse()
66             .map_err(|e| anyhow!("invalid net mask: {}", e))?;
67         let mac: MacAddress = args[2]
68             .parse()
69             .map_err(|e| anyhow!("invalid MAC address: {}", e))?;
70 
71         Ok(Self {
72             host_ip,
73             netmask,
74             mac,
75         })
76     }
77 }
78 
79 impl<T: 'static> NetBackend<T>
80 where
81     T: TapT + IntoAsync,
82 {
new_from_config(config: &TapConfig) -> anyhow::Result<Self>83     fn new_from_config(config: &TapConfig) -> anyhow::Result<Self> {
84         // Create a tap device.
85         let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)
86             .context("failed to create tap device")?;
87         tap.set_ip_addr(config.host_ip)
88             .context("failed to set IP address")?;
89         tap.set_netmask(config.netmask)
90             .context("failed to set netmask")?;
91         tap.set_mac_address(config.mac)
92             .context("failed to set MAC address")?;
93 
94         Self::new(tap)
95     }
96 
new_from_name(name: &str) -> anyhow::Result<Self>97     fn new_from_name(name: &str) -> anyhow::Result<Self> {
98         let tap = T::new_with_name(name.as_bytes(), true, false).map_err(NetError::TapOpen)?;
99         Self::new(tap)
100     }
101 
new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self>102     pub fn new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self> {
103         let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
104         // SAFETY:
105         // Safe because we ensure that we get a unique handle to the fd.
106         let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
107 
108         Self::new(tap)
109     }
110 
new(tap: T) -> anyhow::Result<Self>111     pub fn new(tap: T) -> anyhow::Result<Self> {
112         let vq_pairs = Self::max_vq_pairs();
113         validate_and_configure_tap(&tap, vq_pairs as u16)
114             .context("failed to validate and configure tap")?;
115 
116         let avail_features = virtio::base_features(ProtectionType::Unprotected)
117             | 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
118             | 1 << virtio_net::VIRTIO_NET_F_CSUM
119             | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
120             | 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
121             | 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
122             | 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
123             | 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
124             | 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
125             | 1 << virtio_net::VIRTIO_NET_F_MTU
126             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
127 
128         let mtu = tap.mtu()?;
129 
130         Ok(Self {
131             tap,
132             avail_features,
133             acked_features: 0,
134             mtu,
135             workers: Default::default(),
136         })
137     }
138 }
139 
run_rx_queue<T: TapT>( mut queue: Queue, mut tap: IoSource<T>, kick_evt: EventAsync, mut stop_rx: oneshot::Receiver<()>, ) -> Queue140 async fn run_rx_queue<T: TapT>(
141     mut queue: Queue,
142     mut tap: IoSource<T>,
143     kick_evt: EventAsync,
144     mut stop_rx: oneshot::Receiver<()>,
145 ) -> Queue {
146     loop {
147         select_biased! {
148             // `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`
149             // requires a mutable reference to `tap`, so this future needs to be recreated on
150             // every iteration. If more arms are added that doesn't break out of the loop, then
151             // this future could be recreated too many times.
152             rx = tap.wait_readable().fuse() => {
153                 if let Err(e) = rx {
154                     error!("Failed to wait for tap device to become readable: {}", e);
155                     break;
156                 }
157             }
158             _ = stop_rx => {
159                 break;
160             }
161         }
162 
163         match process_rx(&mut queue, tap.as_source_mut()) {
164             Ok(()) => {}
165             Err(NetError::RxDescriptorsExhausted) => {
166                 select_biased! {
167                     kick_evt = kick_evt.next_val().fuse() => {
168                         if let Err(e) = kick_evt {
169                             error!("Failed to read kick event for rx queue: {}", e);
170                             break;
171                         }
172                     },
173                     _ = stop_rx => {
174                         break;
175                     }
176                 };
177             }
178             Err(e) => {
179                 error!("Failed to process rx queue: {}", e);
180                 break;
181             }
182         }
183     }
184     queue
185 }
186 
187 /// Platform specific impl of VhostUserDevice::start_queue.
start_queue<T: 'static + IntoAsync + TapT>( backend: &mut NetBackend<T>, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>188 pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + IntoAsync + TapT>(
189     backend: &mut NetBackend<T>,
190     idx: usize,
191     queue: virtio::Queue,
192     _mem: GuestMemory,
193 ) -> anyhow::Result<()> {
194     if backend.workers[idx].is_some() {
195         warn!("Starting new queue handler without stopping old handler");
196         backend.stop_queue(idx)?;
197     }
198 
199     NET_EXECUTOR.with(|ex| {
200         // Safe because the executor is initialized in main() below.
201         let ex = ex.get().expect("Executor not initialized");
202 
203         let kick_evt = queue
204             .event()
205             .try_clone()
206             .context("failed to clone queue event")?;
207         let kick_evt =
208             EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
209         let tap = backend
210             .tap
211             .try_clone()
212             .context("failed to clone tap device")?;
213         let worker_tuple = match idx {
214             0 => {
215                 let tap = ex
216                     .async_from(tap)
217                     .context("failed to create async tap device")?;
218 
219                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
220                 (
221                     ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx)),
222                     stop_tx,
223                 )
224             }
225             1 => {
226                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
227                 (
228                     ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
229                     stop_tx,
230                 )
231             }
232             2 => {
233                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
234                 (
235                     ex.spawn_local(run_ctrl_queue(
236                         queue,
237                         tap,
238                         kick_evt,
239                         backend.acked_features,
240                         1, /* vq_pairs */
241                         stop_rx,
242                     )),
243                     stop_tx,
244                 )
245             }
246             _ => bail!("attempted to start unknown queue: {}", idx),
247         };
248 
249         backend.workers[idx] = Some(worker_tuple);
250         Ok(())
251     })
252 }
253 
254 #[derive(FromArgs)]
255 #[argh(subcommand, name = "net")]
256 /// Net device
257 pub struct Options {
258     #[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
259     /// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")
260     device: Vec<String>,
261     #[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
262     /// TAP FD with a socket path"
263     tap_fd: Vec<String>,
264     #[argh(option, arg_name = "SOCKET_PATH,TAP_NAME")]
265     /// TAP NAME with a socket path
266     tap_name: Vec<String>,
267 }
268 
269 enum Connection {
270     Socket(String),
271 }
272 
new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>273 fn new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
274     let pos = match arg.find(',') {
275         Some(p) => p,
276         None => {
277             bail!("device must take comma-separated argument");
278         }
279     };
280     let conn = &arg[0..pos];
281     let cfg = &arg[pos + 1..]
282         .parse::<TapConfig>()
283         .context("failed to parse tap config")?;
284     let backend = NetBackend::<Tap>::new_from_config(cfg).context("failed to create NetBackend")?;
285     Ok((conn.to_string(), backend))
286 }
287 
new_backend_from_tap_name(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>288 fn new_backend_from_tap_name(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
289     let pos = match arg.find(',') {
290         Some(p) => p,
291         None => {
292             bail!("device must take comma-separated argument");
293         }
294     };
295     let conn = &arg[0..pos];
296     let tap_name = &arg[pos + 1..];
297 
298     let backend =
299         NetBackend::<Tap>::new_from_name(tap_name).context("failed to create NetBackend")?;
300     Ok((conn.to_string(), backend))
301 }
302 
new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>303 fn new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
304     let pos = match arg.find(',') {
305         Some(p) => p,
306         None => {
307             bail!("'tap-fd' flag must take comma-separated argument");
308         }
309     };
310     let conn = &arg[0..pos];
311     let tap_fd = &arg[pos + 1..]
312         .parse::<i32>()
313         .context("failed to parse tap-fd")?;
314     let backend =
315         NetBackend::<Tap>::new_from_tap_fd(*tap_fd).context("failed to create NetBackend")?;
316 
317     Ok((conn.to_string(), backend))
318 }
319 
320 /// Starts a vhost-user net device.
321 /// Returns an error if the given `args` is invalid or the device fails to run.
start_device(opts: Options) -> anyhow::Result<()>322 pub fn start_device(opts: Options) -> anyhow::Result<()> {
323     let num_devices = opts.device.len() + opts.tap_fd.len() + opts.tap_name.len();
324 
325     if num_devices == 0 {
326         bail!("no device option was passed");
327     }
328 
329     let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
330 
331     // vhost-user
332     for arg in opts.device.iter() {
333         devices.push(
334             new_backend_from_device_arg(arg)
335                 .map(|(s, backend)| (Connection::Socket(s), backend))?,
336         );
337     }
338 
339     for arg in opts.tap_name.iter() {
340         devices.push(
341             new_backend_from_tap_name(arg).map(|(s, backend)| (Connection::Socket(s), backend))?,
342         );
343     }
344     for arg in opts.tap_fd.iter() {
345         devices.push(
346             new_backend_from_tapfd_arg(arg).map(|(s, backend)| (Connection::Socket(s), backend))?,
347         );
348     }
349 
350     let mut threads = Vec::with_capacity(num_devices);
351 
352     for (conn, backend) in devices {
353         let ex = Executor::new().context("failed to create executor")?;
354 
355         match conn {
356             Connection::Socket(socket) => {
357                 threads.push(thread::spawn(move || {
358                     NET_EXECUTOR.with(|thread_ex| {
359                         let _ = thread_ex.set(ex.clone());
360                     });
361                     let listener = VhostUserListener::new(&socket)?;
362                     // run_until() returns an Result<Result<..>> which the ? operator lets us
363                     // flatten.
364                     ex.run_until(listener.run_backend(backend, &ex))?
365                 }));
366             }
367         };
368     }
369 
370     info!("vhost-user net device ready, loop threads started.");
371     for t in threads {
372         match t.join() {
373             Ok(r) => r?,
374             Err(e) => bail!("thread panicked: {:?}", e),
375         }
376     }
377     Ok(())
378 }
379