1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13
14 use data_model::DataInit;
15 use once_cell::sync::Lazy;
16 use serde::de::DeserializeOwned;
17 use serde::Deserialize;
18 use serde::Serialize;
19 use serde::Serializer;
20 use winapi::shared::winerror::ERROR_MORE_DATA;
21
22 use crate::descriptor::AsRawDescriptor;
23 use crate::descriptor::FromRawDescriptor;
24 use crate::descriptor::SafeDescriptor;
25 use crate::platform::deserialize_with_descriptors;
26 use crate::platform::RawDescriptor;
27 use crate::platform::SerializeDescriptors;
28 use crate::tube::Error;
29 use crate::tube::RecvTube;
30 use crate::tube::Result;
31 use crate::tube::SendTube;
32 use crate::BlockingMode;
33 use crate::CloseNotifier;
34 use crate::EventToken;
35 use crate::FramingMode;
36 use crate::ReadNotifier;
37 use crate::StreamChannel;
38
/// Bidirectional tube that supports both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
///     1. B's target_pid must be set to the current PID *before* serialization. There is a
///        serialization hook that sets it to the current PID automatically if target_pid is unset.
///     2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
///     1. A's target_pid must be set to B's pid, manually.
///     2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need to
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
    // Underlying channel that every send/recv on this Tube goes through.
    socket: StreamChannel,

    // PID of the process holding the other end, if known. It is passed to `serialize_and_send`
    // so that handles embedded in a message are duplicated for that process.
    //
    // Default target_pid to current PID on serialization (see `Tube` comment header for details).
    #[serde(serialize_with = "set_tube_pid_on_serialize")]
    target_pid: Option<u32>,
}
65
66 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
67 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,68 fn set_tube_pid_on_serialize<S>(
69 existing_pid_value: &Option<u32>,
70 serializer: S,
71 ) -> std::result::Result<S::Ok, S::Error>
72 where
73 S: Serializer,
74 {
75 match existing_pid_value {
76 Some(pid) => serializer.serialize_u32(*pid),
77 None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
78 }
79 }
80
/// Fixed-size header placed at the start of every packet built by `serialize_and_send` and
/// parsed first by `deserialize_and_recv`.
///
/// Both fields are `usize`, so the header's size depends on the pointer width of the build.
#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct MsgHeader {
    // Length in bytes of the JSON-encoded message body that follows the header.
    msg_json_size: usize,
    // Length in bytes of the JSON-encoded descriptor list that follows the body; 0 when the
    // message carries no descriptors.
    descriptor_json_size: usize,
}

// Safe because it only has data and has no implicit padding (two `usize` fields laid out with
// `repr(C)`).
unsafe impl DataInit for MsgHeader {}
90
// Optional process-wide tube used to delegate handle duplication. When it is set,
// `duplicate_handle` forwards requests that carry a target pid through it instead of duplicating
// the handle directly.
static DH_TUBE: Lazy<sync::Mutex<Option<DuplicateHandleTube>>> =
    Lazy::new(|| sync::Mutex::new(None));
