1 // Copyright (C) 2019 Alibaba Cloud Computing. All rights reserved.
2 // SPDX-License-Identifier: Apache-2.0
3
4 //! Common data structures for listener and endpoint.
5
6 cfg_if::cfg_if! {
7 if #[cfg(unix)] {
8 pub mod socket;
9 #[cfg(feature = "vfio-device")]
10 pub mod vfio;
11 mod unix;
12 } else if #[cfg(windows)] {
13 mod tube;
14 pub use tube::{TubeEndpoint, TubeListener};
15 mod windows;
16 }
17 }
18
19 use base::RawDescriptor;
20 use std::fs::File;
21 use std::io::{IoSlice, IoSliceMut};
22 use std::mem;
23 use std::path::Path;
24
25 use data_model::DataInit;
26
27 use super::message::*;
28 use super::{Error, Result};
29 use crate::connection::Req;
30
31 /// Listener for accepting connections.
32 pub trait Listener: Sized {
33 /// Type of an object created when a connection is accepted.
34 type Connection;
35
36 /// Accept an incoming connection.
accept(&mut self) -> Result<Option<Self::Connection>>37 fn accept(&mut self) -> Result<Option<Self::Connection>>;
38
39 /// Change blocking status on the listener.
set_nonblocking(&self, block: bool) -> Result<()>40 fn set_nonblocking(&self, block: bool) -> Result<()>;
41 }
42
43 /// Abstracts a vhost-user connection and related operations.
44 pub trait Endpoint<R: Req>: Sized {
45 /// Type of an object that Endpoint is created from.
46 type Listener: Listener;
47
48 /// Create an endpoint from a stream object.
from_connection(sock: <Self::Listener as Listener>::Connection) -> Self49 fn from_connection(sock: <Self::Listener as Listener>::Connection) -> Self;
50
51 /// Create a new stream by connecting to server at `str`.
connect<P: AsRef<Path>>(path: P) -> Result<Self>52 fn connect<P: AsRef<Path>>(path: P) -> Result<Self>;
53
54 /// Sends bytes from scatter-gather vectors with optional attached file descriptors.
55 ///
56 /// # Return:
57 /// * - number of bytes sent on success
send_iovec(&mut self, iovs: &[IoSlice], fds: Option<&[RawDescriptor]>) -> Result<usize>58 fn send_iovec(&mut self, iovs: &[IoSlice], fds: Option<&[RawDescriptor]>) -> Result<usize>;
59
60 /// Reads bytes into the given scatter/gather vectors with optional attached file.
61 ///
62 /// # Arguements
63 /// * `bufs` - A slice of buffers to store received data.
64 /// * `allow_fd` - Indicates whether we can receive FDs.
65 ///
66 /// # Return:
67 /// * - (number of bytes received, [received files]) on success.
68 /// * - `Error::Disconnect` if the client closed.
recv_into_bufs( &mut self, bufs: &mut [IoSliceMut], allow_fd: bool, ) -> Result<(usize, Option<Vec<File>>)>69 fn recv_into_bufs(
70 &mut self,
71 bufs: &mut [IoSliceMut],
72 allow_fd: bool,
73 ) -> Result<(usize, Option<Vec<File>>)>;
74 }
75
76 // Advance the internal cursor of the slices.
77 // This is same with a nightly API `IoSlice::advance_slices` but for `&[u8]`.
advance_slices(bufs: &mut &mut [&[u8]], mut count: usize)78 fn advance_slices(bufs: &mut &mut [&[u8]], mut count: usize) {
79 use std::mem::take;
80
81 let mut idx = 0;
82 for b in bufs.iter() {
83 if count < b.len() {
84 break;
85 }
86 count -= b.len();
87 idx += 1;
88 }
89 *bufs = &mut take(bufs)[idx..];
90 if !bufs.is_empty() {
91 bufs[0] = &bufs[0][count..];
92 }
93 }
94
95 // Advance the internal cursor of the slices.
96 // This is same with a nightly API `IoSliceMut::advance_slices` but for `&mut [u8]`.
advance_slices_mut(bufs: &mut &mut [&mut [u8]], mut count: usize)97 fn advance_slices_mut(bufs: &mut &mut [&mut [u8]], mut count: usize) {
98 use std::mem::take;
99
100 let mut idx = 0;
101 for b in bufs.iter() {
102 if count < b.len() {
103 break;
104 }
105 count -= b.len();
106 idx += 1;
107 }
108 *bufs = &mut take(bufs)[idx..];
109 if !bufs.is_empty() {
110 let slice = take(&mut bufs[0]);
111 let (_, remaining) = slice.split_at_mut(count);
112 bufs[0] = remaining;
113 }
114 }
115
116 /// Abstracts VVU message parsing, sending and receiving.
117 pub trait EndpointExt<R: Req>: Endpoint<R> {
118 /// Sends all bytes from scatter-gather vectors with optional attached file descriptors. Will
119 /// loop until all data has been transfered.
120 ///
121 /// # Return:
122 /// * - number of bytes sent on success
123 ///
124 /// # TODO
125 /// This function takes a slice of `&[u8]` instead of `IoSlice` because the internal
126 /// cursor needs to be moved by `advance_slices()`.
127 /// Once `IoSlice::advance_slices()` becomes stable, this should be updated.
128 /// <https://github.com/rust-lang/rust/issues/62726>.
send_iovec_all( &mut self, mut iovs: &mut [&[u8]], mut fds: Option<&[RawDescriptor]>, ) -> Result<usize>129 fn send_iovec_all(
130 &mut self,
131 mut iovs: &mut [&[u8]],
132 mut fds: Option<&[RawDescriptor]>,
133 ) -> Result<usize> {
134 // Guarantee that `iovs` becomes empty if it doesn't contain any data.
135 advance_slices(&mut iovs, 0);
136
137 let mut data_sent = 0;
138 while !iovs.is_empty() {
139 let iovec: Vec<_> = iovs.iter_mut().map(|i| IoSlice::new(i)).collect();
140 match self.send_iovec(&iovec, fds) {
141 Ok(0) => {
142 break;
143 }
144 Ok(n) => {
145 data_sent += n;
146 fds = None;
147 advance_slices(&mut iovs, n);
148 }
149 Err(e) => match e {
150 Error::SocketRetry(_) => {}
151 _ => return Err(e),
152 },
153 }
154 }
155 Ok(data_sent)
156 }
157
158 /// Sends bytes from a slice with optional attached file descriptors.
159 ///
160 /// # Return:
161 /// * - number of bytes sent on success
162 #[cfg(test)]
send_slice(&mut self, data: IoSlice, fds: Option<&[RawDescriptor]>) -> Result<usize>163 fn send_slice(&mut self, data: IoSlice, fds: Option<&[RawDescriptor]>) -> Result<usize> {
164 self.send_iovec(&[data], fds)
165 }
166
167 /// Sends a header-only message with optional attached file descriptors.
168 ///
169 /// # Return:
170 /// * - number of bytes sent on success
171 /// * - PartialMessage: received a partial message.
172 /// * - backend specific errors
send_header( &mut self, hdr: &VhostUserMsgHeader<R>, fds: Option<&[RawDescriptor]>, ) -> Result<()>173 fn send_header(
174 &mut self,
175 hdr: &VhostUserMsgHeader<R>,
176 fds: Option<&[RawDescriptor]>,
177 ) -> Result<()> {
178 let mut iovs = [hdr.as_slice()];
179 let bytes = self.send_iovec_all(&mut iovs[..], fds)?;
180 if bytes != mem::size_of::<VhostUserMsgHeader<R>>() {
181 return Err(Error::PartialMessage);
182 }
183 Ok(())
184 }
185
186 /// Send a message with header and body. Optional file descriptors may be attached to
187 /// the message.
188 ///
189 /// # Return:
190 /// * - number of bytes sent on success
191 /// * - OversizedMsg: message size is too big.
192 /// * - PartialMessage: received a partial message.
193 /// * - backend specific errors
send_message<T: Sized + DataInit>( &mut self, hdr: &VhostUserMsgHeader<R>, body: &T, fds: Option<&[RawDescriptor]>, ) -> Result<()>194 fn send_message<T: Sized + DataInit>(
195 &mut self,
196 hdr: &VhostUserMsgHeader<R>,
197 body: &T,
198 fds: Option<&[RawDescriptor]>,
199 ) -> Result<()> {
200 if mem::size_of::<T>() > MAX_MSG_SIZE {
201 return Err(Error::OversizedMsg);
202 }
203
204 // We send the header and the body separately here. This is necessary on Windows. Otherwise
205 // the recv side cannot read the header independently (the transport is message oriented).
206 let mut bytes = self.send_iovec_all(&mut [hdr.as_slice()], fds)?;
207 bytes += self.send_iovec_all(&mut [body.as_slice()], None)?;
208 if bytes != mem::size_of::<VhostUserMsgHeader<R>>() + mem::size_of::<T>() {
209 return Err(Error::PartialMessage);
210 }
211 Ok(())
212 }
213
214 /// Send a message with header, body and payload. Optional file descriptors
215 /// may also be attached to the message.
216 ///
217 /// # Return:
218 /// * - number of bytes sent on success
219 /// * - OversizedMsg: message size is too big.
220 /// * - PartialMessage: received a partial message.
221 /// * - IncorrectFds: wrong number of attached fds.
222 /// * - backend specific errors
send_message_with_payload<T: Sized + DataInit>( &mut self, hdr: &VhostUserMsgHeader<R>, body: &T, payload: &[u8], fds: Option<&[RawDescriptor]>, ) -> Result<()>223 fn send_message_with_payload<T: Sized + DataInit>(
224 &mut self,
225 hdr: &VhostUserMsgHeader<R>,
226 body: &T,
227 payload: &[u8],
228 fds: Option<&[RawDescriptor]>,
229 ) -> Result<()> {
230 let len = payload.len();
231 if mem::size_of::<T>() > MAX_MSG_SIZE {
232 return Err(Error::OversizedMsg);
233 }
234 if len > MAX_MSG_SIZE - mem::size_of::<T>() {
235 return Err(Error::OversizedMsg);
236 }
237 if let Some(fd_arr) = fds {
238 if fd_arr.len() > MAX_ATTACHED_FD_ENTRIES {
239 return Err(Error::IncorrectFds);
240 }
241 }
242
243 let total = mem::size_of::<VhostUserMsgHeader<R>>() + mem::size_of::<T>() + len;
244
245 // We send the header and the body separately here. This is necessary on Windows. Otherwise
246 // the recv side cannot read the header independently (the transport is message oriented).
247 let mut len = self.send_iovec_all(&mut [hdr.as_slice()], fds)?;
248 len += self.send_iovec_all(&mut [body.as_slice(), payload], None)?;
249
250 if len != total {
251 return Err(Error::PartialMessage);
252 }
253 Ok(())
254 }
255
256 /// Reads `len` bytes at most.
257 ///
258 /// # Return:
259 /// * - (number of bytes received, buf) on success
recv_data(&mut self, len: usize) -> Result<Vec<u8>>260 fn recv_data(&mut self, len: usize) -> Result<Vec<u8>> {
261 let mut buf = vec![0u8; len];
262 let (data_len, _) =
263 self.recv_into_bufs(&mut [IoSliceMut::new(&mut buf)], false /* allow_fd */)?;
264 buf.truncate(data_len);
265 Ok(buf)
266 }
267
268 /// Reads all bytes into the given scatter/gather vectors with optional attached files. Will
269 /// loop until all data has been transferred.
270 ///
271 /// # Return:
272 /// * - (number of bytes received, [received fds]) on success
273 /// * - `Disconnect` - client is closed
274 ///
275 /// # TODO
276 /// This function takes a slice of `&mut [u8]` instead of `IoSliceMut` because the internal
277 /// cursor needs to be moved by `advance_slices_mut()`.
278 /// Once `IoSliceMut::advance_slices()` becomes stable, this should be updated.
279 /// <https://github.com/rust-lang/rust/issues/62726>.
recv_into_bufs_all( &mut self, mut bufs: &mut [&mut [u8]], ) -> Result<(usize, Option<Vec<File>>)>280 fn recv_into_bufs_all(
281 &mut self,
282 mut bufs: &mut [&mut [u8]],
283 ) -> Result<(usize, Option<Vec<File>>)> {
284 let buf_lens: Vec<usize> = bufs.iter().map(|b| b.len()).collect();
285 let data_total: usize = buf_lens.iter().sum();
286 let mut data_read = 0;
287 let mut rfds = None;
288
289 while (data_total - data_read) > 0 {
290 let mut slices: Vec<IoSliceMut> = bufs.iter_mut().map(|b| IoSliceMut::new(b)).collect();
291 let res = self.recv_into_bufs(&mut slices, true);
292 match res {
293 Ok((0, _)) => return Ok((data_read, rfds)),
294 Ok((n, fds)) => {
295 if data_read == 0 {
296 rfds = fds;
297 }
298 data_read += n;
299 advance_slices_mut(&mut bufs, n);
300 }
301 Err(e) => match e {
302 Error::SocketRetry(_) => {}
303 _ => return Err(e),
304 },
305 }
306 }
307 Ok((data_read, rfds))
308 }
309
310 /// Reads bytes into a new buffer with optional attached files. Received file descriptors are
311 /// set close-on-exec and converted to `File`.
312 ///
313 /// # Return:
314 /// * - (number of bytes received, buf, [received files]) on success.
315 /// * - backend specific errors
316 #[cfg(test)]
recv_into_buf(&mut self, buf_size: usize) -> Result<(usize, Vec<u8>, Option<Vec<File>>)>317 fn recv_into_buf(&mut self, buf_size: usize) -> Result<(usize, Vec<u8>, Option<Vec<File>>)> {
318 let mut buf = vec![0u8; buf_size];
319 let mut slices = [IoSliceMut::new(buf.as_mut_slice())];
320 let (bytes, files) = self.recv_into_bufs(&mut slices, true /* allow_fd */)?;
321 Ok((bytes, buf, files))
322 }
323
324 /// Receive a header-only message with optional attached files.
325 /// Note, only the first MAX_ATTACHED_FD_ENTRIES file descriptors will be
326 /// accepted and all other file descriptor will be discard silently.
327 ///
328 /// # Return:
329 /// * - (message header, [received files]) on success.
330 /// * - Disconnect: the client closed the connection.
331 /// * - PartialMessage: received a partial message.
332 /// * - InvalidMessage: received a invalid message.
333 /// * - backend specific errors
recv_header(&mut self) -> Result<(VhostUserMsgHeader<R>, Option<Vec<File>>)>334 fn recv_header(&mut self) -> Result<(VhostUserMsgHeader<R>, Option<Vec<File>>)> {
335 let mut hdr = VhostUserMsgHeader::default();
336 let (bytes, files) = self.recv_into_bufs(
337 &mut [IoSliceMut::new(hdr.as_mut_slice())],
338 true, /* allow_fd */
339 )?;
340
341 if bytes != mem::size_of::<VhostUserMsgHeader<R>>() {
342 return Err(Error::PartialMessage);
343 } else if !hdr.is_valid() {
344 return Err(Error::InvalidMessage);
345 }
346
347 Ok((hdr, files))
348 }
349
350 /// Receive a message with optional attached file descriptors.
351 /// Note, only the first MAX_ATTACHED_FD_ENTRIES file descriptors will be
352 /// accepted and all other file descriptor will be discard silently.
353 ///
354 /// # Return:
355 /// * - (message header, message body, [received files]) on success.
356 /// * - PartialMessage: received a partial message.
357 /// * - InvalidMessage: received a invalid message.
358 /// * - backend specific errors
recv_body<T: Sized + DataInit + Default + VhostUserMsgValidator>( &mut self, ) -> Result<(VhostUserMsgHeader<R>, T, Option<Vec<File>>)>359 fn recv_body<T: Sized + DataInit + Default + VhostUserMsgValidator>(
360 &mut self,
361 ) -> Result<(VhostUserMsgHeader<R>, T, Option<Vec<File>>)> {
362 let mut hdr = VhostUserMsgHeader::default();
363 let mut body: T = Default::default();
364 let mut slices = [hdr.as_mut_slice(), body.as_mut_slice()];
365 let (bytes, files) = self.recv_into_bufs_all(&mut slices)?;
366
367 let total = mem::size_of::<VhostUserMsgHeader<R>>() + mem::size_of::<T>();
368 if bytes != total {
369 return Err(Error::PartialMessage);
370 } else if !hdr.is_valid() || !body.is_valid() {
371 return Err(Error::InvalidMessage);
372 }
373
374 Ok((hdr, body, files))
375 }
376
377 /// Receive a message with header and optional content. Callers need to
378 /// pre-allocate a big enough buffer to receive the message body and
379 /// optional payload. If there are attached file descriptor associated
380 /// with the message, the first MAX_ATTACHED_FD_ENTRIES file descriptors
381 /// will be accepted and all other file descriptor will be discard
382 /// silently.
383 ///
384 /// # Return:
385 /// * - (message header, message size, [received files]) on success.
386 /// * - PartialMessage: received a partial message.
387 /// * - InvalidMessage: received a invalid message.
388 /// * - backend specific errors
389 #[cfg(test)]
recv_body_into_buf( &mut self, buf: &mut [u8], ) -> Result<(VhostUserMsgHeader<R>, usize, Option<Vec<File>>)>390 fn recv_body_into_buf(
391 &mut self,
392 buf: &mut [u8],
393 ) -> Result<(VhostUserMsgHeader<R>, usize, Option<Vec<File>>)> {
394 let mut hdr = VhostUserMsgHeader::default();
395 let mut slices = [hdr.as_mut_slice(), buf];
396 let (bytes, files) = self.recv_into_bufs_all(&mut slices)?;
397
398 if bytes < mem::size_of::<VhostUserMsgHeader<R>>() {
399 return Err(Error::PartialMessage);
400 } else if !hdr.is_valid() {
401 return Err(Error::InvalidMessage);
402 }
403
404 Ok((hdr, bytes - mem::size_of::<VhostUserMsgHeader<R>>(), files))
405 }
406
407 /// Receive a message with optional payload and attached file descriptors.
408 /// Note, only the first MAX_ATTACHED_FD_ENTRIES file descriptors will be
409 /// accepted and all other file descriptor will be discard silently.
410 ///
411 /// # Return:
412 /// * - (message header, message body, size of payload, [received files]) on success.
413 /// * - PartialMessage: received a partial message.
414 /// * - InvalidMessage: received a invalid message.
415 /// * - backend specific errors
416 #[cfg_attr(feature = "cargo-clippy", allow(clippy::type_complexity))]
recv_payload_into_buf<T: Sized + DataInit + Default + VhostUserMsgValidator>( &mut self, buf: &mut [u8], ) -> Result<(VhostUserMsgHeader<R>, T, usize, Option<Vec<File>>)>417 fn recv_payload_into_buf<T: Sized + DataInit + Default + VhostUserMsgValidator>(
418 &mut self,
419 buf: &mut [u8],
420 ) -> Result<(VhostUserMsgHeader<R>, T, usize, Option<Vec<File>>)> {
421 let mut hdr = VhostUserMsgHeader::default();
422 let mut body: T = Default::default();
423 let mut slices = [hdr.as_mut_slice(), body.as_mut_slice(), buf];
424 let (bytes, files) = self.recv_into_bufs_all(&mut slices)?;
425
426 let total = mem::size_of::<VhostUserMsgHeader<R>>() + mem::size_of::<T>();
427 if bytes < total {
428 return Err(Error::PartialMessage);
429 } else if !hdr.is_valid() || !body.is_valid() {
430 return Err(Error::InvalidMessage);
431 }
432
433 Ok((hdr, body, bytes - total, files))
434 }
435 }
436
437 impl<R: Req, E: Endpoint<R>> EndpointExt<R> for E {}
438
439 #[cfg(test)]
440 pub(crate) mod tests {
441 use super::*;
442 cfg_if::cfg_if! {
443 if #[cfg(unix)] {
444 #[cfg(feature = "vmm")]
445 pub(crate) use super::unix::tests::*;
446 } else if #[cfg(windows)] {
447 #[cfg(feature = "vmm")]
448 pub(crate) use windows::tests::*;
449 }
450 }
451
452 #[test]
test_advance_slices()453 fn test_advance_slices() {
454 // Test case from https://doc.rust-lang.org/std/io/struct.IoSlice.html#method.advance_slices
455 let buf1 = [1; 8];
456 let buf2 = [2; 16];
457 let buf3 = [3; 8];
458 let mut bufs = &mut [&buf1[..], &buf2[..], &buf3[..]][..];
459 advance_slices(&mut bufs, 10);
460 assert_eq!(bufs[0], [2; 14].as_ref());
461 assert_eq!(bufs[1], [3; 8].as_ref());
462 }
463
464 #[test]
test_advance_slices_mut()465 fn test_advance_slices_mut() {
466 // Test case from https://doc.rust-lang.org/std/io/struct.IoSliceMut.html#method.advance_slices
467 let mut buf1 = [1; 8];
468 let mut buf2 = [2; 16];
469 let mut buf3 = [3; 8];
470 let mut bufs = &mut [&mut buf1[..], &mut buf2[..], &mut buf3[..]][..];
471 advance_slices_mut(&mut bufs, 10);
472 assert_eq!(bufs[0], [2; 14].as_ref());
473 assert_eq!(bufs[1], [3; 8].as_ref());
474 }
475 }
476