1 // Copyright (C) 2020 Alibaba Cloud. All rights reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 use std::io; 5 use std::mem; 6 use std::os::unix::io::{AsRawFd, RawFd}; 7 use std::os::unix::net::UnixStream; 8 use std::sync::{Arc, Mutex, MutexGuard}; 9 10 use super::connection::Endpoint; 11 use super::message::*; 12 use super::{Error, HandlerResult, Result, VhostUserFrontendReqHandler}; 13 14 use vm_memory::ByteValued; 15 16 struct BackendInternal { 17 sock: Endpoint<VhostUserMsgHeader<BackendReq>>, 18 19 // Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated. 20 reply_ack_negotiated: bool, 21 22 // Protocol feature VHOST_USER_PROTOCOL_F_SHARED_OBJECT has been negotiated. 23 shared_object_negotiated: bool, 24 25 // whether the endpoint has encountered any failure 26 error: Option<i32>, 27 } 28 29 impl BackendInternal { check_state(&self) -> Result<u64>30 fn check_state(&self) -> Result<u64> { 31 match self.error { 32 Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))), 33 None => Ok(0), 34 } 35 } 36 send_message<T: ByteValued>( &mut self, request: BackendReq, body: &T, fds: Option<&[RawFd]>, ) -> Result<u64>37 fn send_message<T: ByteValued>( 38 &mut self, 39 request: BackendReq, 40 body: &T, 41 fds: Option<&[RawFd]>, 42 ) -> Result<u64> { 43 self.check_state()?; 44 45 let len = mem::size_of::<T>(); 46 let mut hdr = VhostUserMsgHeader::new(request, 0, len as u32); 47 if self.reply_ack_negotiated { 48 hdr.set_need_reply(true); 49 } 50 self.sock.send_message(&hdr, body, fds)?; 51 52 self.wait_for_ack(&hdr) 53 } 54 wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<BackendReq>) -> Result<u64>55 fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<BackendReq>) -> Result<u64> { 56 self.check_state()?; 57 if !self.reply_ack_negotiated { 58 return Ok(0); 59 } 60 61 let (reply, body, rfds) = self.sock.recv_body::<VhostUserU64>()?; 62 if !reply.is_reply_for(hdr) || rfds.is_some() || !body.is_valid() { 63 return Err(Error::InvalidMessage); 64 } 65 if body.value != 0 { 66 return Err(Error::FrontendInternalError); 67 } 68 69 Ok(body.value) 70 } 71 } 72 73 /// Request proxy to send vhost-user backend requests to the frontend through the backend 74 /// communication channel. 75 /// 76 /// The [Backend] acts as a message proxy to forward vhost-user backend requests to the 77 /// frontend through the vhost-user backend communication channel. The forwarded messages will be 78 /// handled by the [FrontendReqHandler] server. 79 /// 80 /// [Backend]: struct.Backend.html 81 /// [FrontendReqHandler]: struct.FrontendReqHandler.html 82 #[derive(Clone)] 83 pub struct Backend { 84 // underlying Unix domain socket for communication 85 node: Arc<Mutex<BackendInternal>>, 86 } 87 88 impl Backend { new(ep: Endpoint<VhostUserMsgHeader<BackendReq>>) -> Self89 fn new(ep: Endpoint<VhostUserMsgHeader<BackendReq>>) -> Self { 90 Backend { 91 node: Arc::new(Mutex::new(BackendInternal { 92 sock: ep, 93 reply_ack_negotiated: false, 94 shared_object_negotiated: false, 95 error: None, 96 })), 97 } 98 } 99 node(&self) -> MutexGuard<BackendInternal>100 fn node(&self) -> MutexGuard<BackendInternal> { 101 self.node.lock().unwrap() 102 } 103 send_message<T: ByteValued>( &self, request: BackendReq, body: &T, fds: Option<&[RawFd]>, ) -> io::Result<u64>104 fn send_message<T: ByteValued>( 105 &self, 106 request: BackendReq, 107 body: &T, 108 fds: Option<&[RawFd]>, 109 ) -> io::Result<u64> { 110 self.node() 111 .send_message(request, body, fds) 112 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) 113 } 114 115 /// Create a new instance from a `UnixStream` object. from_stream(sock: UnixStream) -> Self116 pub fn from_stream(sock: UnixStream) -> Self { 117 Self::new(Endpoint::<VhostUserMsgHeader<BackendReq>>::from_stream( 118 sock, 119 )) 120 } 121 122 /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature. 123 /// 124 /// When the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature has been negotiated, 125 /// the "REPLY_ACK" flag will be set in the message header for every backend to frontend request 126 /// message. set_reply_ack_flag(&self, enable: bool)127 pub fn set_reply_ack_flag(&self, enable: bool) { 128 self.node().reply_ack_negotiated = enable; 129 } 130 131 /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_SHARED_OBJECT` protocol feature. 132 /// 133 /// When the `VHOST_USER_PROTOCOL_F_SHARED_OBJECT` protocol feature has been negotiated, 134 /// the backend is allowed to send "SHARED_OBJECT_*" messages to the frontend. set_shared_object_flag(&self, enable: bool)135 pub fn set_shared_object_flag(&self, enable: bool) { 136 self.node().shared_object_negotiated = enable; 137 } 138 139 /// Mark endpoint as failed with specified error code. set_failed(&self, error: i32)140 pub fn set_failed(&self, error: i32) { 141 self.node().error = Some(error); 142 } 143 } 144 145 impl VhostUserFrontendReqHandler for Backend { 146 /// Forward vhost-user shared-object add request to the frontend. shared_object_add(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64>147 fn shared_object_add(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64> { 148 if !self.node().shared_object_negotiated { 149 return Err(io::Error::new( 150 io::ErrorKind::Other, 151 "Shared Object feature not negotiated", 152 )); 153 } 154 self.send_message(BackendReq::SHARED_OBJECT_ADD, uuid, None) 155 } 156 157 /// Forward vhost-user shared-object remove request to the frontend. shared_object_remove(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64>158 fn shared_object_remove(&self, uuid: &VhostUserSharedMsg) -> HandlerResult<u64> { 159 if !self.node().shared_object_negotiated { 160 return Err(io::Error::new( 161 io::ErrorKind::Other, 162 "Shared Object feature not negotiated", 163 )); 164 } 165 self.send_message(BackendReq::SHARED_OBJECT_REMOVE, uuid, None) 166 } 167 168 /// Forward vhost-user shared-object lookup request to the frontend. shared_object_lookup( &self, uuid: &VhostUserSharedMsg, fd: &dyn AsRawFd, ) -> HandlerResult<u64>169 fn shared_object_lookup( 170 &self, 171 uuid: &VhostUserSharedMsg, 172 fd: &dyn AsRawFd, 173 ) -> HandlerResult<u64> { 174 if !self.node().shared_object_negotiated { 175 return Err(io::Error::new( 176 io::ErrorKind::Other, 177 "Shared Object feature not negotiated", 178 )); 179 } 180 self.send_message( 181 BackendReq::SHARED_OBJECT_LOOKUP, 182 uuid, 183 Some(&[fd.as_raw_fd()]), 184 ) 185 } 186 } 187 188 #[cfg(test)] 189 mod tests { 190 use std::os::unix::io::AsRawFd; 191 192 use super::*; 193 194 #[test] test_backend_req_set_failed()195 fn test_backend_req_set_failed() { 196 let (p1, _p2) = UnixStream::pair().unwrap(); 197 let backend = Backend::from_stream(p1); 198 199 assert!(backend.node().error.is_none()); 200 backend.set_failed(libc::EAGAIN); 201 assert_eq!(backend.node().error, Some(libc::EAGAIN)); 202 } 203 204 #[test] test_backend_req_send_failure()205 fn test_backend_req_send_failure() { 206 let (p1, _) = UnixStream::pair().unwrap(); 207 let backend = Backend::from_stream(p1); 208 209 backend.set_failed(libc::ECONNRESET); 210 backend 211 .shared_object_add(&VhostUserSharedMsg::default()) 212 .unwrap_err(); 213 backend 214 .shared_object_remove(&VhostUserSharedMsg::default()) 215 .unwrap_err(); 216 backend.node().error = None; 217 } 218 219 #[test] test_backend_req_recv_negative()220 fn test_backend_req_recv_negative() { 221 let (p1, p2) = UnixStream::pair().unwrap(); 222 let backend = Backend::from_stream(p1); 223 let mut frontend = Endpoint::<VhostUserMsgHeader<BackendReq>>::from_stream(p2); 224 225 let len = mem::size_of::<VhostUserSharedMsg>(); 226 let mut hdr = VhostUserMsgHeader::new( 227 BackendReq::SHARED_OBJECT_ADD, 228 VhostUserHeaderFlag::REPLY.bits(), 229 len as u32, 230 ); 231 let body = VhostUserU64::new(0); 232 233 frontend 234 .send_message(&hdr, &body, Some(&[frontend.as_raw_fd()])) 235 .unwrap(); 236 backend 237 .shared_object_add(&VhostUserSharedMsg::default()) 238 .unwrap_err(); 239 240 backend.set_shared_object_flag(true); 241 backend 242 .shared_object_add(&VhostUserSharedMsg::default()) 243 .unwrap(); 244 245 backend.set_reply_ack_flag(true); 246 backend 247 .shared_object_add(&VhostUserSharedMsg::default()) 248 .unwrap_err(); 249 250 hdr.set_code(BackendReq::SHARED_OBJECT_REMOVE); 251 frontend.send_message(&hdr, &body, None).unwrap(); 252 backend 253 .shared_object_add(&VhostUserSharedMsg::default()) 254 .unwrap_err(); 255 hdr.set_code(BackendReq::SHARED_OBJECT_ADD); 256 257 let body = VhostUserU64::new(1); 258 frontend.send_message(&hdr, &body, None).unwrap(); 259 backend 260 .shared_object_add(&VhostUserSharedMsg::default()) 261 .unwrap_err(); 262 263 let body = VhostUserU64::new(0); 264 frontend.send_message(&hdr, &body, None).unwrap(); 265 backend 266 .shared_object_add(&VhostUserSharedMsg::default()) 267 .unwrap(); 268 } 269 } 270