• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::net::Ipv4Addr;
6 use std::str::FromStr;
7 use std::thread;
8 
9 use anyhow::anyhow;
10 use anyhow::bail;
11 use anyhow::Context;
12 use argh::FromArgs;
13 use base::error;
14 use base::info;
15 use base::validate_raw_descriptor;
16 use base::warn;
17 use base::RawDescriptor;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use cros_async::IntoAsync;
21 use cros_async::IoSource;
22 use futures::channel::oneshot;
23 use futures::select_biased;
24 use futures::FutureExt;
25 use hypervisor::ProtectionType;
26 use net_util::sys::linux::Tap;
27 use net_util::MacAddress;
28 use net_util::TapT;
29 use virtio_sys::virtio_net;
30 use vm_memory::GuestMemory;
31 use vmm_vhost::message::VhostUserProtocolFeatures;
32 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
33 
34 use crate::virtio;
35 use crate::virtio::net::process_rx;
36 use crate::virtio::net::validate_and_configure_tap;
37 use crate::virtio::net::NetError;
38 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
39 use crate::virtio::vhost::user::device::listener::sys::VhostUserListener;
40 use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
41 use crate::virtio::vhost::user::device::net::run_ctrl_queue;
42 use crate::virtio::vhost::user::device::net::run_tx_queue;
43 use crate::virtio::vhost::user::device::net::NetBackend;
44 use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
45 use crate::virtio::Interrupt;
46 use crate::virtio::Queue;
47 
48 struct TapConfig {
49     host_ip: Ipv4Addr,
50     netmask: Ipv4Addr,
51     mac: MacAddress,
52 }
53 
54 impl FromStr for TapConfig {
55     type Err = anyhow::Error;
56 
from_str(arg: &str) -> Result<Self, Self::Err>57     fn from_str(arg: &str) -> Result<Self, Self::Err> {
58         let args: Vec<&str> = arg.split(',').collect();
59         if args.len() != 3 {
60             bail!("TAP config must consist of 3 parts but {}", args.len());
61         }
62 
63         let host_ip: Ipv4Addr = args[0]
64             .parse()
65             .map_err(|e| anyhow!("invalid IP address: {}", e))?;
66         let netmask: Ipv4Addr = args[1]
67             .parse()
68             .map_err(|e| anyhow!("invalid net mask: {}", e))?;
69         let mac: MacAddress = args[2]
70             .parse()
71             .map_err(|e| anyhow!("invalid MAC address: {}", e))?;
72 
73         Ok(Self {
74             host_ip,
75             netmask,
76             mac,
77         })
78     }
79 }
80 
81 impl<T: 'static> NetBackend<T>
82 where
83     T: TapT + IntoAsync,
84 {
new_from_config(config: &TapConfig) -> anyhow::Result<Self>85     fn new_from_config(config: &TapConfig) -> anyhow::Result<Self> {
86         // Create a tap device.
87         let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)
88             .context("failed to create tap device")?;
89         tap.set_ip_addr(config.host_ip)
90             .context("failed to set IP address")?;
91         tap.set_netmask(config.netmask)
92             .context("failed to set netmask")?;
93         tap.set_mac_address(config.mac)
94             .context("failed to set MAC address")?;
95 
96         Self::new(tap)
97     }
98 
new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self>99     pub fn new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self> {
100         let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
101         // SAFETY:
102         // Safe because we ensure that we get a unique handle to the fd.
103         let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
104 
105         Self::new(tap)
106     }
107 
new(tap: T) -> anyhow::Result<Self>108     pub fn new(tap: T) -> anyhow::Result<Self> {
109         let vq_pairs = Self::max_vq_pairs();
110         validate_and_configure_tap(&tap, vq_pairs as u16)
111             .context("failed to validate and configure tap")?;
112 
113         let avail_features = virtio::base_features(ProtectionType::Unprotected)
114             | 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
115             | 1 << virtio_net::VIRTIO_NET_F_CSUM
116             | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
117             | 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
118             | 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
119             | 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
120             | 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
121             | 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
122             | 1 << virtio_net::VIRTIO_NET_F_MTU
123             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
124 
125         let mtu = tap.mtu()?;
126 
127         Ok(Self {
128             tap,
129             avail_features,
130             acked_features: 0,
131             acked_protocol_features: VhostUserProtocolFeatures::empty(),
132             mtu,
133             workers: Default::default(),
134         })
135     }
136 }
137 
run_rx_queue<T: TapT>( mut queue: Queue, mut tap: IoSource<T>, doorbell: Interrupt, kick_evt: EventAsync, mut stop_rx: oneshot::Receiver<()>, ) -> Queue138 async fn run_rx_queue<T: TapT>(
139     mut queue: Queue,
140     mut tap: IoSource<T>,
141     doorbell: Interrupt,
142     kick_evt: EventAsync,
143     mut stop_rx: oneshot::Receiver<()>,
144 ) -> Queue {
145     loop {
146         select_biased! {
147             // `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`
148             // requires a mutable reference to `tap`, so this future needs to be recreated on
149             // every iteration. If more arms are added that doesn't break out of the loop, then
150             // this future could be recreated too many times.
151             rx = tap.wait_readable().fuse() => {
152                 if let Err(e) = rx {
153                     error!("Failed to wait for tap device to become readable: {}", e);
154                     break;
155                 }
156             }
157             _ = stop_rx => {
158                 break;
159             }
160         }
161 
162         match process_rx(&doorbell, &mut queue, tap.as_source_mut()) {
163             Ok(()) => {}
164             Err(NetError::RxDescriptorsExhausted) => {
165                 if let Err(e) = kick_evt.next_val().await {
166                     error!("Failed to read kick event for rx queue: {}", e);
167                     break;
168                 }
169             }
170             Err(e) => {
171                 error!("Failed to process rx queue: {}", e);
172                 break;
173             }
174         }
175     }
176     queue
177 }
178 
179 /// Platform specific impl of VhostUserDevice::start_queue.
start_queue<T: 'static + IntoAsync + TapT>( backend: &mut NetBackend<T>, idx: usize, queue: virtio::Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>180 pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + IntoAsync + TapT>(
181     backend: &mut NetBackend<T>,
182     idx: usize,
183     queue: virtio::Queue,
184     _mem: GuestMemory,
185     doorbell: Interrupt,
186 ) -> anyhow::Result<()> {
187     if backend.workers[idx].is_some() {
188         warn!("Starting new queue handler without stopping old handler");
189         backend.stop_queue(idx)?;
190     }
191 
192     NET_EXECUTOR.with(|ex| {
193         // Safe because the executor is initialized in main() below.
194         let ex = ex.get().expect("Executor not initialized");
195 
196         let kick_evt = queue
197             .event()
198             .try_clone()
199             .context("failed to clone queue event")?;
200         let kick_evt =
201             EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
202         let tap = backend
203             .tap
204             .try_clone()
205             .context("failed to clone tap device")?;
206         let worker_tuple = match idx {
207             0 => {
208                 let tap = ex
209                     .async_from(tap)
210                     .context("failed to create async tap device")?;
211 
212                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
213                 (
214                     ex.spawn_local(run_rx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
215                     stop_tx,
216                 )
217             }
218             1 => {
219                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
220                 (
221                     ex.spawn_local(run_tx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
222                     stop_tx,
223                 )
224             }
225             2 => {
226                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
227                 (
228                     ex.spawn_local(run_ctrl_queue(
229                         queue,
230                         tap,
231                         doorbell,
232                         kick_evt,
233                         backend.acked_features,
234                         1, /* vq_pairs */
235                         stop_rx,
236                     )),
237                     stop_tx,
238                 )
239             }
240             _ => bail!("attempted to start unknown queue: {}", idx),
241         };
242 
243         backend.workers[idx] = Some(worker_tuple);
244         Ok(())
245     })
246 }
247 
248 #[derive(FromArgs)]
249 #[argh(subcommand, name = "net")]
250 /// Net device
251 pub struct Options {
252     #[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
253     /// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")
254     device: Vec<String>,
255     #[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
256     /// TAP FD with a socket path"
257     tap_fd: Vec<String>,
258 }
259 
260 enum Connection {
261     Socket(String),
262 }
263 
new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>264 fn new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
265     let pos = match arg.find(',') {
266         Some(p) => p,
267         None => {
268             bail!("device must take comma-separated argument");
269         }
270     };
271     let conn = &arg[0..pos];
272     let cfg = &arg[pos + 1..]
273         .parse::<TapConfig>()
274         .context("failed to parse tap config")?;
275     let backend = NetBackend::<Tap>::new_from_config(cfg).context("failed to create NetBackend")?;
276     Ok((conn.to_string(), backend))
277 }
278 
new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>279 fn new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
280     let pos = match arg.find(',') {
281         Some(p) => p,
282         None => {
283             bail!("'tap-fd' flag must take comma-separated argument");
284         }
285     };
286     let conn = &arg[0..pos];
287     let tap_fd = &arg[pos + 1..]
288         .parse::<i32>()
289         .context("failed to parse tap-fd")?;
290     let backend =
291         NetBackend::<Tap>::new_from_tap_fd(*tap_fd).context("failed to create NetBackend")?;
292 
293     Ok((conn.to_string(), backend))
294 }
295 
296 /// Starts a vhost-user net device.
297 /// Returns an error if the given `args` is invalid or the device fails to run.
start_device(opts: Options) -> anyhow::Result<()>298 pub fn start_device(opts: Options) -> anyhow::Result<()> {
299     let num_devices = opts.device.len() + opts.tap_fd.len();
300 
301     if num_devices == 0 {
302         bail!("no device option was passed");
303     }
304 
305     let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
306 
307     // vhost-user
308     for arg in opts.device.iter() {
309         devices.push(
310             new_backend_from_device_arg(arg)
311                 .map(|(s, backend)| (Connection::Socket(s), backend))?,
312         );
313     }
314     for arg in opts.tap_fd.iter() {
315         devices.push(
316             new_backend_from_tapfd_arg(arg).map(|(s, backend)| (Connection::Socket(s), backend))?,
317         );
318     }
319 
320     let mut threads = Vec::with_capacity(num_devices);
321 
322     for (conn, backend) in devices {
323         let ex = Executor::new().context("failed to create executor")?;
324 
325         match conn {
326             Connection::Socket(socket) => {
327                 threads.push(thread::spawn(move || {
328                     NET_EXECUTOR.with(|thread_ex| {
329                         let _ = thread_ex.set(ex.clone());
330                     });
331                     let listener = VhostUserListener::new_socket(&socket, None)?;
332                     // run_until() returns an Result<Result<..>> which the ? operator lets us
333                     // flatten.
334                     ex.run_until(listener.run_backend(backend, &ex))?
335                 }));
336             }
337         };
338     }
339 
340     info!("vhost-user net device ready, loop threads started.");
341     for t in threads {
342         match t.join() {
343             Ok(r) => r?,
344             Err(e) => bail!("thread panicked: {:?}", e),
345         }
346     }
347     Ok(())
348 }
349