1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io::IoSlice; 6 use std::io::IoSliceMut; 7 use std::os::unix::prelude::AsRawFd; 8 use std::os::unix::prelude::RawFd; 9 use std::time::Duration; 10 11 use serde::de::DeserializeOwned; 12 use serde::Deserialize; 13 use serde::Serialize; 14 15 use crate::descriptor::AsRawDescriptor; 16 use crate::descriptor::FromRawDescriptor; 17 use crate::descriptor::SafeDescriptor; 18 use crate::platform::deserialize_with_descriptors; 19 use crate::platform::SerializeDescriptors; 20 use crate::tube::Error; 21 use crate::tube::RecvTube; 22 use crate::tube::Result; 23 use crate::tube::SendTube; 24 use crate::BlockingMode; 25 use crate::FramingMode; 26 use crate::RawDescriptor; 27 use crate::ReadNotifier; 28 use crate::ScmSocket; 29 use crate::StreamChannel; 30 use crate::UnixSeqpacket; 31 32 // This size matches the inline buffer size of CmsgBuffer. 33 const TUBE_MAX_FDS: usize = 32; 34 35 /// Bidirectional tube that support both send and recv. 36 #[derive(Serialize, Deserialize)] 37 pub struct Tube { 38 socket: StreamChannel, 39 } 40 41 impl Tube { 42 /// Create a pair of connected tubes. Request is sent in one direction while response is in the 43 /// other direction. pair() -> Result<(Tube, Tube)>44 pub fn pair() -> Result<(Tube, Tube)> { 45 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) 46 .map_err(|errno| Error::Pair(std::io::Error::from(errno)))?; 47 let tube1 = Tube::new(socket1)?; 48 let tube2 = Tube::new(socket2)?; 49 Ok((tube1, tube2)) 50 } 51 52 /// Create a new `Tube` from a `StreamChannel`. 53 /// The StreamChannel must use FramingMode::Message (meaning, must use a SOCK_SEQPACKET as the 54 /// underlying socket type), otherwise, this method returns an error. new(socket: StreamChannel) -> Result<Tube>55 pub fn new(socket: StreamChannel) -> Result<Tube> { 56 match socket.get_framing_mode() { 57 FramingMode::Message => Ok(Tube { socket }), 58 FramingMode::Byte => Err(Error::InvalidFramingMode), 59 } 60 } 61 62 /// Create a new `Tube` from a UnixSeqpacket. The StreamChannel is implicitly constructed to 63 /// have the right FramingMode by being constructed from a UnixSeqpacket. new_from_unix_seqpacket(sock: UnixSeqpacket) -> Tube64 pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Tube { 65 Tube { 66 socket: StreamChannel::from_unix_seqpacket(sock), 67 } 68 } 69 70 /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a 71 /// directional Tube pair instead. 72 #[deprecated] try_clone(&self) -> Result<Self>73 pub fn try_clone(&self) -> Result<Self> { 74 self.socket 75 .try_clone() 76 .map(Tube::new) 77 .map_err(Error::Clone)? 78 } 79 send<T: Serialize>(&self, msg: &T) -> Result<()>80 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { 81 let msg_serialize = SerializeDescriptors::new(&msg); 82 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; 83 let msg_descriptors = msg_serialize.into_descriptors(); 84 85 if msg_descriptors.len() > TUBE_MAX_FDS { 86 return Err(Error::SendTooManyFds); 87 } 88 89 self.socket 90 .send_with_fds(&[IoSlice::new(&msg_json)], &msg_descriptors) 91 .map_err(Error::Send)?; 92 Ok(()) 93 } 94 recv<T: DeserializeOwned>(&self) -> Result<T>95 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { 96 let msg_size = self.socket.peek_size().map_err(Error::Recv)?; 97 // This buffer is the right size, as the size received in peek_size() represents the size 98 // of only the message itself and not the file descriptors. The descriptors are stored 99 // separately in msghdr::msg_control. 100 let mut msg_json = vec![0u8; msg_size]; 101 102 let mut msg_descriptors_full = [0; TUBE_MAX_FDS]; 103 104 let (msg_json_size, descriptor_size) = self 105 .socket 106 .recv_with_fds(IoSliceMut::new(&mut msg_json), &mut msg_descriptors_full) 107 .map_err(Error::Send)?; 108 109 if msg_json_size == 0 { 110 return Err(Error::Disconnected); 111 } 112 113 let mut msg_descriptors_safe = msg_descriptors_full[..descriptor_size] 114 .iter() 115 .map(|v| { 116 Some(unsafe { 117 // Safe because the socket returns new fds that are owned locally by this scope. 118 SafeDescriptor::from_raw_descriptor(*v) 119 }) 120 }) 121 .collect(); 122 123 deserialize_with_descriptors( 124 || serde_json::from_slice(&msg_json[0..msg_json_size]), 125 &mut msg_descriptors_safe, 126 ) 127 .map_err(Error::Json) 128 } 129 set_send_timeout(&self, timeout: Option<Duration>) -> Result<()>130 pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> { 131 self.socket 132 .set_write_timeout(timeout) 133 .map_err(Error::SetSendTimeout) 134 } 135 set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()>136 pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> { 137 self.socket 138 .set_read_timeout(timeout) 139 .map_err(Error::SetRecvTimeout) 140 } 141 } 142 143 impl AsRawDescriptor for Tube { as_raw_descriptor(&self) -> RawDescriptor144 fn as_raw_descriptor(&self) -> RawDescriptor { 145 self.socket.as_raw_descriptor() 146 } 147 } 148 149 impl AsRawFd for Tube { as_raw_fd(&self) -> RawFd150 fn as_raw_fd(&self) -> RawFd { 151 self.socket.as_raw_fd() 152 } 153 } 154 155 impl ReadNotifier for Tube { get_read_notifier(&self) -> &dyn AsRawDescriptor156 fn get_read_notifier(&self) -> &dyn AsRawDescriptor { 157 &self.socket 158 } 159 } 160 161 impl AsRawDescriptor for SendTube { as_raw_descriptor(&self) -> RawDescriptor162 fn as_raw_descriptor(&self) -> RawDescriptor { 163 self.0.as_raw_descriptor() 164 } 165 } 166 167 impl AsRawDescriptor for RecvTube { as_raw_descriptor(&self) -> RawDescriptor168 fn as_raw_descriptor(&self) -> RawDescriptor { 169 self.0.as_raw_descriptor() 170 } 171 } 172