// Optional process-wide alias pid. When it is set, it is what gets serialized as the target_pid
// of a Tube whose target_pid was never set (see `set_tube_pid_on_serialize`).
static ALIAS_PID: Lazy<sync::Mutex<Option<u32>>> = Lazy::new(|| sync::Mutex::new(None));
94
95 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)96 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
97 DH_TUBE.lock().replace(dh_tube);
98 }
99
100 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)101 pub fn set_alias_pid(alias_pid: u32) {
102 ALIAS_PID.lock().replace(alias_pid);
103 }
104
105 impl Tube {
106 /// Create a pair of connected tubes. Request is sent in one direction while response is
107 /// received in the other direction.
108 /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>109 pub fn pair() -> Result<(Tube, Tube)> {
110 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
111 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
112
113 Ok((Tube::new(socket1), Tube::new(socket2)))
114 }
115
116 /// Create a pair of connected tubes with the specified buffer size.
117 /// Request is sent in one direction while response is received in the other direction.
118 /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>119 pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
120 let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
121 BlockingMode::Blocking,
122 FramingMode::Message,
123 buffer_size,
124 )
125 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
126 let tube1 = Tube::new(socket1);
127 let tube2 = Tube::new(socket2);
128 Ok((tube1, tube2))
129 }
130
131 // Create a new `Tube`.
new(socket: StreamChannel) -> Tube132 pub fn new(socket: StreamChannel) -> Tube {
133 Tube {
134 socket,
135 target_pid: None,
136 }
137 }
138
try_clone(&self) -> Result<Self>139 pub(super) fn try_clone(&self) -> Result<Self> {
140 Ok(Tube {
141 socket: self.socket.try_clone().map_err(Error::Clone)?,
142 target_pid: self.target_pid,
143 })
144 }
145
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>146 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
147 let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
148 let size_header = bytes.len();
149
150 let mut data_packet =
151 Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
152 data_packet
153 .write(&size_header.to_le_bytes())
154 .map_err(Error::SendIoBuf)?;
155 data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
156 self.socket
157 .write_immutable(&data_packet.into_inner())
158 .map_err(Error::SendIo)?;
159
160 Ok(())
161 }
162
recv_proto<M: protobuf::Message>(&self) -> Result<M>163 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
164 let mut header_bytes = [0u8; mem::size_of::<usize>()];
165 perform_read(&|buf| (&self.socket).read(buf), &mut header_bytes).map_err(Error::Recv)?;
166 let size_header = usize::from_le_bytes(header_bytes);
167
168 let mut proto_bytes = vec![0u8; size_header];
169 perform_read(&|buf| (&self.socket).read(buf), &mut proto_bytes).map_err(Error::Recv)?;
170 protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
171 }
172
send<T: Serialize>(&self, msg: &T) -> Result<()>173 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
174 serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
175 }
176
recv<T: DeserializeOwned>(&self) -> Result<T>177 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
178 deserialize_and_recv(|buf| (&self.socket).read(buf))
179 }
180
181 /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
182 /// documentation to ensure you have a server pipe before calling.
183 #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>184 pub fn flush_blocking(&mut self) -> Result<()> {
185 self.socket.flush_blocking().map_err(Error::Flush)
186 }
187
188 /// For Tubes that span processes, this method must be used to set the PID of the other end
189 /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)190 pub fn set_target_pid(&mut self, target_pid: u32) {
191 self.target_pid = Some(target_pid);
192 }
193
194 /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>195 pub fn target_pid(&self) -> Option<u32> {
196 self.target_pid
197 }
198
199 /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>200 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
201 unimplemented!("To be removed/refactored upstream.");
202 }
203
204 /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>205 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
206 unimplemented!("To be removed/refactored upstream.");
207 }
208 }
209
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>210 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
211 write_fn: F,
212 msg: &T,
213 target_pid: Option<u32>,
214 ) -> Result<()> {
215 let msg_serialize = SerializeDescriptors::new(&msg);
216 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
217 let msg_descriptors = msg_serialize.into_descriptors();
218
219 let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
220 for desc in msg_descriptors {
221 // Safe because these handles are guaranteed to be valid. Details:
222 // 1. They come from base::descriptor_reflection::with_as_descriptor.
223 // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
224 // SafeDescriptor).
225 // 3. The owning object is borrowed by msg until sending is complete.
226 duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
227 }
228
229 let descriptor_json = if duped_descriptors.is_empty() {
230 None
231 } else {
232 Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
233 };
234
235 let header = MsgHeader {
236 msg_json_size: msg_json.len(),
237 descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
238 };
239
240 let mut data_packet = Cursor::new(Vec::with_capacity(
241 header.as_slice().len() + header.msg_json_size + header.descriptor_json_size,
242 ));
243 data_packet
244 .write(header.as_slice())
245 .map_err(Error::SendIoBuf)?;
246 data_packet
247 .write(msg_json.as_slice())
248 .map_err(Error::SendIoBuf)?;
249 if let Some(descriptor_json) = descriptor_json {
250 data_packet
251 .write(descriptor_json.as_slice())
252 .map_err(Error::SendIoBuf)?;
253 }
254
255 // Multiple writers (producers) are safe because each write is atomic.
256 let data_bytes = data_packet.into_inner();
257
258 write_fn(&data_bytes).map_err(Error::SendIo)?;
259 Ok(())
260 }
261
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>262 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
263 match target_pid {
264 Some(pid) => match &*DH_TUBE.lock() {
265 Some(tube) => tube.request_duplicate_handle(pid, desc),
266 None => {
267 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
268 }
269 },
270 None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
271 }
272 }
273
274 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
275 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
276 /// We use this to ignore errors when reading the message header, which has the lengths we need
277 /// to allocate our buffers for the remainder of the message.
278 /// * We filled the supplied buffer.
perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>( read_fn: &F, buf: &mut [u8], ) -> io::Result<usize>279 fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>(
280 read_fn: &F,
281 buf: &mut [u8],
282 ) -> io::Result<usize> {
283 let res = match read_fn(buf) {
284 Ok(s) => Ok(s),
285 Err(e)
286 if e.raw_os_error()
287 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
288 {
289 Ok(buf.len())
290 }
291 Err(e) => Err(e),
292 };
293
294 let bytes_read = res?;
295 if bytes_read != buf.len() {
296 Err(io::Error::new(
297 io::ErrorKind::UnexpectedEof,
298 "failed to fill whole buffer",
299 ))
300 } else {
301 Ok(bytes_read)
302 }
303 }
304
305 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
306 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>( read_fn: F, ) -> Result<T>307 pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>(
308 read_fn: F,
309 ) -> Result<T> {
310 let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
311 perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?;
312
313 // Safe because the header is always written by the send function, and only that function
314 // writes to this channel.
315 let header =
316 MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
317
318 let mut msg_json = vec![0u8; header.msg_json_size];
319 perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?;
320
321 if msg_json.is_empty() {
322 // This means we got a message header, but there is no json body (due to a zero size in
323 // the header). This should never happen because it means the receiver is getting no
324 // data whatsoever from the sender.
325 return Err(Error::RecvUnexpectedEmptyBody);
326 }
327
328 let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
329 let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
330 perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?;
331 let descriptor_usizes: Vec<usize> =
332 serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?;
333
334 // Safe because the usizes are RawDescriptors that were converted to usize in the send
335 // method.
336 descriptor_usizes
337 .iter()
338 .map(|item| *item as RawDescriptor)
339 .collect()
340 } else {
341 Vec::new()
342 };
343
344 let mut msg_descriptors_safe = msg_descriptors
345 .into_iter()
346 .map(|v| {
347 Some(unsafe {
348 // Safe because the socket returns new fds that are owned locally by this scope.
349 SafeDescriptor::from_raw_descriptor(v)
350 })
351 })
352 .collect();
353
354 deserialize_with_descriptors(
355 || serde_json::from_slice(&msg_json),
356 &mut msg_descriptors_safe,
357 )
358 .map_err(Error::Json)
359 }
360
// Event token with a single `SocketReady` variant.
//
// NOTE(review): nothing in the visible part of this file references `Token`; presumably it is
// meant for waiting on the tube's socket. Confirm it is still needed.
#[derive(EventToken, Eq, PartialEq, Copy, Clone)]
enum Token {
    SocketReady,
}
365
366 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor367 fn as_raw_descriptor(&self) -> RawDescriptor {
368 self.socket.as_raw_descriptor()
369 }
370 }
371
372 impl AsRawHandle for Tube {
as_raw_handle(&self) -> RawHandle373 fn as_raw_handle(&self) -> RawHandle {
374 self.as_raw_descriptor()
375 }
376 }
377
378 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor379 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
380 self.socket.get_read_notifier()
381 }
382 }
383
384 impl CloseNotifier for Tube {
get_close_notifier(&self) -> &dyn AsRawDescriptor385 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
386 self.socket.get_close_notifier()
387 }
388 }
389
390 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor391 fn as_raw_descriptor(&self) -> RawDescriptor {
392 self.0.as_raw_descriptor()
393 }
394 }
395
396 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor397 fn as_raw_descriptor(&self) -> RawDescriptor {
398 self.0.as_raw_descriptor()
399 }
400 }
401
/// A request to duplicate a handle to a target process.
///
/// Sent over a `DuplicateHandleTube` by `DuplicateHandleTube::request_duplicate_handle`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
    /// PID (or alias PID) identifying the process the handle should be duplicated for.
    pub target_alias_pid: u32,
    /// The handle to duplicate, carried as its raw value converted to `usize`.
    pub handle: usize,
}
408
/// Contains a duplicated handle or None if an error occurred.
///
/// This is the reply to a `DuplicateHandleRequest`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
    /// Raw value of the duplicated handle converted to `usize`, or `None` on failure.
    pub handle: Option<usize>,
}
414
/// Wrapper for a tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
///
/// Requests are `DuplicateHandleRequest` messages and replies are `DuplicateHandleResponse`
/// messages, exchanged over the wrapped `Tube`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
419
420 impl DuplicateHandleTube {
new(tube: Tube) -> Self421 pub fn new(tube: Tube) -> Self {
422 Self(tube)
423 }
424
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>425 pub fn request_duplicate_handle(
426 &self,
427 target_alias_pid: u32,
428 handle: RawHandle,
429 ) -> Result<RawHandle> {
430 let req = DuplicateHandleRequest {
431 target_alias_pid,
432 handle: handle as usize,
433 };
434 self.0.send(&req)?;
435 let res: DuplicateHandleResponse = self.0.recv()?;
436 res.handle
437 .map(|h| h as RawHandle)
438 .ok_or(Error::BrokerDupDescriptor)
439 }
440 }
441
/// Wrapper for Tube used for sending and receiving protos. The main use case is to send a
/// message without the serialization bloat caused by `serde-json`.
pub struct ProtoTube(Tube);
445
446 impl ProtoTube {
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>447 pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
448 Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
449 }
450
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>451 pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
452 self.0.send_proto(msg)
453 }
454
recv_proto<M: protobuf::Message>(&self) -> Result<M>455 pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
456 self.0.recv_proto()
457 }
458 }
459
460 impl ReadNotifier for ProtoTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor461 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
462 self.0.get_read_notifier()
463 }
464 }
465