1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::os::unix::prelude::AsRawFd; 6 use std::os::unix::prelude::RawFd; 7 use std::time::Duration; 8 9 use serde::de::DeserializeOwned; 10 use serde::Deserialize; 11 use serde::Serialize; 12 13 use crate::descriptor::AsRawDescriptor; 14 use crate::descriptor_reflection::deserialize_with_descriptors; 15 use crate::descriptor_reflection::SerializeDescriptors; 16 use crate::handle_eintr; 17 use crate::tube::Error; 18 use crate::tube::RecvTube; 19 use crate::tube::Result; 20 use crate::tube::SendTube; 21 use crate::RawDescriptor; 22 use crate::ReadNotifier; 23 use crate::ScmSocket; 24 use crate::UnixSeqpacket; 25 use crate::SCM_SOCKET_MAX_FD_COUNT; 26 27 // This size matches the inline buffer size of CmsgBuffer. 28 const TUBE_MAX_FDS: usize = 32; 29 30 /// Bidirectional tube that support both send and recv. 31 #[derive(Serialize, Deserialize)] 32 pub struct Tube { 33 socket: ScmSocket<UnixSeqpacket>, 34 } 35 36 impl Tube { 37 /// Create a pair of connected tubes. Request is sent in one direction while response is in the 38 /// other direction. pair() -> Result<(Tube, Tube)>39 pub fn pair() -> Result<(Tube, Tube)> { 40 let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?; 41 let tube1 = Tube::try_from(socket1)?; 42 let tube2 = Tube::try_from(socket2)?; 43 Ok((tube1, tube2)) 44 } 45 46 /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a 47 /// directional Tube pair instead. 48 #[deprecated] try_clone(&self) -> Result<Self>49 pub fn try_clone(&self) -> Result<Self> { 50 self.socket 51 .inner() 52 .try_clone() 53 .map_err(Error::Clone)? 54 .try_into() 55 } 56 57 /// Sends a message via a Tube. 58 /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`. 59 /// If you want to send more descriptors, use `send_with_max_fds` instead. 
send<T: Serialize>(&self, msg: &T) -> Result<()>60 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { 61 self.send_with_max_fds(msg, TUBE_MAX_FDS) 62 } 63 64 /// Sends a message with at most `max_fds` file descriptors via a Tube. 65 /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253). send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()>66 pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> { 67 if max_fds > SCM_SOCKET_MAX_FD_COUNT { 68 return Err(Error::SendTooManyFds); 69 } 70 let msg_serialize = SerializeDescriptors::new(&msg); 71 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; 72 let msg_descriptors = msg_serialize.into_descriptors(); 73 74 if msg_descriptors.len() > max_fds { 75 return Err(Error::SendTooManyFds); 76 } 77 78 handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors)) 79 .map_err(Error::Send)?; 80 Ok(()) 81 } 82 83 /// Recieves a message from a Tube. 84 /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use 85 /// `recv_with_max_fds` instead. recv<T: DeserializeOwned>(&self) -> Result<T>86 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { 87 self.recv_with_max_fds(TUBE_MAX_FDS) 88 } 89 90 /// Recieves a message with at most `max_fds` file descriptors from a Tube. recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T>91 pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> { 92 if max_fds > SCM_SOCKET_MAX_FD_COUNT { 93 return Err(Error::RecvTooManyFds); 94 } 95 96 // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube 97 // is readable, then a call to `Tube::recv` will not block (which ought to be true since we 98 // use SOCK_SEQPACKET and a single recvmsg call currently). 
99 100 let msg_size = 101 handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?; 102 // This buffer is the right size, as the size received in next_packet_size() represents the 103 // size of only the message itself and not the file descriptors. The descriptors are stored 104 // separately in msghdr::msg_control. 105 let mut msg_json = vec![0u8; msg_size]; 106 107 let (msg_json_size, msg_descriptors) = 108 handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds)) 109 .map_err(Error::Recv)?; 110 111 if msg_json_size == 0 { 112 return Err(Error::Disconnected); 113 } 114 115 deserialize_with_descriptors( 116 || serde_json::from_slice(&msg_json[0..msg_json_size]), 117 msg_descriptors, 118 ) 119 .map_err(Error::Json) 120 } 121 set_send_timeout(&self, timeout: Option<Duration>) -> Result<()>122 pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> { 123 self.socket 124 .inner() 125 .set_write_timeout(timeout) 126 .map_err(Error::SetSendTimeout) 127 } 128 set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()>129 pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> { 130 self.socket 131 .inner() 132 .set_read_timeout(timeout) 133 .map_err(Error::SetRecvTimeout) 134 } 135 136 #[cfg(feature = "proto_tube")] send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>137 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> { 138 let bytes = msg.write_to_bytes().map_err(Error::Proto)?; 139 let no_fds: [RawFd; 0] = []; 140 141 handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?; 142 143 Ok(()) 144 } 145 146 #[cfg(feature = "proto_tube")] recv_proto<M: protobuf::Message>(&self) -> Result<M>147 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> { 148 let msg_size = 149 handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?; 150 let mut msg_bytes = vec![0u8; msg_size]; 151 152 let (msg_bytes_size, _) = 153 
handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS)) 154 .map_err(Error::Recv)?; 155 156 if msg_bytes_size == 0 { 157 return Err(Error::Disconnected); 158 } 159 160 protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto) 161 } 162 } 163 164 impl TryFrom<UnixSeqpacket> for Tube { 165 type Error = Error; 166 try_from(socket: UnixSeqpacket) -> Result<Self>167 fn try_from(socket: UnixSeqpacket) -> Result<Self> { 168 Ok(Tube { 169 socket: socket.try_into().map_err(Error::ScmSocket)?, 170 }) 171 } 172 } 173 174 impl AsRawDescriptor for Tube { as_raw_descriptor(&self) -> RawDescriptor175 fn as_raw_descriptor(&self) -> RawDescriptor { 176 self.socket.as_raw_descriptor() 177 } 178 } 179 180 impl AsRawFd for Tube { as_raw_fd(&self) -> RawFd181 fn as_raw_fd(&self) -> RawFd { 182 self.socket.inner().as_raw_descriptor() 183 } 184 } 185 186 impl ReadNotifier for Tube { get_read_notifier(&self) -> &dyn AsRawDescriptor187 fn get_read_notifier(&self) -> &dyn AsRawDescriptor { 188 &self.socket 189 } 190 } 191 192 impl AsRawDescriptor for SendTube { as_raw_descriptor(&self) -> RawDescriptor193 fn as_raw_descriptor(&self) -> RawDescriptor { 194 self.0.as_raw_descriptor() 195 } 196 } 197 198 impl AsRawDescriptor for RecvTube { as_raw_descriptor(&self) -> RawDescriptor199 fn as_raw_descriptor(&self) -> RawDescriptor { 200 self.0.as_raw_descriptor() 201 } 202 } 203 204 /// Wrapper for Tube used for sending and receiving protos - avoids extra overhead of serialization 205 /// via serde_json. Since protos should be standalone objects we do not support sending of file 206 /// descriptors as a normal Tube would. 
#[cfg(feature = "proto_tube")]
pub struct ProtoTube(Tube);

