• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13 
14 use data_model::DataInit;
15 use once_cell::sync::Lazy;
16 use serde::de::DeserializeOwned;
17 use serde::Deserialize;
18 use serde::Serialize;
19 use serde::Serializer;
20 use winapi::shared::winerror::ERROR_MORE_DATA;
21 
22 use crate::descriptor::AsRawDescriptor;
23 use crate::descriptor::FromRawDescriptor;
24 use crate::descriptor::SafeDescriptor;
25 use crate::platform::deserialize_with_descriptors;
26 use crate::platform::RawDescriptor;
27 use crate::platform::SerializeDescriptors;
28 use crate::tube::Error;
29 use crate::tube::RecvTube;
30 use crate::tube::Result;
31 use crate::tube::SendTube;
32 use crate::BlockingMode;
33 use crate::CloseNotifier;
34 use crate::EventToken;
35 use crate::FramingMode;
36 use crate::ReadNotifier;
37 use crate::StreamChannel;
38 
39 /// Bidirectional tube that support both send and recv.
40 ///
41 /// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
42 /// (A, B). We wish to send B to another process, and communicate with it using A from the current
43 /// process:
44 ///     1. B's target_pid must be set to the current PID *before* serialization. There is a
45 ///        serialization hook that sets it to the current PID automatically if target_pid is unset.
46 ///     2. A's target_pid must be set to the PID of the process where B was sent.
47 ///
48 /// If instead you are sending both A and B to separate processes, then:
49 ///     1. A's target_pid must be set to B's pid, manually.
50 ///     2. B's target_pid must be set to A's pid, manually.
51 ///
52 /// Automating all of this and getting a completely clean interface is tricky. We would need
53 /// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
54 /// state about PIDs between the ends. There are alternatives like reusing the underlying
55 /// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
56 /// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
57 #[derive(Serialize, Deserialize, Debug)]
58 pub struct Tube {
59     socket: StreamChannel,
60 
61     // Default target_pid to current PID on serialization (see `Tube` comment header for details).
62     #[serde(serialize_with = "set_tube_pid_on_serialize")]
63     target_pid: Option<u32>,
64 }
65 
66 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
67 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,68 fn set_tube_pid_on_serialize<S>(
69     existing_pid_value: &Option<u32>,
70     serializer: S,
71 ) -> std::result::Result<S::Ok, S::Error>
72 where
73     S: Serializer,
74 {
75     match existing_pid_value {
76         Some(pid) => serializer.serialize_u32(*pid),
77         None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
78     }
79 }
80 
81 #[derive(Copy, Clone, Debug)]
82 #[repr(C)]
83 struct MsgHeader {
84     msg_json_size: usize,
85     descriptor_json_size: usize,
86 }
87 
88 // Safe because it only has data and has no implicit padding.
89 unsafe impl DataInit for MsgHeader {}
90 
91 static DH_TUBE: Lazy<sync::Mutex<Option<DuplicateHandleTube>>> =
92     Lazy::new(|| sync::Mutex::new(None));
93 static ALIAS_PID: Lazy<sync::Mutex<Option<u32>>> = Lazy::new(|| sync::Mutex::new(None));
94 
95 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)96 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
97     DH_TUBE.lock().replace(dh_tube);
98 }
99 
100 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)101 pub fn set_alias_pid(alias_pid: u32) {
102     ALIAS_PID.lock().replace(alias_pid);
103 }
104 
105 impl Tube {
106     /// Create a pair of connected tubes. Request is sent in one direction while response is
107     /// received in the other direction.
108     /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>109     pub fn pair() -> Result<(Tube, Tube)> {
110         let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
111             .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
112 
113         Ok((Tube::new(socket1), Tube::new(socket2)))
114     }
115 
116     /// Create a pair of connected tubes with the specified buffer size.
117     /// Request is sent in one direction while response is received in the other direction.
118     /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>119     pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
120         let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
121             BlockingMode::Blocking,
122             FramingMode::Message,
123             buffer_size,
124         )
125         .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
126         let tube1 = Tube::new(socket1);
127         let tube2 = Tube::new(socket2);
128         Ok((tube1, tube2))
129     }
130 
131     // Create a new `Tube`.
new(socket: StreamChannel) -> Tube132     pub fn new(socket: StreamChannel) -> Tube {
133         Tube {
134             socket,
135             target_pid: None,
136         }
137     }
138 
try_clone(&self) -> Result<Self>139     pub(super) fn try_clone(&self) -> Result<Self> {
140         Ok(Tube {
141             socket: self.socket.try_clone().map_err(Error::Clone)?,
142             target_pid: self.target_pid,
143         })
144     }
145 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>146     fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
147         let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
148         let size_header = bytes.len();
149 
150         let mut data_packet =
151             Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
152         data_packet
153             .write(&size_header.to_le_bytes())
154             .map_err(Error::SendIoBuf)?;
155         data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
156         self.socket
157             .write_immutable(&data_packet.into_inner())
158             .map_err(Error::SendIo)?;
159 
160         Ok(())
161     }
162 
recv_proto<M: protobuf::Message>(&self) -> Result<M>163     fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
164         let mut header_bytes = [0u8; mem::size_of::<usize>()];
165         perform_read(&|buf| (&self.socket).read(buf), &mut header_bytes).map_err(Error::Recv)?;
166         let size_header = usize::from_le_bytes(header_bytes);
167 
168         let mut proto_bytes = vec![0u8; size_header];
169         perform_read(&|buf| (&self.socket).read(buf), &mut proto_bytes).map_err(Error::Recv)?;
170         protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
171     }
172 
send<T: Serialize>(&self, msg: &T) -> Result<()>173     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
174         serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
175     }
176 
recv<T: DeserializeOwned>(&self) -> Result<T>177     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
178         deserialize_and_recv(|buf| (&self.socket).read(buf))
179     }
180 
181     /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
182     /// documentation to ensure you have a server pipe before calling.
183     #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>184     pub fn flush_blocking(&mut self) -> Result<()> {
185         self.socket.flush_blocking().map_err(Error::Flush)
186     }
187 
188     /// For Tubes that span processes, this method must be used to set the PID of the other end
189     /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)190     pub fn set_target_pid(&mut self, target_pid: u32) {
191         self.target_pid = Some(target_pid);
192     }
193 
194     /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>195     pub fn target_pid(&self) -> Option<u32> {
196         self.target_pid
197     }
198 
199     /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>200     pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
201         unimplemented!("To be removed/refactored upstream.");
202     }
203 
204     /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>205     pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
206         unimplemented!("To be removed/refactored upstream.");
207     }
208 }
209 
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>210 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
211     write_fn: F,
212     msg: &T,
213     target_pid: Option<u32>,
214 ) -> Result<()> {
215     let msg_serialize = SerializeDescriptors::new(&msg);
216     let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
217     let msg_descriptors = msg_serialize.into_descriptors();
218 
219     let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
220     for desc in msg_descriptors {
221         // Safe because these handles are guaranteed to be valid. Details:
222         // 1. They come from base::descriptor_reflection::with_as_descriptor.
223         // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
224         //    SafeDescriptor).
225         // 3. The owning object is borrowed by msg until sending is complete.
226         duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
227     }
228 
229     let descriptor_json = if duped_descriptors.is_empty() {
230         None
231     } else {
232         Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
233     };
234 
235     let header = MsgHeader {
236         msg_json_size: msg_json.len(),
237         descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
238     };
239 
240     let mut data_packet = Cursor::new(Vec::with_capacity(
241         header.as_slice().len() + header.msg_json_size + header.descriptor_json_size,
242     ));
243     data_packet
244         .write(header.as_slice())
245         .map_err(Error::SendIoBuf)?;
246     data_packet
247         .write(msg_json.as_slice())
248         .map_err(Error::SendIoBuf)?;
249     if let Some(descriptor_json) = descriptor_json {
250         data_packet
251             .write(descriptor_json.as_slice())
252             .map_err(Error::SendIoBuf)?;
253     }
254 
255     // Multiple writers (producers) are safe because each write is atomic.
256     let data_bytes = data_packet.into_inner();
257 
258     write_fn(&data_bytes).map_err(Error::SendIo)?;
259     Ok(())
260 }
261 
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>262 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
263     match target_pid {
264         Some(pid) => match &*DH_TUBE.lock() {
265             Some(tube) => tube.request_duplicate_handle(pid, desc),
266             None => {
267                 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
268             }
269         },
270         None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
271     }
272 }
273 
274 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
275 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
276 ///   We use this to ignore errors when reading the message header, which has the lengths we need
277 ///   to allocate our buffers for the remainder of the message.
278 /// * We filled the supplied buffer.
perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>( read_fn: &F, buf: &mut [u8], ) -> io::Result<usize>279 fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>(
280     read_fn: &F,
281     buf: &mut [u8],
282 ) -> io::Result<usize> {
283     let res = match read_fn(buf) {
284         Ok(s) => Ok(s),
285         Err(e)
286             if e.raw_os_error()
287                 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
288         {
289             Ok(buf.len())
290         }
291         Err(e) => Err(e),
292     };
293 
294     let bytes_read = res?;
295     if bytes_read != buf.len() {
296         Err(io::Error::new(
297             io::ErrorKind::UnexpectedEof,
298             "failed to fill whole buffer",
299         ))
300     } else {
301         Ok(bytes_read)
302     }
303 }
304 
305 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
306 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>( read_fn: F, ) -> Result<T>307 pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>(
308     read_fn: F,
309 ) -> Result<T> {
310     let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
311     perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?;
312 
313     // Safe because the header is always written by the send function, and only that function
314     // writes to this channel.
315     let header =
316         MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
317 
318     let mut msg_json = vec![0u8; header.msg_json_size];
319     perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?;
320 
321     if msg_json.is_empty() {
322         // This means we got a message header, but there is no json body (due to a zero size in
323         // the header). This should never happen because it means the receiver is getting no
324         // data whatsoever from the sender.
325         return Err(Error::RecvUnexpectedEmptyBody);
326     }
327 
328     let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
329         let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
330         perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?;
331         let descriptor_usizes: Vec<usize> =
332             serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?;
333 
334         // Safe because the usizes are RawDescriptors that were converted to usize in the send
335         // method.
336         descriptor_usizes
337             .iter()
338             .map(|item| *item as RawDescriptor)
339             .collect()
340     } else {
341         Vec::new()
342     };
343 
344     let mut msg_descriptors_safe = msg_descriptors
345         .into_iter()
346         .map(|v| {
347             Some(unsafe {
348                 // Safe because the socket returns new fds that are owned locally by this scope.
349                 SafeDescriptor::from_raw_descriptor(v)
350             })
351         })
352         .collect();
353 
354     deserialize_with_descriptors(
355         || serde_json::from_slice(&msg_json),
356         &mut msg_descriptors_safe,
357     )
358     .map_err(Error::Json)
359 }
360 
361 #[derive(EventToken, Eq, PartialEq, Copy, Clone)]
362 enum Token {
363     SocketReady,
364 }
365 
366 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor367     fn as_raw_descriptor(&self) -> RawDescriptor {
368         self.socket.as_raw_descriptor()
369     }
370 }
371 
372 impl AsRawHandle for Tube {
as_raw_handle(&self) -> RawHandle373     fn as_raw_handle(&self) -> RawHandle {
374         self.as_raw_descriptor()
375     }
376 }
377 
378 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor379     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
380         self.socket.get_read_notifier()
381     }
382 }
383 
384 impl CloseNotifier for Tube {
get_close_notifier(&self) -> &dyn AsRawDescriptor385     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
386         self.socket.get_close_notifier()
387     }
388 }
389 
390 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor391     fn as_raw_descriptor(&self) -> RawDescriptor {
392         self.0.as_raw_descriptor()
393     }
394 }
395 
396 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor397     fn as_raw_descriptor(&self) -> RawDescriptor {
398         self.0.as_raw_descriptor()
399     }
400 }
401 
402 /// A request to duplicate a handle to a target process.
403 #[derive(Serialize, Deserialize, Debug)]
404 pub struct DuplicateHandleRequest {
405     pub target_alias_pid: u32,
406     pub handle: usize,
407 }
408 
409 /// Contains a duplicated handle or None if an error occurred.
410 #[derive(Serialize, Deserialize, Debug)]
411 pub struct DuplicateHandleResponse {
412     pub handle: Option<usize>,
413 }
414 
415 /// Wrapper for tube which is used to delegate DuplicateHandle function calls to
416 /// the broker process.
417 #[derive(Serialize, Deserialize, Debug)]
418 pub struct DuplicateHandleTube(Tube);
419 
420 impl DuplicateHandleTube {
new(tube: Tube) -> Self421     pub fn new(tube: Tube) -> Self {
422         Self(tube)
423     }
424 
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>425     pub fn request_duplicate_handle(
426         &self,
427         target_alias_pid: u32,
428         handle: RawHandle,
429     ) -> Result<RawHandle> {
430         let req = DuplicateHandleRequest {
431             target_alias_pid,
432             handle: handle as usize,
433         };
434         self.0.send(&req)?;
435         let res: DuplicateHandleResponse = self.0.recv()?;
436         res.handle
437             .map(|h| h as RawHandle)
438             .ok_or(Error::BrokerDupDescriptor)
439     }
440 }
441 
442 /// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message
443 /// without serialization bloat caused from `serde-json`.
444 pub struct ProtoTube(Tube);
445 
446 impl ProtoTube {
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>447     pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
448         Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
449     }
450 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>451     pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
452         self.0.send_proto(msg)
453     }
454 
recv_proto<M: protobuf::Message>(&self) -> Result<M>455     pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
456         self.0.recv_proto()
457     }
458 }
459 
460 impl ReadNotifier for ProtoTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor461     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
462         self.0.get_read_notifier()
463     }
464 }
465