// Copyright 2021 The ChromiumOS Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use std::io; use std::io::Cursor; use std::io::Read; use std::io::Write; use std::mem; use std::os::windows::io::AsRawHandle; use std::os::windows::io::RawHandle; use std::time::Duration; use data_model::DataInit; use once_cell::sync::Lazy; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; use serde::Serializer; use winapi::shared::winerror::ERROR_MORE_DATA; use crate::descriptor::AsRawDescriptor; use crate::descriptor::FromRawDescriptor; use crate::descriptor::SafeDescriptor; use crate::platform::deserialize_with_descriptors; use crate::platform::RawDescriptor; use crate::platform::SerializeDescriptors; use crate::tube::Error; use crate::tube::RecvTube; use crate::tube::Result; use crate::tube::SendTube; use crate::BlockingMode; use crate::CloseNotifier; use crate::EventToken; use crate::FramingMode; use crate::ReadNotifier; use crate::StreamChannel; /// Bidirectional tube that support both send and recv. /// /// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair /// (A, B). We wish to send B to another process, and communicate with it using A from the current /// process: /// 1. B's target_pid must be set to the current PID *before* serialization. There is a /// serialization hook that sets it to the current PID automatically if target_pid is unset. /// 2. A's target_pid must be set to the PID of the process where B was sent. /// /// If instead you are sending both A and B to separate processes, then: /// 1. A's target_pid must be set to B's pid, manually. /// 2. B's target_pid must be set to A's pid, manually. /// /// Automating all of this and getting a completely clean interface is tricky. We would need /// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync /// state about PIDs between the ends. There are alternatives like reusing the underlying /// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet /// to find a compelling solution that isn't a mess to implement. Suggestions are welcome. #[derive(Serialize, Deserialize, Debug)] pub struct Tube { socket: StreamChannel, // Default target_pid to current PID on serialization (see `Tube` comment header for details). #[serde(serialize_with = "set_tube_pid_on_serialize")] target_pid: Option, } /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically /// default it to the current process, because the other end will be in the current process. fn set_tube_pid_on_serialize( existing_pid_value: &Option, serializer: S, ) -> std::result::Result where S: Serializer, { match existing_pid_value { Some(pid) => serializer.serialize_u32(*pid), None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())), } } #[derive(Copy, Clone, Debug)] #[repr(C)] struct MsgHeader { msg_json_size: usize, descriptor_json_size: usize, } // Safe because it only has data and has no implicit padding. unsafe impl DataInit for MsgHeader {} static DH_TUBE: Lazy>> = Lazy::new(|| sync::Mutex::new(None)); static ALIAS_PID: Lazy>> = Lazy::new(|| sync::Mutex::new(None)); /// Set a tube to delegate duplicate handle calls. pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) { DH_TUBE.lock().replace(dh_tube); } /// Set alias pid for use with a DuplicateHandleTube. pub fn set_alias_pid(alias_pid: u32) { ALIAS_PID.lock().replace(alias_pid); } impl Tube { /// Create a pair of connected tubes. Request is sent in one direction while response is /// received in the other direction. /// The result is in the form (server, client). pub fn pair() -> Result<(Tube, Tube)> { let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; Ok((Tube::new(socket1), Tube::new(socket2))) } /// Create a pair of connected tubes with the specified buffer size. /// Request is sent in one direction while response is received in the other direction. /// The result is in the form (server, client). pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> { let (socket1, socket2) = StreamChannel::pair_with_buffer_size( BlockingMode::Blocking, FramingMode::Message, buffer_size, ) .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; let tube1 = Tube::new(socket1); let tube2 = Tube::new(socket2); Ok((tube1, tube2)) } // Create a new `Tube`. pub fn new(socket: StreamChannel) -> Tube { Tube { socket, target_pid: None, } } pub(super) fn try_clone(&self) -> Result { Ok(Tube { socket: self.socket.try_clone().map_err(Error::Clone)?, target_pid: self.target_pid, }) } fn send_proto(&self, msg: &M) -> Result<()> { let bytes = msg.write_to_bytes().map_err(Error::Proto)?; let size_header = bytes.len(); let mut data_packet = Cursor::new(Vec::with_capacity(mem::size_of::() + size_header)); data_packet .write(&size_header.to_le_bytes()) .map_err(Error::SendIoBuf)?; data_packet.write(&bytes).map_err(Error::SendIoBuf)?; self.socket .write_immutable(&data_packet.into_inner()) .map_err(Error::SendIo)?; Ok(()) } fn recv_proto(&self) -> Result { let mut header_bytes = [0u8; mem::size_of::()]; perform_read(&|buf| (&self.socket).read(buf), &mut header_bytes).map_err(Error::Recv)?; let size_header = usize::from_le_bytes(header_bytes); let mut proto_bytes = vec![0u8; size_header]; perform_read(&|buf| (&self.socket).read(buf), &mut proto_bytes).map_err(Error::Recv)?; protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto) } pub fn send(&self, msg: &T) -> Result<()> { serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid) } pub fn recv(&self) -> Result { deserialize_and_recv(|buf| (&self.socket).read(buf)) } /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair /// documentation to ensure you have a server pipe before calling. #[cfg(windows)] pub fn flush_blocking(&mut self) -> Result<()> { self.socket.flush_blocking().map_err(Error::Flush) } /// For Tubes that span processes, this method must be used to set the PID of the other end /// of the Tube, otherwise sending handles to the other end won't work. pub fn set_target_pid(&mut self, target_pid: u32) { self.target_pid = Some(target_pid); } /// Returns the PID of the process at the other end of the Tube, if any is set. pub fn target_pid(&self) -> Option { self.target_pid } /// TODO(b/145998747, b/184398671): this method should be removed. pub fn set_send_timeout(&self, _timeout: Option) -> Result<()> { unimplemented!("To be removed/refactored upstream."); } /// TODO(b/145998747, b/184398671): this method should be removed. pub fn set_recv_timeout(&self, _timeout: Option) -> Result<()> { unimplemented!("To be removed/refactored upstream."); } } pub fn serialize_and_send io::Result>( write_fn: F, msg: &T, target_pid: Option, ) -> Result<()> { let msg_serialize = SerializeDescriptors::new(&msg); let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; let msg_descriptors = msg_serialize.into_descriptors(); let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len()); for desc in msg_descriptors { // Safe because these handles are guaranteed to be valid. Details: // 1. They come from base::descriptor_reflection::with_as_descriptor. // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File, // SafeDescriptor). // 3. The owning object is borrowed by msg until sending is complete. duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize) } let descriptor_json = if duped_descriptors.is_empty() { None } else { Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?) }; let header = MsgHeader { msg_json_size: msg_json.len(), descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()), }; let mut data_packet = Cursor::new(Vec::with_capacity( header.as_slice().len() + header.msg_json_size + header.descriptor_json_size, )); data_packet .write(header.as_slice()) .map_err(Error::SendIoBuf)?; data_packet .write(msg_json.as_slice()) .map_err(Error::SendIoBuf)?; if let Some(descriptor_json) = descriptor_json { data_packet .write(descriptor_json.as_slice()) .map_err(Error::SendIoBuf)?; } // Multiple writers (producers) are safe because each write is atomic. let data_bytes = data_packet.into_inner(); write_fn(&data_bytes).map_err(Error::SendIo)?; Ok(()) } fn duplicate_handle(desc: RawHandle, target_pid: Option) -> Result { match target_pid { Some(pid) => match &*DH_TUBE.lock() { Some(tube) => tube.request_duplicate_handle(pid, desc), None => { win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor) } }, None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor), } } /// Reads a part of a Tube packet asserting that it was correctly read. This means: /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. /// We use this to ignore errors when reading the message header, which has the lengths we need /// to allocate our buffers for the remainder of the message. /// * We filled the supplied buffer. fn perform_read io::Result>( read_fn: &F, buf: &mut [u8], ) -> io::Result { let res = match read_fn(buf) { Ok(s) => Ok(s), Err(e) if e.raw_os_error() .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) => { Ok(buf.len()) } Err(e) => Err(e), }; let bytes_read = res?; if bytes_read != buf.len() { Err(io::Error::new( io::ErrorKind::UnexpectedEof, "failed to fill whole buffer", )) } else { Ok(bytes_read) } } /// Deserializes a Tube packet by calling the supplied read function. This function MUST /// assert that the buffer was filled. pub fn deserialize_and_recv io::Result>( read_fn: F, ) -> Result { let mut header_bytes = vec![0u8; mem::size_of::()]; perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?; // Safe because the header is always written by the send function, and only that function // writes to this channel. let header = MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize."); let mut msg_json = vec![0u8; header.msg_json_size]; perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?; if msg_json.is_empty() { // This means we got a message header, but there is no json body (due to a zero size in // the header). This should never happen because it means the receiver is getting no // data whatsoever from the sender. return Err(Error::RecvUnexpectedEmptyBody); } let msg_descriptors: Vec = if header.descriptor_json_size > 0 { let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size]; perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?; let descriptor_usizes: Vec = serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?; // Safe because the usizes are RawDescriptors that were converted to usize in the send // method. descriptor_usizes .iter() .map(|item| *item as RawDescriptor) .collect() } else { Vec::new() }; let mut msg_descriptors_safe = msg_descriptors .into_iter() .map(|v| { Some(unsafe { // Safe because the socket returns new fds that are owned locally by this scope. SafeDescriptor::from_raw_descriptor(v) }) }) .collect(); deserialize_with_descriptors( || serde_json::from_slice(&msg_json), &mut msg_descriptors_safe, ) .map_err(Error::Json) } #[derive(EventToken, Eq, PartialEq, Copy, Clone)] enum Token { SocketReady, } impl AsRawDescriptor for Tube { fn as_raw_descriptor(&self) -> RawDescriptor { self.socket.as_raw_descriptor() } } impl AsRawHandle for Tube { fn as_raw_handle(&self) -> RawHandle { self.as_raw_descriptor() } } impl ReadNotifier for Tube { fn get_read_notifier(&self) -> &dyn AsRawDescriptor { self.socket.get_read_notifier() } } impl CloseNotifier for Tube { fn get_close_notifier(&self) -> &dyn AsRawDescriptor { self.socket.get_close_notifier() } } impl AsRawDescriptor for SendTube { fn as_raw_descriptor(&self) -> RawDescriptor { self.0.as_raw_descriptor() } } impl AsRawDescriptor for RecvTube { fn as_raw_descriptor(&self) -> RawDescriptor { self.0.as_raw_descriptor() } } /// A request to duplicate a handle to a target process. #[derive(Serialize, Deserialize, Debug)] pub struct DuplicateHandleRequest { pub target_alias_pid: u32, pub handle: usize, } /// Contains a duplicated handle or None if an error occurred. #[derive(Serialize, Deserialize, Debug)] pub struct DuplicateHandleResponse { pub handle: Option, } /// Wrapper for tube which is used to delegate DuplicateHandle function calls to /// the broker process. #[derive(Serialize, Deserialize, Debug)] pub struct DuplicateHandleTube(Tube); impl DuplicateHandleTube { pub fn new(tube: Tube) -> Self { Self(tube) } pub fn request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result { let req = DuplicateHandleRequest { target_alias_pid, handle: handle as usize, }; self.0.send(&req)?; let res: DuplicateHandleResponse = self.0.recv()?; res.handle .map(|h| h as RawHandle) .ok_or(Error::BrokerDupDescriptor) } } /// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message /// without serialization bloat caused from `serde-json`. pub struct ProtoTube(Tube); impl ProtoTube { pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> { Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2))) } pub fn send_proto(&self, msg: &M) -> Result<()> { self.0.send_proto(msg) } pub fn recv_proto(&self) -> Result { self.0.recv_proto() } } impl ReadNotifier for ProtoTube { fn get_read_notifier(&self) -> &dyn AsRawDescriptor { self.0.get_read_notifier() } }