#[cfg(feature = "proto_tube")]
impl ProtoTube {
    /// Creates two connected `ProtoTube`s by wrapping both ends of `Tube::pair()`.
    pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
        let (first, second) = Tube::pair()?;
        Ok((ProtoTube(first), ProtoTube(second)))
    }

    /// Sends `msg` by delegating to the wrapped tube's `send_proto`.
    pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
        let Self(tube) = self;
        tube.send_proto(msg)
    }

    /// Receives a message of type `M` by delegating to the wrapped tube's `recv_proto`.
    pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
        let Self(tube) = self;
        tube.recv_proto()
    }
}

#[cfg(feature = "proto_tube")]
impl From<Tube> for ProtoTube {
    /// Wraps an existing `Tube` so it can be used to exchange protos.
    fn from(tube: Tube) -> Self {
        Self(tube)
    }
}

#[cfg(all(feature = "proto_tube", test))]
#[allow(unused_variables)]
mod tests {
    // not testing this proto specifically, just need an existing one to test the ProtoTube.
    use protos::cdisk_spec::ComponentDisk;

    use super::*;

    // Round-trips one proto through a connected pair and checks it arrives unchanged.
    #[test]
    fn tube_serializes_and_deserializes() {
        let (sender, receiver) = ProtoTube::pair().unwrap();
        let sent = ComponentDisk {
            file_path: "/some/cool/path".to_string(),
            offset: 99,
            ..ComponentDisk::new()
        };

        sender.send_proto(&sent).unwrap();
        let received: ComponentDisk = receiver.recv_proto().unwrap();

        assert!(sent == received);
    }
}