1 // Copyright (C) 2020 Alibaba Cloud. All rights reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 use std::io; 5 use std::mem; 6 use std::os::unix::io::{AsRawFd, RawFd}; 7 use std::os::unix::net::UnixStream; 8 use std::sync::{Arc, Mutex, MutexGuard}; 9 10 use super::connection::Endpoint; 11 use super::message::*; 12 use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler}; 13 14 use vm_memory::ByteValued; 15 16 struct SlaveInternal { 17 sock: Endpoint<SlaveReq>, 18 19 // Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated. 20 reply_ack_negotiated: bool, 21 22 // whether the endpoint has encountered any failure 23 error: Option<i32>, 24 } 25 26 impl SlaveInternal { check_state(&self) -> Result<u64>27 fn check_state(&self) -> Result<u64> { 28 match self.error { 29 Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))), 30 None => Ok(0), 31 } 32 } 33 send_message<T: ByteValued>( &mut self, request: SlaveReq, body: &T, fds: Option<&[RawFd]>, ) -> Result<u64>34 fn send_message<T: ByteValued>( 35 &mut self, 36 request: SlaveReq, 37 body: &T, 38 fds: Option<&[RawFd]>, 39 ) -> Result<u64> { 40 self.check_state()?; 41 42 let len = mem::size_of::<T>(); 43 let mut hdr = VhostUserMsgHeader::new(request, 0, len as u32); 44 if self.reply_ack_negotiated { 45 hdr.set_need_reply(true); 46 } 47 self.sock.send_message(&hdr, body, fds)?; 48 49 self.wait_for_ack(&hdr) 50 } 51 wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<u64>52 fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<u64> { 53 self.check_state()?; 54 if !self.reply_ack_negotiated { 55 return Ok(0); 56 } 57 58 let (reply, body, rfds) = self.sock.recv_body::<VhostUserU64>()?; 59 if !reply.is_reply_for(hdr) || rfds.is_some() || !body.is_valid() { 60 return Err(Error::InvalidMessage); 61 } 62 if body.value != 0 { 63 return Err(Error::MasterInternalError); 64 } 65 66 Ok(body.value) 67 } 68 } 69 70 /// Request proxy to send vhost-user slave requests to the master through the slave 71 /// communication channel. 72 /// 73 /// The [Slave] acts as a message proxy to forward vhost-user slave requests to the 74 /// master through the vhost-user slave communication channel. The forwarded messages will be 75 /// handled by the [MasterReqHandler] server. 76 /// 77 /// [Slave]: struct.Slave.html 78 /// [MasterReqHandler]: struct.MasterReqHandler.html 79 #[derive(Clone)] 80 pub struct Slave { 81 // underlying Unix domain socket for communication 82 node: Arc<Mutex<SlaveInternal>>, 83 } 84 85 impl Slave { new(ep: Endpoint<SlaveReq>) -> Self86 fn new(ep: Endpoint<SlaveReq>) -> Self { 87 Slave { 88 node: Arc::new(Mutex::new(SlaveInternal { 89 sock: ep, 90 reply_ack_negotiated: false, 91 error: None, 92 })), 93 } 94 } 95 node(&self) -> MutexGuard<SlaveInternal>96 fn node(&self) -> MutexGuard<SlaveInternal> { 97 self.node.lock().unwrap() 98 } 99 send_message<T: ByteValued>( &self, request: SlaveReq, body: &T, fds: Option<&[RawFd]>, ) -> io::Result<u64>100 fn send_message<T: ByteValued>( 101 &self, 102 request: SlaveReq, 103 body: &T, 104 fds: Option<&[RawFd]>, 105 ) -> io::Result<u64> { 106 self.node() 107 .send_message(request, body, fds) 108 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) 109 } 110 111 /// Create a new instance from a `UnixStream` object. from_stream(sock: UnixStream) -> Self112 pub fn from_stream(sock: UnixStream) -> Self { 113 Self::new(Endpoint::<SlaveReq>::from_stream(sock)) 114 } 115 116 /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature. 117 /// 118 /// When the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature has been negotiated, 119 /// the "REPLY_ACK" flag will be set in the message header for every slave to master request 120 /// message. set_reply_ack_flag(&self, enable: bool)121 pub fn set_reply_ack_flag(&self, enable: bool) { 122 self.node().reply_ack_negotiated = enable; 123 } 124 125 /// Mark endpoint as failed with specified error code. set_failed(&self, error: i32)126 pub fn set_failed(&self, error: i32) { 127 self.node().error = Some(error); 128 } 129 } 130 131 impl VhostUserMasterReqHandler for Slave { 132 /// Forward vhost-user-fs map file requests to the slave. fs_slave_map(&self, fs: &VhostUserFSSlaveMsg, fd: &dyn AsRawFd) -> HandlerResult<u64>133 fn fs_slave_map(&self, fs: &VhostUserFSSlaveMsg, fd: &dyn AsRawFd) -> HandlerResult<u64> { 134 self.send_message(SlaveReq::FS_MAP, fs, Some(&[fd.as_raw_fd()])) 135 } 136 137 /// Forward vhost-user-fs unmap file requests to the master. fs_slave_unmap(&self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64>138 fn fs_slave_unmap(&self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64> { 139 self.send_message(SlaveReq::FS_UNMAP, fs, None) 140 } 141 } 142 143 #[cfg(test)] 144 mod tests { 145 use std::os::unix::io::AsRawFd; 146 147 use super::*; 148 149 #[test] test_slave_req_set_failed()150 fn test_slave_req_set_failed() { 151 let (p1, _p2) = UnixStream::pair().unwrap(); 152 let slave = Slave::from_stream(p1); 153 154 assert!(slave.node().error.is_none()); 155 slave.set_failed(libc::EAGAIN); 156 assert_eq!(slave.node().error, Some(libc::EAGAIN)); 157 } 158 159 #[test] test_slave_req_send_failure()160 fn test_slave_req_send_failure() { 161 let (p1, p2) = UnixStream::pair().unwrap(); 162 let slave = Slave::from_stream(p1); 163 164 slave.set_failed(libc::ECONNRESET); 165 slave 166 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &p2) 167 .unwrap_err(); 168 slave 169 .fs_slave_unmap(&VhostUserFSSlaveMsg::default()) 170 .unwrap_err(); 171 slave.node().error = None; 172 } 173 174 #[test] test_slave_req_recv_negative()175 fn test_slave_req_recv_negative() { 176 let (p1, p2) = UnixStream::pair().unwrap(); 177 let slave = Slave::from_stream(p1); 178 let mut master = Endpoint::<SlaveReq>::from_stream(p2); 179 180 let len = mem::size_of::<VhostUserFSSlaveMsg>(); 181 let mut hdr = VhostUserMsgHeader::new( 182 SlaveReq::FS_MAP, 183 VhostUserHeaderFlag::REPLY.bits(), 184 len as u32, 185 ); 186 let body = VhostUserU64::new(0); 187 188 master 189 .send_message(&hdr, &body, Some(&[master.as_raw_fd()])) 190 .unwrap(); 191 slave 192 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &master) 193 .unwrap(); 194 195 slave.set_reply_ack_flag(true); 196 slave 197 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &master) 198 .unwrap_err(); 199 200 hdr.set_code(SlaveReq::FS_UNMAP); 201 master.send_message(&hdr, &body, None).unwrap(); 202 slave 203 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &master) 204 .unwrap_err(); 205 hdr.set_code(SlaveReq::FS_MAP); 206 207 let body = VhostUserU64::new(1); 208 master.send_message(&hdr, &body, None).unwrap(); 209 slave 210 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &master) 211 .unwrap_err(); 212 213 let body = VhostUserU64::new(0); 214 master.send_message(&hdr, &body, None).unwrap(); 215 slave 216 .fs_slave_map(&VhostUserFSSlaveMsg::default(), &master) 217 .unwrap(); 218 } 219 } 220