• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium OS Authors. All rights reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 //! Windows specific code that keeps rest of the code in the crate platform independent.
5 
6 use std::cmp::min;
7 use std::fs::File;
8 use std::io::IoSliceMut;
9 use std::path::Path;
10 use std::ptr::copy_nonoverlapping;
11 
12 use base::AsRawDescriptor;
13 use base::CloseNotifier;
14 use base::FromRawDescriptor;
15 use base::RawDescriptor;
16 use base::ReadNotifier;
17 use base::SafeDescriptor;
18 use base::Tube;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use tube_transporter::packed_tube;
22 
23 use crate::Error;
24 use crate::Frontend;
25 use crate::FrontendServer;
26 use crate::Result;
27 
28 /// Alias to enable platform independent code.
29 pub type SystemStream = Tube;
30 
31 pub use TubePlatformConnection as PlatformConnection;
32 
33 #[derive(Serialize, Deserialize)]
34 struct RawDescriptorContainer {
35     #[serde(with = "base::with_raw_descriptor")]
36     rd: RawDescriptor,
37 }
38 
39 #[derive(Serialize, Deserialize)]
40 struct Message {
41     rds: Vec<RawDescriptorContainer>,
42     data: Vec<u8>,
43 }
44 
45 /// Tube based vhost-user connection.
46 pub struct TubePlatformConnection {
47     tube: Tube,
48 }
49 
50 impl TubePlatformConnection {
get_tube(&self) -> &Tube51     pub(crate) fn get_tube(&self) -> &Tube {
52         &self.tube
53     }
54 }
55 
56 impl From<Tube> for TubePlatformConnection {
from(tube: Tube) -> Self57     fn from(tube: Tube) -> Self {
58         Self { tube }
59     }
60 }
61 
62 impl TubePlatformConnection {
connect<P: AsRef<Path>>(_path: P) -> Result<Self>63     pub fn connect<P: AsRef<Path>>(_path: P) -> Result<Self> {
64         unimplemented!("connections not supported on Tubes")
65     }
66 
67     /// Sends a single message over the socket with optional attached file descriptors.
68     ///
69     /// - `hdr`: vhost message header
70     /// - `body`: vhost message body (may be empty to send a header-only message)
71     /// - `payload`: additional bytes to append to `body` (may be empty)
send_message( &self, hdr: &[u8], body: &[u8], payload: &[u8], rds: Option<&[RawDescriptor]>, ) -> Result<()>72     pub fn send_message(
73         &self,
74         hdr: &[u8],
75         body: &[u8],
76         payload: &[u8],
77         rds: Option<&[RawDescriptor]>,
78     ) -> Result<()> {
79         let hdr_msg = Message {
80             rds: rds
81                 .unwrap_or(&[])
82                 .iter()
83                 .map(|rd| RawDescriptorContainer { rd: *rd })
84                 .collect(),
85             data: hdr.to_vec(),
86         };
87 
88         let mut body_data = Vec::with_capacity(body.len() + payload.len());
89         body_data.extend_from_slice(body);
90         body_data.extend_from_slice(payload);
91         let body_msg = Message {
92             rds: Vec::new(),
93             data: body_data,
94         };
95 
96         // We send the header and the body separately here. This is necessary on Windows. Otherwise
97         // the recv side cannot read the header independently (the transport is message oriented).
98         self.tube.send(&hdr_msg)?;
99         if !body_msg.data.is_empty() {
100             self.tube.send(&body_msg)?;
101         }
102 
103         Ok(())
104     }
105 
106     /// Reads bytes from the tube into the given scatter/gather vectors with optional attached
107     /// file.
108     ///
109     /// The underlying communication channel is a Tube. Providing too little recv buffer space will
110     /// cause data to get dropped (with an error). This is tricky to fix with Tube backing our
111     /// transport layer, and as far as we can tell, is not exercised in practice.
112     ///
113     /// # Return:
114     /// * - (number of bytes received, [received files]) on success
115     /// * - RecvBufferTooSmall: Input bufs is too small for the received buffer.
116     /// * - TubeError: tube related errors.
recv_into_bufs( &self, bufs: &mut [IoSliceMut], _allow_rds: bool, ) -> Result<(usize, Option<Vec<File>>)>117     pub fn recv_into_bufs(
118         &self,
119         bufs: &mut [IoSliceMut],
120         _allow_rds: bool,
121     ) -> Result<(usize, Option<Vec<File>>)> {
122         // TODO(b/221882601): implement "allow_rds"
123 
124         let msg: Message = self.tube.recv()?;
125 
126         let files = match msg.rds.len() {
127             0 => None,
128             _ => Some(
129                 msg.rds
130                     .iter()
131                     .map(|r|
132                         // SAFETY:
133                         // Safe because we own r.rd and it is guaranteed valid.
134                         unsafe { File::from_raw_descriptor(r.rd) })
135                     .collect::<Vec<File>>(),
136             ),
137         };
138 
139         let mut bytes_read = 0;
140         for dest_iov in bufs.iter_mut() {
141             if bytes_read >= msg.data.len() {
142                 // We've read all the available data into the iovecs.
143                 break;
144             }
145 
146             let copy_count = min(dest_iov.len(), msg.data.len() - bytes_read);
147 
148             // SAFETY:
149             // Safe because:
150             //      1) msg.data and dest_iov do not overlap.
151             //      2) copy_count is bounded by dest_iov's length and msg.data.len() so we can't
152             //         overrun.
153             unsafe {
154                 copy_nonoverlapping(
155                     msg.data.as_ptr().add(bytes_read),
156                     dest_iov.as_mut_ptr(),
157                     copy_count,
158                 )
159             };
160             bytes_read += copy_count;
161         }
162 
163         if bytes_read != msg.data.len() {
164             // User didn't supply enough iov space.
165             return Err(Error::RecvBufferTooSmall {
166                 got: bytes_read,
167                 want: msg.data.len(),
168             });
169         }
170 
171         Ok((bytes_read, files))
172     }
173 }
174 
175 /// Convert a`SafeDescriptor` to a `Tube`.
176 ///
177 /// # Safety
178 ///
179 /// `fd` must represent a packed tube.
to_system_stream(fd: SafeDescriptor) -> Result<SystemStream>180 pub unsafe fn to_system_stream(fd: SafeDescriptor) -> Result<SystemStream> {
181     // SAFETY: Safe because the file represents a packed tube.
182     let tube = unsafe { packed_tube::unpack(fd).expect("unpacked Tube") };
183     Ok(tube)
184 }
185 
186 impl AsRawDescriptor for TubePlatformConnection {
187     /// WARNING: this function does not return a waitable descriptor! Use base::ReadNotifier
188     /// instead.
as_raw_descriptor(&self) -> RawDescriptor189     fn as_raw_descriptor(&self) -> RawDescriptor {
190         self.tube.as_raw_descriptor()
191     }
192 }
193 
194 impl<S: Frontend> FrontendServer<S> {
195     /// Create a `FrontendServer` that uses a Tube internally. Must specify the backend process
196     /// which will receive the Tube.
197     ///
198     /// The returned `SafeDescriptor` is the client side of the tube and should be sent to the
199     /// backend using [BackendClient::set_slave_request_fd()].
200     ///
201     /// [BackendClient::set_slave_request_fd()]: struct.BackendClient.html#method.set_slave_request_fd
with_tube(backend: S, backend_pid: u32) -> Result<(Self, SafeDescriptor)>202     pub fn with_tube(backend: S, backend_pid: u32) -> Result<(Self, SafeDescriptor)> {
203         let (tx, rx) = SystemStream::pair()?;
204         // SAFETY:
205         // Safe because we expect the tube to be unpacked in the other process.
206         let tx = unsafe { packed_tube::pack(tx, backend_pid).expect("packed tube") };
207         Ok((Self::new(backend, rx)?, tx))
208     }
209 }
210 
211 impl<S: Frontend> ReadNotifier for FrontendServer<S> {
212     /// Used for polling.
get_read_notifier(&self) -> &dyn AsRawDescriptor213     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
214         self.sub_sock.0.get_tube().get_read_notifier()
215     }
216 }
217 
218 impl<S: Frontend> CloseNotifier for FrontendServer<S> {
219     /// Used for closing.
get_close_notifier(&self) -> &dyn AsRawDescriptor220     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
221         self.sub_sock.0.get_tube().get_close_notifier()
222     }
223 }
224 
225 #[cfg(test)]
226 pub(crate) mod tests {
227     use crate::backend_client::BackendClient;
228     use crate::backend_server::Backend;
229     use crate::backend_server::BackendServer;
230     use crate::message::FrontendReq;
231     use crate::Connection;
232     use crate::SystemStream;
233 
create_pair() -> (BackendClient, Connection<FrontendReq>)234     pub(crate) fn create_pair() -> (BackendClient, Connection<FrontendReq>) {
235         let (client_tube, server_tube) = SystemStream::pair().unwrap();
236         let backend_client = BackendClient::from_stream(client_tube);
237         (backend_client, Connection::from(server_tube))
238     }
239 
create_connection_pair() -> (Connection<FrontendReq>, Connection<FrontendReq>)240     pub(crate) fn create_connection_pair() -> (Connection<FrontendReq>, Connection<FrontendReq>) {
241         let (client_tube, server_tube) = SystemStream::pair().unwrap();
242         let backend_connection = Connection::<FrontendReq>::from(client_tube);
243         (backend_connection, Connection::from(server_tube))
244     }
245 
create_client_server_pair<S>(backend: S) -> (BackendClient, BackendServer<S>) where S: Backend,246     pub(crate) fn create_client_server_pair<S>(backend: S) -> (BackendClient, BackendServer<S>)
247     where
248         S: Backend,
249     {
250         let (client_tube, server_tube) = SystemStream::pair().unwrap();
251         let backend_client = BackendClient::from_stream(client_tube);
252         (
253             backend_client,
254             BackendServer::<S>::from_stream(server_tube, backend),
255         )
256     }
257 }
258