• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2020 Alibaba Cloud. All rights reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 use std::io;
5 use std::mem;
6 use std::os::unix::io::{AsRawFd, RawFd};
7 use std::os::unix::net::UnixStream;
8 use std::sync::{Arc, Mutex, MutexGuard};
9 
10 use super::connection::Endpoint;
11 use super::message::*;
12 use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler};
13 
14 use vm_memory::ByteValued;
15 
16 struct SlaveInternal {
17     sock: Endpoint<SlaveReq>,
18 
19     // Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated.
20     reply_ack_negotiated: bool,
21 
22     // whether the endpoint has encountered any failure
23     error: Option<i32>,
24 }
25 
26 impl SlaveInternal {
check_state(&self) -> Result<u64>27     fn check_state(&self) -> Result<u64> {
28         match self.error {
29             Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))),
30             None => Ok(0),
31         }
32     }
33 
send_message<T: ByteValued>( &mut self, request: SlaveReq, body: &T, fds: Option<&[RawFd]>, ) -> Result<u64>34     fn send_message<T: ByteValued>(
35         &mut self,
36         request: SlaveReq,
37         body: &T,
38         fds: Option<&[RawFd]>,
39     ) -> Result<u64> {
40         self.check_state()?;
41 
42         let len = mem::size_of::<T>();
43         let mut hdr = VhostUserMsgHeader::new(request, 0, len as u32);
44         if self.reply_ack_negotiated {
45             hdr.set_need_reply(true);
46         }
47         self.sock.send_message(&hdr, body, fds)?;
48 
49         self.wait_for_ack(&hdr)
50     }
51 
wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<u64>52     fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<u64> {
53         self.check_state()?;
54         if !self.reply_ack_negotiated {
55             return Ok(0);
56         }
57 
58         let (reply, body, rfds) = self.sock.recv_body::<VhostUserU64>()?;
59         if !reply.is_reply_for(hdr) || rfds.is_some() || !body.is_valid() {
60             return Err(Error::InvalidMessage);
61         }
62         if body.value != 0 {
63             return Err(Error::MasterInternalError);
64         }
65 
66         Ok(body.value)
67     }
68 }
69 
70 /// Request proxy to send vhost-user slave requests to the master through the slave
71 /// communication channel.
72 ///
73 /// The [Slave] acts as a message proxy to forward vhost-user slave requests to the
74 /// master through the vhost-user slave communication channel. The forwarded messages will be
75 /// handled by the [MasterReqHandler] server.
76 ///
77 /// [Slave]: struct.Slave.html
78 /// [MasterReqHandler]: struct.MasterReqHandler.html
79 #[derive(Clone)]
80 pub struct Slave {
81     // underlying Unix domain socket for communication
82     node: Arc<Mutex<SlaveInternal>>,
83 }
84 
85 impl Slave {
new(ep: Endpoint<SlaveReq>) -> Self86     fn new(ep: Endpoint<SlaveReq>) -> Self {
87         Slave {
88             node: Arc::new(Mutex::new(SlaveInternal {
89                 sock: ep,
90                 reply_ack_negotiated: false,
91                 error: None,
92             })),
93         }
94     }
95 
node(&self) -> MutexGuard<SlaveInternal>96     fn node(&self) -> MutexGuard<SlaveInternal> {
97         self.node.lock().unwrap()
98     }
99 
send_message<T: ByteValued>( &self, request: SlaveReq, body: &T, fds: Option<&[RawFd]>, ) -> io::Result<u64>100     fn send_message<T: ByteValued>(
101         &self,
102         request: SlaveReq,
103         body: &T,
104         fds: Option<&[RawFd]>,
105     ) -> io::Result<u64> {
106         self.node()
107             .send_message(request, body, fds)
108             .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))
109     }
110 
111     /// Create a new instance from a `UnixStream` object.
from_stream(sock: UnixStream) -> Self112     pub fn from_stream(sock: UnixStream) -> Self {
113         Self::new(Endpoint::<SlaveReq>::from_stream(sock))
114     }
115 
116     /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature.
117     ///
118     /// When the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature has been negotiated,
119     /// the "REPLY_ACK" flag will be set in the message header for every slave to master request
120     /// message.
set_reply_ack_flag(&self, enable: bool)121     pub fn set_reply_ack_flag(&self, enable: bool) {
122         self.node().reply_ack_negotiated = enable;
123     }
124 
125     /// Mark endpoint as failed with specified error code.
set_failed(&self, error: i32)126     pub fn set_failed(&self, error: i32) {
127         self.node().error = Some(error);
128     }
129 }
130 
131 impl VhostUserMasterReqHandler for Slave {
132     /// Forward vhost-user-fs map file requests to the slave.
fs_slave_map(&self, fs: &VhostUserFSSlaveMsg, fd: &dyn AsRawFd) -> HandlerResult<u64>133     fn fs_slave_map(&self, fs: &VhostUserFSSlaveMsg, fd: &dyn AsRawFd) -> HandlerResult<u64> {
134         self.send_message(SlaveReq::FS_MAP, fs, Some(&[fd.as_raw_fd()]))
135     }
136 
137     /// Forward vhost-user-fs unmap file requests to the master.
fs_slave_unmap(&self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64>138     fn fs_slave_unmap(&self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64> {
139         self.send_message(SlaveReq::FS_UNMAP, fs, None)
140     }
141 }
142 
#[cfg(test)]
mod tests {
    use std::os::unix::io::AsRawFd;

    use super::*;

    /// `set_failed` stores the given error code in the shared state.
    #[test]
    fn test_slave_req_set_failed() {
        let (local, _peer) = UnixStream::pair().unwrap();
        let slave = Slave::from_stream(local);

        assert_eq!(slave.node().error, None);
        slave.set_failed(libc::EAGAIN);
        assert_eq!(slave.node().error, Some(libc::EAGAIN));
    }

    /// Once the endpoint is marked as failed, every request is rejected.
    #[test]
    fn test_slave_req_send_failure() {
        let (local, peer) = UnixStream::pair().unwrap();
        let slave = Slave::from_stream(local);
        let msg = VhostUserFSSlaveMsg::default();

        slave.set_failed(libc::ECONNRESET);
        assert!(slave.fs_slave_map(&msg, &peer).is_err());
        assert!(slave.fs_slave_unmap(&msg).is_err());
        slave.node().error = None;
    }

    /// Malformed or negative acknowledgements are rejected; a valid one is accepted.
    #[test]
    fn test_slave_req_recv_negative() {
        let (local, peer) = UnixStream::pair().unwrap();
        let slave = Slave::from_stream(local);
        let mut master = Endpoint::<SlaveReq>::from_stream(peer);
        let msg = VhostUserFSSlaveMsg::default();

        let body_len = mem::size_of::<VhostUserFSSlaveMsg>() as u32;
        let mut hdr = VhostUserMsgHeader::new(
            SlaveReq::FS_MAP,
            VhostUserHeaderFlag::REPLY.bits(),
            body_len,
        );
        let ack_ok = VhostUserU64::new(0);

        // Queue a reply that carries a file descriptor. Without REPLY_ACK the slave does
        // not read it, so the request succeeds.
        let master_fd = [master.as_raw_fd()];
        master
            .send_message(&hdr, &ack_ok, Some(&master_fd[..]))
            .unwrap();
        slave.fs_slave_map(&msg, &master).unwrap();

        // With REPLY_ACK enabled the queued reply is consumed and rejected because it
        // carries a file descriptor.
        slave.set_reply_ack_flag(true);
        assert!(slave.fs_slave_map(&msg, &master).is_err());

        // A reply with a mismatching request code is rejected.
        hdr.set_code(SlaveReq::FS_UNMAP);
        master.send_message(&hdr, &ack_ok, None).unwrap();
        assert!(slave.fs_slave_map(&msg, &master).is_err());
        hdr.set_code(SlaveReq::FS_MAP);

        // A non-zero acknowledgement value is reported as an error.
        let ack_fail = VhostUserU64::new(1);
        master.send_message(&hdr, &ack_fail, None).unwrap();
        assert!(slave.fs_slave_map(&msg, &master).is_err());

        // A well-formed zero acknowledgement is accepted.
        let ack_ok = VhostUserU64::new(0);
        master.send_message(&hdr, &ack_ok, None).unwrap();
        slave.fs_slave_map(&msg, &master).unwrap();
    }
}
220