• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13 
14 use log::warn;
15 use serde::de::DeserializeOwned;
16 use serde::Deserialize;
17 use serde::Serialize;
18 use serde::Serializer;
19 use winapi::shared::winerror::ERROR_MORE_DATA;
20 use zerocopy::AsBytes;
21 use zerocopy::FromBytes;
22 use zerocopy::FromZeroes;
23 
24 use crate::descriptor::AsRawDescriptor;
25 use crate::descriptor::FromRawDescriptor;
26 use crate::descriptor::SafeDescriptor;
27 use crate::descriptor_reflection::deserialize_with_descriptors;
28 use crate::descriptor_reflection::SerializeDescriptors;
29 use crate::tube::Error;
30 use crate::tube::RecvTube;
31 use crate::tube::Result;
32 use crate::tube::SendTube;
33 use crate::BlockingMode;
34 use crate::CloseNotifier;
35 use crate::EventToken;
36 use crate::FramingMode;
37 use crate::PipeConnection;
38 use crate::RawDescriptor;
39 use crate::ReadNotifier;
40 use crate::StreamChannel;
41 
42 /// Bidirectional tube that support both send and recv.
43 ///
44 /// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
45 /// (A, B). We wish to send B to another process, and communicate with it using A from the current
46 /// process:
47 ///     1. B's target_pid must be set to the current PID *before* serialization. There is a
48 ///        serialization hook that sets it to the current PID automatically if target_pid is unset.
49 ///     2. A's target_pid must be set to the PID of the process where B was sent.
50 ///
51 /// If instead you are sending both A and B to separate processes, then:
52 ///     1. A's target_pid must be set to B's pid, manually.
53 ///     2. B's target_pid must be set to A's pid, manually.
54 ///
55 /// Automating all of this and getting a completely clean interface is tricky. We would need
56 /// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
57 /// state about PIDs between the ends. There are alternatives like reusing the underlying
58 /// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
59 /// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
60 #[derive(Serialize, Deserialize, Debug)]
61 pub struct Tube {
62     socket: StreamChannel,
63 
64     // Default target_pid to current PID on serialization (see `Tube` comment header for details).
65     #[serde(serialize_with = "set_tube_pid_on_serialize")]
66     target_pid: Option<u32>,
67 }
68 
69 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
70 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,71 fn set_tube_pid_on_serialize<S>(
72     existing_pid_value: &Option<u32>,
73     serializer: S,
74 ) -> std::result::Result<S::Ok, S::Error>
75 where
76     S: Serializer,
77 {
78     match existing_pid_value {
79         Some(pid) => serializer.serialize_u32(*pid),
80         None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
81     }
82 }
83 
84 #[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
85 #[repr(C)]
86 struct MsgHeader {
87     msg_json_size: usize,
88     descriptor_json_size: usize,
89 }
90 
91 static DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
92 static ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
93 
94 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)95 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
96     DH_TUBE.lock().replace(dh_tube);
97 }
98 
99 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)100 pub fn set_alias_pid(alias_pid: u32) {
101     ALIAS_PID.lock().replace(alias_pid);
102 }
103 
104 impl Tube {
105     /// Create a pair of connected tubes. Request is sent in one direction while response is
106     /// received in the other direction.
107     /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>108     pub fn pair() -> Result<(Tube, Tube)> {
109         let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
110             .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
111 
112         Ok((Tube::new(socket1), Tube::new(socket2)))
113     }
114 
115     /// Create a pair of connected tubes with the specified buffer size.
116     /// Request is sent in one direction while response is received in the other direction.
117     /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>118     pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
119         let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
120             BlockingMode::Blocking,
121             FramingMode::Message,
122             buffer_size,
123         )
124         .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
125         let tube1 = Tube::new(socket1);
126         let tube2 = Tube::new(socket2);
127         Ok((tube1, tube2))
128     }
129 
130     // Create a new `Tube`.
new(socket: StreamChannel) -> Tube131     pub fn new(socket: StreamChannel) -> Tube {
132         Tube {
133             socket,
134             target_pid: None,
135         }
136     }
137 
try_clone(&self) -> Result<Self>138     pub(crate) fn try_clone(&self) -> Result<Self> {
139         Ok(Tube {
140             socket: self.socket.try_clone().map_err(Error::Clone)?,
141             target_pid: self.target_pid,
142         })
143     }
144 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>145     fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
146         let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
147         let size_header = bytes.len();
148 
149         let mut data_packet =
150             Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
151         data_packet
152             .write(&size_header.to_le_bytes())
153             .map_err(Error::from_send_io_buf_error)?;
154         data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
155         self.socket
156             .write_immutable(&data_packet.into_inner())
157             .map_err(Error::from_send_error)?;
158 
159         Ok(())
160     }
161 
recv_proto<M: protobuf::Message>(&self) -> Result<M>162     fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
163         let mut header_bytes = [0u8; mem::size_of::<usize>()];
164         perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
165             .map_err(Error::from_recv_io_error)?;
166         let size_header = usize::from_le_bytes(header_bytes);
167 
168         let mut proto_bytes = vec![0u8; size_header];
169         perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
170             .map_err(Error::from_recv_io_error)?;
171         protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
172     }
173 
send<T: Serialize>(&self, msg: &T) -> Result<()>174     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
175         serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
176     }
177 
recv<T: DeserializeOwned>(&self) -> Result<T>178     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
179         deserialize_and_recv(|buf| (&self.socket).read(buf))
180     }
181 
182     /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
183     /// documentation to ensure you have a server pipe before calling.
184     #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>185     pub fn flush_blocking(&mut self) -> Result<()> {
186         self.socket.flush_blocking().map_err(Error::Flush)
187     }
188 
189     /// For Tubes that span processes, this method must be used to set the PID of the other end
190     /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)191     pub fn set_target_pid(&mut self, target_pid: u32) {
192         self.target_pid = Some(target_pid);
193     }
194 
195     /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>196     pub fn target_pid(&self) -> Option<u32> {
197         self.target_pid
198     }
199 
200     /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>201     pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
202         unimplemented!("To be removed/refactored upstream.");
203     }
204 
205     /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>206     pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
207         unimplemented!("To be removed/refactored upstream.");
208     }
209 }
210 
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>211 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
212     write_fn: F,
213     msg: &T,
214     target_pid: Option<u32>,
215 ) -> Result<()> {
216     let msg_serialize = SerializeDescriptors::new(&msg);
217     let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
218     let msg_descriptors = msg_serialize.into_descriptors();
219 
220     let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
221     for desc in msg_descriptors {
222         // Safe because these handles are guaranteed to be valid. Details:
223         // 1. They come from base::descriptor_reflection::with_as_descriptor.
224         // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
225         //    SafeDescriptor).
226         // 3. The owning object is borrowed by msg until sending is complete.
227         duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
228     }
229 
230     let descriptor_json = if duped_descriptors.is_empty() {
231         None
232     } else {
233         Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
234     };
235 
236     let header = MsgHeader {
237         msg_json_size: msg_json.len(),
238         descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
239     };
240 
241     let mut data_packet = Cursor::new(Vec::with_capacity(
242         header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
243     ));
244     data_packet
245         .write(header.as_bytes())
246         .map_err(Error::SendIoBuf)?;
247     data_packet
248         .write(msg_json.as_slice())
249         .map_err(Error::SendIoBuf)?;
250     if let Some(descriptor_json) = descriptor_json {
251         data_packet
252             .write(descriptor_json.as_slice())
253             .map_err(Error::SendIoBuf)?;
254     }
255 
256     // Multiple writers (producers) are safe because each write is atomic.
257     let data_bytes = data_packet.into_inner();
258 
259     write_fn(&data_bytes).map_err(Error::from_send_error)?;
260     Ok(())
261 }
262 
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>263 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
264     match target_pid {
265         Some(pid) => match &*DH_TUBE.lock() {
266             Some(tube) => tube.request_duplicate_handle(pid, desc),
267             None => {
268                 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
269             }
270         },
271         None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
272     }
273 }
274 
275 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
276 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. We
277 ///   use this to ignore errors when reading the message header, which has the lengths we need to
278 ///   allocate our buffers for the remainder of the message.
279 /// * We filled the supplied buffer.
perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>( read_fn: &mut F, buf: &mut [u8], ) -> io::Result<usize>280 fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
281     read_fn: &mut F,
282     buf: &mut [u8],
283 ) -> io::Result<usize> {
284     let bytes_read = match read_fn(buf) {
285         Ok(s) => Ok(s),
286         Err(e)
287             if e.raw_os_error()
288                 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
289         {
290             Ok(buf.len())
291         }
292         Err(e) => Err(e),
293     }?;
294 
295     if bytes_read != buf.len() {
296         Err(io::Error::new(
297             io::ErrorKind::UnexpectedEof,
298             format!(
299                 "failed to fill whole buffer, expected {} got {}",
300                 buf.len(),
301                 bytes_read
302             ),
303         ))
304     } else {
305         Ok(bytes_read)
306     }
307 }
308 
309 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
310 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>( mut read_fn: F, ) -> Result<T>311 pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
312     mut read_fn: F,
313 ) -> Result<T> {
314     let mut header = MsgHeader::default();
315     perform_read(&mut read_fn, header.as_bytes_mut()).map_err(Error::from_recv_io_error)?;
316 
317     let mut msg_json = vec![0u8; header.msg_json_size];
318     perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
319 
320     if msg_json.is_empty() {
321         // This means we got a message header, but there is no json body (due to a zero size in
322         // the header). This should never happen because it means the receiver is getting no
323         // data whatsoever from the sender.
324         return Err(Error::RecvUnexpectedEmptyBody);
325     }
326 
327     let descriptor_usizes: Vec<usize> = if header.descriptor_json_size > 0 {
328         let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
329         perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
330             .map_err(Error::from_recv_io_error)?;
331         serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?
332     } else {
333         Vec::new()
334     };
335 
336     let msg_descriptors = descriptor_usizes.into_iter().map(|item| {
337         // SAFETY: the usizes are RawDescriptors that were duplicated and converted to usize in the
338         // send method.
339         unsafe { SafeDescriptor::from_raw_descriptor(item as RawDescriptor) }
340     });
341 
342     deserialize_with_descriptors(|| serde_json::from_slice(&msg_json), msg_descriptors)
343         .map_err(Error::Json)
344 }
345 
346 #[derive(EventToken, Eq, PartialEq, Copy, Clone)]
347 enum Token {
348     SocketReady,
349 }
350 
351 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor352     fn as_raw_descriptor(&self) -> RawDescriptor {
353         self.socket.as_raw_descriptor()
354     }
355 }
356 
357 impl AsRawHandle for Tube {
as_raw_handle(&self) -> RawHandle358     fn as_raw_handle(&self) -> RawHandle {
359         self.as_raw_descriptor()
360     }
361 }
362 
363 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor364     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
365         self.socket.get_read_notifier()
366     }
367 }
368 
369 impl CloseNotifier for Tube {
get_close_notifier(&self) -> &dyn AsRawDescriptor370     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
371         self.socket.get_close_notifier()
372     }
373 }
374 
375 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor376     fn as_raw_descriptor(&self) -> RawDescriptor {
377         self.0.as_raw_descriptor()
378     }
379 }
380 
381 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor382     fn as_raw_descriptor(&self) -> RawDescriptor {
383         self.0.as_raw_descriptor()
384     }
385 }
386 
387 impl CloseNotifier for SendTube {
get_close_notifier(&self) -> &dyn AsRawDescriptor388     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
389         self.0.get_close_notifier()
390     }
391 }
392 
393 impl CloseNotifier for RecvTube {
get_close_notifier(&self) -> &dyn AsRawDescriptor394     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
395         self.0.get_close_notifier()
396     }
397 }
398 
399 /// A request to duplicate a handle to a target process.
400 #[derive(Serialize, Deserialize, Debug)]
401 pub struct DuplicateHandleRequest {
402     pub target_alias_pid: u32,
403     pub handle: usize,
404 }
405 
406 /// Contains a duplicated handle or None if an error occurred.
407 #[derive(Serialize, Deserialize, Debug)]
408 pub struct DuplicateHandleResponse {
409     pub handle: Option<usize>,
410 }
411 
412 /// Wrapper for tube which is used to delegate DuplicateHandle function calls to
413 /// the broker process.
414 #[derive(Serialize, Deserialize, Debug)]
415 pub struct DuplicateHandleTube(Tube);
416 
417 impl DuplicateHandleTube {
new(tube: Tube) -> Self418     pub fn new(tube: Tube) -> Self {
419         Self(tube)
420     }
421 
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>422     pub fn request_duplicate_handle(
423         &self,
424         target_alias_pid: u32,
425         handle: RawHandle,
426     ) -> Result<RawHandle> {
427         let req = DuplicateHandleRequest {
428             target_alias_pid,
429             handle: handle as usize,
430         };
431         self.0.send(&req)?;
432         let res: DuplicateHandleResponse = self.0.recv()?;
433         res.handle
434             .map(|h| h as RawHandle)
435             .ok_or(Error::BrokerDupDescriptor)
436     }
437 }
438 
439 /// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message
440 /// without serialization bloat caused from `serde-json`.
441 #[derive(Serialize, Deserialize)]
442 pub struct ProtoTube(Tube);
443 
444 impl ProtoTube {
pair() -> Result<(ProtoTube, ProtoTube)>445     pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
446         Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
447     }
448 
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>449     pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
450         Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
451     }
452 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>453     pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
454         self.0.send_proto(msg)
455     }
456 
recv_proto<M: protobuf::Message>(&self) -> Result<M>457     pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
458         self.0.recv_proto()
459     }
460 }
461 
462 impl ReadNotifier for ProtoTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor463     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
464         self.0.get_read_notifier()
465     }
466 }
467 
468 impl AsRawDescriptor for ProtoTube {
as_raw_descriptor(&self) -> RawDescriptor469     fn as_raw_descriptor(&self) -> RawDescriptor {
470         self.0.as_raw_descriptor()
471     }
472 }
473 
474 /// A wrapper around a named pipe that uses Tube serialization.
475 ///
476 /// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
477 /// blocking messages.
478 pub struct PipeTube {
479     pipe: PipeConnection,
480 
481     // Default target_pid to current PID on serialization (see `Tube` comment header for details).
482     target_pid: Option<u32>,
483 }
484 
485 impl PipeTube {
from(pipe: PipeConnection, target_pid: Option<u32>) -> Self486     pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
487         Self { pipe, target_pid }
488     }
489 
send<T: Serialize>(&self, msg: &T) -> Result<()>490     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
491         serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
492     }
493 
recv<T: DeserializeOwned>(&self) -> Result<T>494     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
495         deserialize_and_recv(|buf| {
496             // SAFETY:
497             // 1. We are reading bytes, so no matter what data is on the pipe, it is representable
498             //    as bytes.
499             // 2. A read is quantized in bytes, so no partial reads are possible.
500             unsafe { self.pipe.read(buf) }
501         })
502     }
503 }
504 
505 /// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
506 /// that the Tube is flushed before it is dropped.
507 pub struct FlushOnDropTube(pub Tube);
508 
509 impl FlushOnDropTube {
from(tube: Tube) -> Self510     pub fn from(tube: Tube) -> Self {
511         Self(tube)
512     }
513 }
514 
515 impl Drop for FlushOnDropTube {
drop(&mut self)516     fn drop(&mut self) {
517         if let Err(e) = self.0.flush_blocking() {
518             warn!("failed to flush Tube: {}", e)
519         }
520     }
521 }
522 
523 impl Error {
map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error524     fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
525         if e.kind() == io::ErrorKind::BrokenPipe {
526             Error::Disconnected
527         } else {
528             err_ctor(e)
529         }
530     }
531 
from_recv_io_error(e: io::Error) -> Error532     fn from_recv_io_error(e: io::Error) -> Error {
533         Self::map_io_error(e, Error::Recv)
534     }
535 
from_send_error(e: io::Error) -> Error536     fn from_send_error(e: io::Error) -> Error {
537         Self::map_io_error(e, Error::Send)
538     }
539 
from_send_io_buf_error(e: io::Error) -> Error540     fn from_send_io_buf_error(e: io::Error) -> Error {
541         Self::map_io_error(e, Error::SendIoBuf)
542     }
543 }
544