• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::os::unix::prelude::AsRawFd;
6 use std::os::unix::prelude::RawFd;
7 use std::time::Duration;
8 
9 use serde::de::DeserializeOwned;
10 use serde::Deserialize;
11 use serde::Serialize;
12 
13 use crate::descriptor::AsRawDescriptor;
14 use crate::descriptor_reflection::deserialize_with_descriptors;
15 use crate::descriptor_reflection::SerializeDescriptors;
16 use crate::handle_eintr;
17 use crate::tube::Error;
18 use crate::tube::RecvTube;
19 use crate::tube::Result;
20 use crate::tube::SendTube;
21 use crate::RawDescriptor;
22 use crate::ReadNotifier;
23 use crate::ScmSocket;
24 use crate::UnixSeqpacket;
25 use crate::SCM_SOCKET_MAX_FD_COUNT;
26 
// Default upper bound on the number of file descriptors carried by a single message; it is what
// `Tube::send` and `Tube::recv` pass as `max_fds`.
// This size matches the inline buffer size of CmsgBuffer.
const TUBE_MAX_FDS: usize = 32;
29 
30 /// Bidirectional tube that support both send and recv.
31 #[derive(Serialize, Deserialize)]
32 pub struct Tube {
33     socket: ScmSocket<UnixSeqpacket>,
34 }
35 
36 impl Tube {
37     /// Create a pair of connected tubes. Request is sent in one direction while response is in the
38     /// other direction.
pair() -> Result<(Tube, Tube)>39     pub fn pair() -> Result<(Tube, Tube)> {
40         let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
41         let tube1 = Tube::try_from(socket1)?;
42         let tube2 = Tube::try_from(socket2)?;
43         Ok((tube1, tube2))
44     }
45 
46     /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a
47     /// directional Tube pair instead.
48     #[deprecated]
try_clone(&self) -> Result<Self>49     pub fn try_clone(&self) -> Result<Self> {
50         self.socket
51             .inner()
52             .try_clone()
53             .map_err(Error::Clone)?
54             .try_into()
55     }
56 
57     /// Sends a message via a Tube.
58     /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`.
59     /// If you want to send more descriptors, use `send_with_max_fds` instead.
send<T: Serialize>(&self, msg: &T) -> Result<()>60     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
61         self.send_with_max_fds(msg, TUBE_MAX_FDS)
62     }
63 
64     /// Sends a message with at most `max_fds` file descriptors via a Tube.
65     /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253).
send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()>66     pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> {
67         if max_fds > SCM_SOCKET_MAX_FD_COUNT {
68             return Err(Error::SendTooManyFds);
69         }
70         let msg_serialize = SerializeDescriptors::new(&msg);
71         let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
72         let msg_descriptors = msg_serialize.into_descriptors();
73 
74         if msg_descriptors.len() > max_fds {
75             return Err(Error::SendTooManyFds);
76         }
77 
78         handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors))
79             .map_err(Error::Send)?;
80         Ok(())
81     }
82 
83     /// Recieves a message from a Tube.
84     /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use
85     /// `recv_with_max_fds` instead.
recv<T: DeserializeOwned>(&self) -> Result<T>86     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
87         self.recv_with_max_fds(TUBE_MAX_FDS)
88     }
89 
90     /// Recieves a message with at most `max_fds` file descriptors from a Tube.
recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T>91     pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> {
92         if max_fds > SCM_SOCKET_MAX_FD_COUNT {
93             return Err(Error::RecvTooManyFds);
94         }
95 
96         // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube
97         // is readable, then a call to `Tube::recv` will not block (which ought to be true since we
98         // use SOCK_SEQPACKET and a single recvmsg call currently).
99 
100         let msg_size =
101             handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
102         // This buffer is the right size, as the size received in next_packet_size() represents the
103         // size of only the message itself and not the file descriptors. The descriptors are stored
104         // separately in msghdr::msg_control.
105         let mut msg_json = vec![0u8; msg_size];
106 
107         let (msg_json_size, msg_descriptors) =
108             handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds))
109                 .map_err(Error::Recv)?;
110 
111         if msg_json_size == 0 {
112             return Err(Error::Disconnected);
113         }
114 
115         deserialize_with_descriptors(
116             || serde_json::from_slice(&msg_json[0..msg_json_size]),
117             msg_descriptors,
118         )
119         .map_err(Error::Json)
120     }
121 
set_send_timeout(&self, timeout: Option<Duration>) -> Result<()>122     pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
123         self.socket
124             .inner()
125             .set_write_timeout(timeout)
126             .map_err(Error::SetSendTimeout)
127     }
128 
set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()>129     pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
130         self.socket
131             .inner()
132             .set_read_timeout(timeout)
133             .map_err(Error::SetRecvTimeout)
134     }
135 
136     #[cfg(feature = "proto_tube")]
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>137     fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
138         let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
139         let no_fds: [RawFd; 0] = [];
140 
141         handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?;
142 
143         Ok(())
144     }
145 
146     #[cfg(feature = "proto_tube")]
recv_proto<M: protobuf::Message>(&self) -> Result<M>147     fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
148         let msg_size =
149             handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
150         let mut msg_bytes = vec![0u8; msg_size];
151 
152         let (msg_bytes_size, _) =
153             handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS))
154                 .map_err(Error::Recv)?;
155 
156         if msg_bytes_size == 0 {
157             return Err(Error::Disconnected);
158         }
159 
160         protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto)
161     }
162 }
163 
164 impl TryFrom<UnixSeqpacket> for Tube {
165     type Error = Error;
166 
try_from(socket: UnixSeqpacket) -> Result<Self>167     fn try_from(socket: UnixSeqpacket) -> Result<Self> {
168         Ok(Tube {
169             socket: socket.try_into().map_err(Error::ScmSocket)?,
170         })
171     }
172 }
173 
174 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor175     fn as_raw_descriptor(&self) -> RawDescriptor {
176         self.socket.as_raw_descriptor()
177     }
178 }
179 
180 impl AsRawFd for Tube {
as_raw_fd(&self) -> RawFd181     fn as_raw_fd(&self) -> RawFd {
182         self.socket.inner().as_raw_descriptor()
183     }
184 }
185 
186 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor187     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
188         &self.socket
189     }
190 }
191 
192 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor193     fn as_raw_descriptor(&self) -> RawDescriptor {
194         self.0.as_raw_descriptor()
195     }
196 }
197 
198 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor199     fn as_raw_descriptor(&self) -> RawDescriptor {
200         self.0.as_raw_descriptor()
201     }
202 }
203 
/// Wrapper for `Tube` used for sending and receiving protos. It avoids the extra overhead of
/// serialization via serde_json. Since protos should be standalone objects, we do not support
/// sending file descriptors as a normal `Tube` would.
///
/// Only available when the `proto_tube` feature is enabled.
#[cfg(feature = "proto_tube")]
pub struct ProtoTube(Tube);
209 
#[cfg(feature = "proto_tube")]
impl ProtoTube {
    /// Creates two connected `ProtoTube`s by wrapping both ends of `Tube::pair()`.
    pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
        let (first, second) = Tube::pair()?;
        Ok((ProtoTube(first), ProtoTube(second)))
    }

    /// Sends `msg` over the wrapped tube using `Tube::send_proto`.
    pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
        let ProtoTube(tube) = self;
        tube.send_proto(msg)
    }

    /// Receives a message of type `M` from the wrapped tube using `Tube::recv_proto`.
    pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
        let ProtoTube(tube) = self;
        tube.recv_proto()
    }
}
224 
#[cfg(feature = "proto_tube")]
impl From<Tube> for ProtoTube {
    /// Takes ownership of `tube` and exposes it through the proto-only interface.
    fn from(tube: Tube) -> Self {
        Self(tube)
    }
}
231 
#[cfg(all(feature = "proto_tube", test))]
mod tests {
    // Not testing this proto specifically; we just need an existing one to exercise ProtoTube.
    use protos::cdisk_spec::ComponentDisk;

    use super::*;

    /// Sends a proto through one end of a `ProtoTube` pair and checks that the other end
    /// receives an equal message.
    #[test]
    fn tube_serializes_and_deserializes() {
        let (pt1, pt2) = ProtoTube::pair().unwrap();
        let proto = ComponentDisk {
            file_path: "/some/cool/path".to_string(),
            offset: 99,
            ..ComponentDisk::new()
        };

        pt1.send_proto(&proto).unwrap();

        // The explicit type pins the generic `M` of `recv_proto` without relying on inference
        // from the comparison below.
        let recv_proto: ComponentDisk = pt2.recv_proto().unwrap();

        // `assert_eq!` prints both messages when they differ.
        assert_eq!(proto, recv_proto);
    }
}
256