• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2020 Alibaba Cloud. All rights reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 use std::io;
5 use std::mem;
6 use std::os::unix::io::{AsRawFd, RawFd};
7 use std::os::unix::net::UnixStream;
8 use std::sync::{Arc, Mutex, MutexGuard};
9 
10 use super::connection::Endpoint;
11 use super::message::*;
12 use super::{Error, HandlerResult, Result, VhostUserFrontendReqHandler};
13 
14 use vm_memory::ByteValued;
15 
16 struct BackendInternal {
17     sock: Endpoint<VhostUserMsgHeader<BackendReq>>,
18 
19     // Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated.
20     reply_ack_negotiated: bool,
21 
22     // Protocol feature VHOST_USER_PROTOCOL_F_SHARED_OBJECT has been negotiated.
23     shared_object_negotiated: bool,
24 
25     // whether the endpoint has encountered any failure
26     error: Option<i32>,
27 }
28 
29 impl BackendInternal {
check_state(&self) -> Result<u64>30     fn check_state(&self) -> Result<u64> {
31         match self.error {
32             Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))),
33             None => Ok(0),
34         }
35     }
36 
send_message<T: ByteValued>( &mut self, request: BackendReq, body: &T, fds: Option<&[RawFd]>, ) -> Result<u64>37     fn send_message<T: ByteValued>(
38         &mut self,
39         request: BackendReq,
40         body: &T,
41         fds: Option<&[RawFd]>,
42     ) -> Result<u64> {
43         self.check_state()?;
44 
45         let len = mem::size_of::<T>();
46         let mut hdr = VhostUserMsgHeader::new(request, 0, len as u32);
47         if self.reply_ack_negotiated {
48             hdr.set_need_reply(true);
49         }
50         self.sock.send_message(&hdr, body, fds)?;
51 
52         self.wait_for_ack(&hdr)
53     }
54 
wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<BackendReq>) -> Result<u64>55     fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<BackendReq>) -> Result<u64> {
56         self.check_state()?;
57         if !self.reply_ack_negotiated {
58             return Ok(0);
59         }
60 
61         let (reply, body, rfds) = self.sock.recv_body::<VhostUserU64>()?;
62         if !reply.is_reply_for(hdr) || rfds.is_some() || !body.is_valid() {
63             return Err(Error::InvalidMessage);
64         }
65         if body.value != 0 {
66             return Err(Error::FrontendInternalError);
67         }
68 
69         Ok(body.value)
70     }
71 }
72 
73 /// Request proxy to send vhost-user backend requests to the frontend through the backend
74 /// communication channel.
75 ///
76 /// The [Backend] acts as a message proxy to forward vhost-user backend requests to the
77 /// frontend through the vhost-user backend communication channel. The forwarded messages will be
78 /// handled by the [FrontendReqHandler] server.
79 ///
80 /// [Backend]: struct.Backend.html
81 /// [FrontendReqHandler]: struct.FrontendReqHandler.html
82 #[derive(Clone)]
83 pub struct Backend {
84     // underlying Unix domain socket for communication
85     node: Arc<Mutex<BackendInternal>>,
86 }
87 
88 impl Backend {
new(ep: Endpoint<VhostUserMsgHeader<BackendReq>>) -> Self89     fn new(ep: Endpoint<VhostUserMsgHeader<BackendReq>>) -> Self {
90         Backend {
91             node: Arc::new(Mutex::new(BackendInternal {
92                 sock: ep,
93                 reply_ack_negotiated: false,
94                 shared_object_negotiated: false,
95                 error: None,
96             })),
97         }
98     }
99 
node(&self) -> MutexGuard<BackendInternal>100     fn node(&self) -> MutexGuard<BackendInternal> {
101         self.node.lock().unwrap()
102     }
103 
send_message<T: ByteValued>( &self, request: BackendReq, body: &T, fds: Option<&[RawFd]>, ) -> io::Result<u64>104     fn send_message<T: ByteValued>(
105         &self,
106         request: BackendReq,
107         body: &T,
108         fds: Option<&[RawFd]>,
109     ) -> io::Result<u64> {
110         self.node()
111             .send_message(request, body, fds)
112             .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))
113     }
114 
115     /// Create a new instance from a `UnixStream` object.
from_stream(sock: UnixStream) -> Self116     pub fn from_stream(sock: UnixStream) -> Self {
117         Self::new(Endpoint::<VhostUserMsgHeader<BackendReq>>::from_stream(
118             sock,
119         ))
120     }
121 
122     /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature.
123     ///
124     /// When the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature has been negotiated,
125     /// the "REPLY_ACK" flag will be set in the message header for every backend to frontend request
126     /// message.
set_reply_ack_flag(&self, enable: bool)127     pub fn set_reply_ack_flag(&self, enable: bool) {
128         self.node().reply_ack_negotiated = enable;
129     }
130 
131     /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_SHARED_OBJECT` protocol feature.
132     ///
133     /// When the `VHOST_USER_PROTOCOL_F_SHARED_OBJECT` protocol feature has been negotiated,
134     /// the backend is allowed to send "SHARED_OBJECT_*" messages to the frontend.
set_shared_object_flag(&self, enable: bool)135     pub fn set_shared_object_flag(&self, enable: bool) {
136         self.node().shared_object_negotiated = enable;
137     }
138 
139     /// Mark endpoint as failed with specified error code.
set_failed(&self, error: i32)140     pub fn set_failed(&self, error: i32) {
141         self.node().error = Some(error);
142     }
143 }
144 
145 impl VhostUserFrontendReqHandler for Backend {
146     /// Forward vhost-user shared-object add request to the frontend.
shared_object_add(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64>147     fn shared_object_add(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64> {
148         if !self.node().shared_object_negotiated {
149             return Err(io::Error::new(
150                 io::ErrorKind::Other,
151                 "Shared Object feature not negotiated",
152             ));
153         }
154         self.send_message(BackendReq::SHARED_OBJECT_ADD, uuid, None)
155     }
156 
157     /// Forward vhost-user shared-object remove request to the frontend.
shared_object_remove(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64>158     fn shared_object_remove(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64> {
159         if !self.node().shared_object_negotiated {
160             return Err(io::Error::new(
161                 io::ErrorKind::Other,
162                 "Shared Object feature not negotiated",
163             ));
164         }
165         self.send_message(BackendReq::SHARED_OBJECT_REMOVE, uuid, None)
166     }
167 
168     /// Forward vhost-user shared-object lookup request to the frontend.
shared_object_lookup( &self, uuid: &VhostUserSharedMsg, fd: &dyn AsRawFd, ) -> HandlerResult<u64>169     fn shared_object_lookup(
170         &self,
171         uuid: &VhostUserSharedMsg,
172         fd: &dyn AsRawFd,
173     ) -> HandlerResult<u64> {
174         if !self.node().shared_object_negotiated {
175             return Err(io::Error::new(
176                 io::ErrorKind::Other,
177                 "Shared Object feature not negotiated",
178             ));
179         }
180         self.send_message(
181             BackendReq::SHARED_OBJECT_LOOKUP,
182             uuid,
183             Some(&[fd.as_raw_fd()]),
184         )
185     }
186 }
187 
188 #[cfg(test)]
189 mod tests {
190     use std::os::unix::io::AsRawFd;
191 
192     use super::*;
193 
194     #[test]
test_backend_req_set_failed()195     fn test_backend_req_set_failed() {
196         let (p1, _p2) = UnixStream::pair().unwrap();
197         let backend = Backend::from_stream(p1);
198 
199         assert!(backend.node().error.is_none());
200         backend.set_failed(libc::EAGAIN);
201         assert_eq!(backend.node().error, Some(libc::EAGAIN));
202     }
203 
204     #[test]
test_backend_req_send_failure()205     fn test_backend_req_send_failure() {
206         let (p1, _) = UnixStream::pair().unwrap();
207         let backend = Backend::from_stream(p1);
208 
209         backend.set_failed(libc::ECONNRESET);
210         backend
211             .shared_object_add(&VhostUserSharedMsg::default())
212             .unwrap_err();
213         backend
214             .shared_object_remove(&VhostUserSharedMsg::default())
215             .unwrap_err();
216         backend.node().error = None;
217     }
218 
219     #[test]
test_backend_req_recv_negative()220     fn test_backend_req_recv_negative() {
221         let (p1, p2) = UnixStream::pair().unwrap();
222         let backend = Backend::from_stream(p1);
223         let mut frontend = Endpoint::<VhostUserMsgHeader<BackendReq>>::from_stream(p2);
224 
225         let len = mem::size_of::<VhostUserSharedMsg>();
226         let mut hdr = VhostUserMsgHeader::new(
227             BackendReq::SHARED_OBJECT_ADD,
228             VhostUserHeaderFlag::REPLY.bits(),
229             len as u32,
230         );
231         let body = VhostUserU64::new(0);
232 
233         frontend
234             .send_message(&hdr, &body, Some(&[frontend.as_raw_fd()]))
235             .unwrap();
236         backend
237             .shared_object_add(&VhostUserSharedMsg::default())
238             .unwrap_err();
239 
240         backend.set_shared_object_flag(true);
241         backend
242             .shared_object_add(&VhostUserSharedMsg::default())
243             .unwrap();
244 
245         backend.set_reply_ack_flag(true);
246         backend
247             .shared_object_add(&VhostUserSharedMsg::default())
248             .unwrap_err();
249 
250         hdr.set_code(BackendReq::SHARED_OBJECT_REMOVE);
251         frontend.send_message(&hdr, &body, None).unwrap();
252         backend
253             .shared_object_add(&VhostUserSharedMsg::default())
254             .unwrap_err();
255         hdr.set_code(BackendReq::SHARED_OBJECT_ADD);
256 
257         let body = VhostUserU64::new(1);
258         frontend.send_message(&hdr, &body, None).unwrap();
259         backend
260             .shared_object_add(&VhostUserSharedMsg::default())
261             .unwrap_err();
262 
263         let body = VhostUserU64::new(0);
264         frontend.send_message(&hdr, &body, None).unwrap();
265         backend
266             .shared_object_add(&VhostUserSharedMsg::default())
267             .unwrap();
268     }
269 }
270