1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::net::Ipv4Addr;
6 use std::str::FromStr;
7 use std::thread;
8
9 use anyhow::anyhow;
10 use anyhow::bail;
11 use anyhow::Context;
12 use argh::FromArgs;
13 use base::error;
14 use base::info;
15 use base::validate_raw_descriptor;
16 use base::warn;
17 use base::RawDescriptor;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use cros_async::IntoAsync;
21 use cros_async::IoSource;
22 use futures::channel::oneshot;
23 use futures::select_biased;
24 use futures::FutureExt;
25 use hypervisor::ProtectionType;
26 use net_util::sys::linux::Tap;
27 use net_util::MacAddress;
28 use net_util::TapT;
29 use virtio_sys::virtio_net;
30 use vm_memory::GuestMemory;
31 use vmm_vhost::message::VhostUserProtocolFeatures;
32 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
33
34 use crate::virtio;
35 use crate::virtio::net::process_rx;
36 use crate::virtio::net::validate_and_configure_tap;
37 use crate::virtio::net::NetError;
38 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
39 use crate::virtio::vhost::user::device::listener::sys::VhostUserListener;
40 use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
41 use crate::virtio::vhost::user::device::net::run_ctrl_queue;
42 use crate::virtio::vhost::user::device::net::run_tx_queue;
43 use crate::virtio::vhost::user::device::net::NetBackend;
44 use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
45 use crate::virtio::Interrupt;
46 use crate::virtio::Queue;
47
48 struct TapConfig {
49 host_ip: Ipv4Addr,
50 netmask: Ipv4Addr,
51 mac: MacAddress,
52 }
53
54 impl FromStr for TapConfig {
55 type Err = anyhow::Error;
56
from_str(arg: &str) -> Result<Self, Self::Err>57 fn from_str(arg: &str) -> Result<Self, Self::Err> {
58 let args: Vec<&str> = arg.split(',').collect();
59 if args.len() != 3 {
60 bail!("TAP config must consist of 3 parts but {}", args.len());
61 }
62
63 let host_ip: Ipv4Addr = args[0]
64 .parse()
65 .map_err(|e| anyhow!("invalid IP address: {}", e))?;
66 let netmask: Ipv4Addr = args[1]
67 .parse()
68 .map_err(|e| anyhow!("invalid net mask: {}", e))?;
69 let mac: MacAddress = args[2]
70 .parse()
71 .map_err(|e| anyhow!("invalid MAC address: {}", e))?;
72
73 Ok(Self {
74 host_ip,
75 netmask,
76 mac,
77 })
78 }
79 }
80
81 impl<T: 'static> NetBackend<T>
82 where
83 T: TapT + IntoAsync,
84 {
new_from_config(config: &TapConfig) -> anyhow::Result<Self>85 fn new_from_config(config: &TapConfig) -> anyhow::Result<Self> {
86 // Create a tap device.
87 let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)
88 .context("failed to create tap device")?;
89 tap.set_ip_addr(config.host_ip)
90 .context("failed to set IP address")?;
91 tap.set_netmask(config.netmask)
92 .context("failed to set netmask")?;
93 tap.set_mac_address(config.mac)
94 .context("failed to set MAC address")?;
95
96 Self::new(tap)
97 }
98
new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self>99 pub fn new_from_tap_fd(tap_fd: RawDescriptor) -> anyhow::Result<Self> {
100 let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
101 // SAFETY:
102 // Safe because we ensure that we get a unique handle to the fd.
103 let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
104
105 Self::new(tap)
106 }
107
new(tap: T) -> anyhow::Result<Self>108 pub fn new(tap: T) -> anyhow::Result<Self> {
109 let vq_pairs = Self::max_vq_pairs();
110 validate_and_configure_tap(&tap, vq_pairs as u16)
111 .context("failed to validate and configure tap")?;
112
113 let avail_features = virtio::base_features(ProtectionType::Unprotected)
114 | 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
115 | 1 << virtio_net::VIRTIO_NET_F_CSUM
116 | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
117 | 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
118 | 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
119 | 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
120 | 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
121 | 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
122 | 1 << virtio_net::VIRTIO_NET_F_MTU
123 | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
124
125 let mtu = tap.mtu()?;
126
127 Ok(Self {
128 tap,
129 avail_features,
130 acked_features: 0,
131 acked_protocol_features: VhostUserProtocolFeatures::empty(),
132 mtu,
133 workers: Default::default(),
134 })
135 }
136 }
137
run_rx_queue<T: TapT>( mut queue: Queue, mut tap: IoSource<T>, doorbell: Interrupt, kick_evt: EventAsync, mut stop_rx: oneshot::Receiver<()>, ) -> Queue138 async fn run_rx_queue<T: TapT>(
139 mut queue: Queue,
140 mut tap: IoSource<T>,
141 doorbell: Interrupt,
142 kick_evt: EventAsync,
143 mut stop_rx: oneshot::Receiver<()>,
144 ) -> Queue {
145 loop {
146 select_biased! {
147 // `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`
148 // requires a mutable reference to `tap`, so this future needs to be recreated on
149 // every iteration. If more arms are added that doesn't break out of the loop, then
150 // this future could be recreated too many times.
151 rx = tap.wait_readable().fuse() => {
152 if let Err(e) = rx {
153 error!("Failed to wait for tap device to become readable: {}", e);
154 break;
155 }
156 }
157 _ = stop_rx => {
158 break;
159 }
160 }
161
162 match process_rx(&doorbell, &mut queue, tap.as_source_mut()) {
163 Ok(()) => {}
164 Err(NetError::RxDescriptorsExhausted) => {
165 if let Err(e) = kick_evt.next_val().await {
166 error!("Failed to read kick event for rx queue: {}", e);
167 break;
168 }
169 }
170 Err(e) => {
171 error!("Failed to process rx queue: {}", e);
172 break;
173 }
174 }
175 }
176 queue
177 }
178
179 /// Platform specific impl of VhostUserDevice::start_queue.
start_queue<T: 'static + IntoAsync + TapT>( backend: &mut NetBackend<T>, idx: usize, queue: virtio::Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>180 pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + IntoAsync + TapT>(
181 backend: &mut NetBackend<T>,
182 idx: usize,
183 queue: virtio::Queue,
184 _mem: GuestMemory,
185 doorbell: Interrupt,
186 ) -> anyhow::Result<()> {
187 if backend.workers[idx].is_some() {
188 warn!("Starting new queue handler without stopping old handler");
189 backend.stop_queue(idx)?;
190 }
191
192 NET_EXECUTOR.with(|ex| {
193 // Safe because the executor is initialized in main() below.
194 let ex = ex.get().expect("Executor not initialized");
195
196 let kick_evt = queue
197 .event()
198 .try_clone()
199 .context("failed to clone queue event")?;
200 let kick_evt =
201 EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
202 let tap = backend
203 .tap
204 .try_clone()
205 .context("failed to clone tap device")?;
206 let worker_tuple = match idx {
207 0 => {
208 let tap = ex
209 .async_from(tap)
210 .context("failed to create async tap device")?;
211
212 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
213 (
214 ex.spawn_local(run_rx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
215 stop_tx,
216 )
217 }
218 1 => {
219 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
220 (
221 ex.spawn_local(run_tx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
222 stop_tx,
223 )
224 }
225 2 => {
226 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
227 (
228 ex.spawn_local(run_ctrl_queue(
229 queue,
230 tap,
231 doorbell,
232 kick_evt,
233 backend.acked_features,
234 1, /* vq_pairs */
235 stop_rx,
236 )),
237 stop_tx,
238 )
239 }
240 _ => bail!("attempted to start unknown queue: {}", idx),
241 };
242
243 backend.workers[idx] = Some(worker_tuple);
244 Ok(())
245 })
246 }
247
248 #[derive(FromArgs)]
249 #[argh(subcommand, name = "net")]
250 /// Net device
251 pub struct Options {
252 #[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
253 /// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")
254 device: Vec<String>,
255 #[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
256 /// TAP FD with a socket path"
257 tap_fd: Vec<String>,
258 }
259
260 enum Connection {
261 Socket(String),
262 }
263
new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>264 fn new_backend_from_device_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
265 let pos = match arg.find(',') {
266 Some(p) => p,
267 None => {
268 bail!("device must take comma-separated argument");
269 }
270 };
271 let conn = &arg[0..pos];
272 let cfg = &arg[pos + 1..]
273 .parse::<TapConfig>()
274 .context("failed to parse tap config")?;
275 let backend = NetBackend::<Tap>::new_from_config(cfg).context("failed to create NetBackend")?;
276 Ok((conn.to_string(), backend))
277 }
278
new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)>279 fn new_backend_from_tapfd_arg(arg: &str) -> anyhow::Result<(String, NetBackend<Tap>)> {
280 let pos = match arg.find(',') {
281 Some(p) => p,
282 None => {
283 bail!("'tap-fd' flag must take comma-separated argument");
284 }
285 };
286 let conn = &arg[0..pos];
287 let tap_fd = &arg[pos + 1..]
288 .parse::<i32>()
289 .context("failed to parse tap-fd")?;
290 let backend =
291 NetBackend::<Tap>::new_from_tap_fd(*tap_fd).context("failed to create NetBackend")?;
292
293 Ok((conn.to_string(), backend))
294 }
295
296 /// Starts a vhost-user net device.
297 /// Returns an error if the given `args` is invalid or the device fails to run.
start_device(opts: Options) -> anyhow::Result<()>298 pub fn start_device(opts: Options) -> anyhow::Result<()> {
299 let num_devices = opts.device.len() + opts.tap_fd.len();
300
301 if num_devices == 0 {
302 bail!("no device option was passed");
303 }
304
305 let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
306
307 // vhost-user
308 for arg in opts.device.iter() {
309 devices.push(
310 new_backend_from_device_arg(arg)
311 .map(|(s, backend)| (Connection::Socket(s), backend))?,
312 );
313 }
314 for arg in opts.tap_fd.iter() {
315 devices.push(
316 new_backend_from_tapfd_arg(arg).map(|(s, backend)| (Connection::Socket(s), backend))?,
317 );
318 }
319
320 let mut threads = Vec::with_capacity(num_devices);
321
322 for (conn, backend) in devices {
323 let ex = Executor::new().context("failed to create executor")?;
324
325 match conn {
326 Connection::Socket(socket) => {
327 threads.push(thread::spawn(move || {
328 NET_EXECUTOR.with(|thread_ex| {
329 let _ = thread_ex.set(ex.clone());
330 });
331 let listener = VhostUserListener::new_socket(&socket, None)?;
332 // run_until() returns an Result<Result<..>> which the ? operator lets us
333 // flatten.
334 ex.run_until(listener.run_backend(backend, &ex))?
335 }));
336 }
337 };
338 }
339
340 info!("vhost-user net device ready, loop threads started.");
341 for t in threads {
342 match t.join() {
343 Ok(r) => r?,
344 Err(e) => bail!("thread panicked: {:?}", e),
345 }
346 }
347 Ok(())
348 }
349