1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13
14 use log::warn;
15 use serde::de::DeserializeOwned;
16 use serde::Deserialize;
17 use serde::Serialize;
18 use serde::Serializer;
19 use winapi::shared::winerror::ERROR_MORE_DATA;
20 use zerocopy::AsBytes;
21 use zerocopy::FromBytes;
22 use zerocopy::FromZeroes;
23
24 use crate::descriptor::AsRawDescriptor;
25 use crate::descriptor::FromRawDescriptor;
26 use crate::descriptor::SafeDescriptor;
27 use crate::descriptor_reflection::deserialize_with_descriptors;
28 use crate::descriptor_reflection::SerializeDescriptors;
29 use crate::tube::Error;
30 use crate::tube::RecvTube;
31 use crate::tube::Result;
32 use crate::tube::SendTube;
33 use crate::BlockingMode;
34 use crate::CloseNotifier;
35 use crate::EventToken;
36 use crate::FramingMode;
37 use crate::PipeConnection;
38 use crate::RawDescriptor;
39 use crate::ReadNotifier;
40 use crate::StreamChannel;
41
42 /// Bidirectional tube that support both send and recv.
43 ///
44 /// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
45 /// (A, B). We wish to send B to another process, and communicate with it using A from the current
46 /// process:
47 /// 1. B's target_pid must be set to the current PID *before* serialization. There is a
48 /// serialization hook that sets it to the current PID automatically if target_pid is unset.
49 /// 2. A's target_pid must be set to the PID of the process where B was sent.
50 ///
51 /// If instead you are sending both A and B to separate processes, then:
52 /// 1. A's target_pid must be set to B's pid, manually.
53 /// 2. B's target_pid must be set to A's pid, manually.
54 ///
55 /// Automating all of this and getting a completely clean interface is tricky. We would need
56 /// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
57 /// state about PIDs between the ends. There are alternatives like reusing the underlying
58 /// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
59 /// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
60 #[derive(Serialize, Deserialize, Debug)]
61 pub struct Tube {
62 socket: StreamChannel,
63
64 // Default target_pid to current PID on serialization (see `Tube` comment header for details).
65 #[serde(serialize_with = "set_tube_pid_on_serialize")]
66 target_pid: Option<u32>,
67 }
68
69 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
70 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,71 fn set_tube_pid_on_serialize<S>(
72 existing_pid_value: &Option<u32>,
73 serializer: S,
74 ) -> std::result::Result<S::Ok, S::Error>
75 where
76 S: Serializer,
77 {
78 match existing_pid_value {
79 Some(pid) => serializer.serialize_u32(*pid),
80 None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
81 }
82 }
83
84 #[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
85 #[repr(C)]
86 struct MsgHeader {
87 msg_json_size: usize,
88 descriptor_json_size: usize,
89 }
90
91 static DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
92 static ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
93
94 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)95 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
96 DH_TUBE.lock().replace(dh_tube);
97 }
98
99 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)100 pub fn set_alias_pid(alias_pid: u32) {
101 ALIAS_PID.lock().replace(alias_pid);
102 }
103
104 impl Tube {
105 /// Create a pair of connected tubes. Request is sent in one direction while response is
106 /// received in the other direction.
107 /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>108 pub fn pair() -> Result<(Tube, Tube)> {
109 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
110 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
111
112 Ok((Tube::new(socket1), Tube::new(socket2)))
113 }
114
115 /// Create a pair of connected tubes with the specified buffer size.
116 /// Request is sent in one direction while response is received in the other direction.
117 /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>118 pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
119 let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
120 BlockingMode::Blocking,
121 FramingMode::Message,
122 buffer_size,
123 )
124 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
125 let tube1 = Tube::new(socket1);
126 let tube2 = Tube::new(socket2);
127 Ok((tube1, tube2))
128 }
129
130 // Create a new `Tube`.
new(socket: StreamChannel) -> Tube131 pub fn new(socket: StreamChannel) -> Tube {
132 Tube {
133 socket,
134 target_pid: None,
135 }
136 }
137
try_clone(&self) -> Result<Self>138 pub(crate) fn try_clone(&self) -> Result<Self> {
139 Ok(Tube {
140 socket: self.socket.try_clone().map_err(Error::Clone)?,
141 target_pid: self.target_pid,
142 })
143 }
144
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>145 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
146 let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
147 let size_header = bytes.len();
148
149 let mut data_packet =
150 Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
151 data_packet
152 .write(&size_header.to_le_bytes())
153 .map_err(Error::from_send_io_buf_error)?;
154 data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
155 self.socket
156 .write_immutable(&data_packet.into_inner())
157 .map_err(Error::from_send_error)?;
158
159 Ok(())
160 }
161
recv_proto<M: protobuf::Message>(&self) -> Result<M>162 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
163 let mut header_bytes = [0u8; mem::size_of::<usize>()];
164 perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
165 .map_err(Error::from_recv_io_error)?;
166 let size_header = usize::from_le_bytes(header_bytes);
167
168 let mut proto_bytes = vec![0u8; size_header];
169 perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
170 .map_err(Error::from_recv_io_error)?;
171 protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
172 }
173
send<T: Serialize>(&self, msg: &T) -> Result<()>174 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
175 serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
176 }
177
recv<T: DeserializeOwned>(&self) -> Result<T>178 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
179 deserialize_and_recv(|buf| (&self.socket).read(buf))
180 }
181
182 /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
183 /// documentation to ensure you have a server pipe before calling.
184 #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>185 pub fn flush_blocking(&mut self) -> Result<()> {
186 self.socket.flush_blocking().map_err(Error::Flush)
187 }
188
189 /// For Tubes that span processes, this method must be used to set the PID of the other end
190 /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)191 pub fn set_target_pid(&mut self, target_pid: u32) {
192 self.target_pid = Some(target_pid);
193 }
194
195 /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>196 pub fn target_pid(&self) -> Option<u32> {
197 self.target_pid
198 }
199
200 /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>201 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
202 unimplemented!("To be removed/refactored upstream.");
203 }
204
205 /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>206 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
207 unimplemented!("To be removed/refactored upstream.");
208 }
209 }
210
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>211 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
212 write_fn: F,
213 msg: &T,
214 target_pid: Option<u32>,
215 ) -> Result<()> {
216 let msg_serialize = SerializeDescriptors::new(&msg);
217 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
218 let msg_descriptors = msg_serialize.into_descriptors();
219
220 let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
221 for desc in msg_descriptors {
222 // Safe because these handles are guaranteed to be valid. Details:
223 // 1. They come from base::descriptor_reflection::with_as_descriptor.
224 // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
225 // SafeDescriptor).
226 // 3. The owning object is borrowed by msg until sending is complete.
227 duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
228 }
229
230 let descriptor_json = if duped_descriptors.is_empty() {
231 None
232 } else {
233 Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
234 };
235
236 let header = MsgHeader {
237 msg_json_size: msg_json.len(),
238 descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
239 };
240
241 let mut data_packet = Cursor::new(Vec::with_capacity(
242 header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
243 ));
244 data_packet
245 .write(header.as_bytes())
246 .map_err(Error::SendIoBuf)?;
247 data_packet
248 .write(msg_json.as_slice())
249 .map_err(Error::SendIoBuf)?;
250 if let Some(descriptor_json) = descriptor_json {
251 data_packet
252 .write(descriptor_json.as_slice())
253 .map_err(Error::SendIoBuf)?;
254 }
255
256 // Multiple writers (producers) are safe because each write is atomic.
257 let data_bytes = data_packet.into_inner();
258
259 write_fn(&data_bytes).map_err(Error::from_send_error)?;
260 Ok(())
261 }
262
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>263 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
264 match target_pid {
265 Some(pid) => match &*DH_TUBE.lock() {
266 Some(tube) => tube.request_duplicate_handle(pid, desc),
267 None => {
268 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
269 }
270 },
271 None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
272 }
273 }
274
275 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
276 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. We
277 /// use this to ignore errors when reading the message header, which has the lengths we need to
278 /// allocate our buffers for the remainder of the message.
279 /// * We filled the supplied buffer.
perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>( read_fn: &mut F, buf: &mut [u8], ) -> io::Result<usize>280 fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
281 read_fn: &mut F,
282 buf: &mut [u8],
283 ) -> io::Result<usize> {
284 let bytes_read = match read_fn(buf) {
285 Ok(s) => Ok(s),
286 Err(e)
287 if e.raw_os_error()
288 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
289 {
290 Ok(buf.len())
291 }
292 Err(e) => Err(e),
293 }?;
294
295 if bytes_read != buf.len() {
296 Err(io::Error::new(
297 io::ErrorKind::UnexpectedEof,
298 format!(
299 "failed to fill whole buffer, expected {} got {}",
300 buf.len(),
301 bytes_read
302 ),
303 ))
304 } else {
305 Ok(bytes_read)
306 }
307 }
308
309 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
310 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>( mut read_fn: F, ) -> Result<T>311 pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
312 mut read_fn: F,
313 ) -> Result<T> {
314 let mut header = MsgHeader::default();
315 perform_read(&mut read_fn, header.as_bytes_mut()).map_err(Error::from_recv_io_error)?;
316
317 let mut msg_json = vec![0u8; header.msg_json_size];
318 perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
319
320 if msg_json.is_empty() {
321 // This means we got a message header, but there is no json body (due to a zero size in
322 // the header). This should never happen because it means the receiver is getting no
323 // data whatsoever from the sender.
324 return Err(Error::RecvUnexpectedEmptyBody);
325 }
326
327 let descriptor_usizes: Vec<usize> = if header.descriptor_json_size > 0 {
328 let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
329 perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
330 .map_err(Error::from_recv_io_error)?;
331 serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?
332 } else {
333 Vec::new()
334 };
335
336 let msg_descriptors = descriptor_usizes.into_iter().map(|item| {
337 // SAFETY: the usizes are RawDescriptors that were duplicated and converted to usize in the
338 // send method.
339 unsafe { SafeDescriptor::from_raw_descriptor(item as RawDescriptor) }
340 });
341
342 deserialize_with_descriptors(|| serde_json::from_slice(&msg_json), msg_descriptors)
343 .map_err(Error::Json)
344 }
345
346 #[derive(EventToken, Eq, PartialEq, Copy, Clone)]
347 enum Token {
348 SocketReady,
349 }
350
351 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor352 fn as_raw_descriptor(&self) -> RawDescriptor {
353 self.socket.as_raw_descriptor()
354 }
355 }
356
357 impl AsRawHandle for Tube {
as_raw_handle(&self) -> RawHandle358 fn as_raw_handle(&self) -> RawHandle {
359 self.as_raw_descriptor()
360 }
361 }
362
363 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor364 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
365 self.socket.get_read_notifier()
366 }
367 }
368
369 impl CloseNotifier for Tube {
get_close_notifier(&self) -> &dyn AsRawDescriptor370 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
371 self.socket.get_close_notifier()
372 }
373 }
374
375 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor376 fn as_raw_descriptor(&self) -> RawDescriptor {
377 self.0.as_raw_descriptor()
378 }
379 }
380
381 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor382 fn as_raw_descriptor(&self) -> RawDescriptor {
383 self.0.as_raw_descriptor()
384 }
385 }
386
387 impl CloseNotifier for SendTube {
get_close_notifier(&self) -> &dyn AsRawDescriptor388 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
389 self.0.get_close_notifier()
390 }
391 }
392
393 impl CloseNotifier for RecvTube {
get_close_notifier(&self) -> &dyn AsRawDescriptor394 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
395 self.0.get_close_notifier()
396 }
397 }
398
399 /// A request to duplicate a handle to a target process.
400 #[derive(Serialize, Deserialize, Debug)]
401 pub struct DuplicateHandleRequest {
402 pub target_alias_pid: u32,
403 pub handle: usize,
404 }
405
406 /// Contains a duplicated handle or None if an error occurred.
407 #[derive(Serialize, Deserialize, Debug)]
408 pub struct DuplicateHandleResponse {
409 pub handle: Option<usize>,
410 }
411
412 /// Wrapper for tube which is used to delegate DuplicateHandle function calls to
413 /// the broker process.
414 #[derive(Serialize, Deserialize, Debug)]
415 pub struct DuplicateHandleTube(Tube);
416
417 impl DuplicateHandleTube {
new(tube: Tube) -> Self418 pub fn new(tube: Tube) -> Self {
419 Self(tube)
420 }
421
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>422 pub fn request_duplicate_handle(
423 &self,
424 target_alias_pid: u32,
425 handle: RawHandle,
426 ) -> Result<RawHandle> {
427 let req = DuplicateHandleRequest {
428 target_alias_pid,
429 handle: handle as usize,
430 };
431 self.0.send(&req)?;
432 let res: DuplicateHandleResponse = self.0.recv()?;
433 res.handle
434 .map(|h| h as RawHandle)
435 .ok_or(Error::BrokerDupDescriptor)
436 }
437 }
438
439 /// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message
440 /// without serialization bloat caused from `serde-json`.
441 #[derive(Serialize, Deserialize)]
442 pub struct ProtoTube(Tube);
443
444 impl ProtoTube {
pair() -> Result<(ProtoTube, ProtoTube)>445 pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
446 Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
447 }
448
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>449 pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
450 Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
451 }
452
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>453 pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
454 self.0.send_proto(msg)
455 }
456
recv_proto<M: protobuf::Message>(&self) -> Result<M>457 pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
458 self.0.recv_proto()
459 }
460 }
461
462 impl ReadNotifier for ProtoTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor463 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
464 self.0.get_read_notifier()
465 }
466 }
467
468 impl AsRawDescriptor for ProtoTube {
as_raw_descriptor(&self) -> RawDescriptor469 fn as_raw_descriptor(&self) -> RawDescriptor {
470 self.0.as_raw_descriptor()
471 }
472 }
473
474 /// A wrapper around a named pipe that uses Tube serialization.
475 ///
476 /// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
477 /// blocking messages.
478 pub struct PipeTube {
479 pipe: PipeConnection,
480
481 // Default target_pid to current PID on serialization (see `Tube` comment header for details).
482 target_pid: Option<u32>,
483 }
484
485 impl PipeTube {
from(pipe: PipeConnection, target_pid: Option<u32>) -> Self486 pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
487 Self { pipe, target_pid }
488 }
489
send<T: Serialize>(&self, msg: &T) -> Result<()>490 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
491 serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
492 }
493
recv<T: DeserializeOwned>(&self) -> Result<T>494 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
495 deserialize_and_recv(|buf| {
496 // SAFETY:
497 // 1. We are reading bytes, so no matter what data is on the pipe, it is representable
498 // as bytes.
499 // 2. A read is quantized in bytes, so no partial reads are possible.
500 unsafe { self.pipe.read(buf) }
501 })
502 }
503 }
504
505 /// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
506 /// that the Tube is flushed before it is dropped.
507 pub struct FlushOnDropTube(pub Tube);
508
509 impl FlushOnDropTube {
from(tube: Tube) -> Self510 pub fn from(tube: Tube) -> Self {
511 Self(tube)
512 }
513 }
514
515 impl Drop for FlushOnDropTube {
drop(&mut self)516 fn drop(&mut self) {
517 if let Err(e) = self.0.flush_blocking() {
518 warn!("failed to flush Tube: {}", e)
519 }
520 }
521 }
522
523 impl Error {
map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error524 fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
525 if e.kind() == io::ErrorKind::BrokenPipe {
526 Error::Disconnected
527 } else {
528 err_ctor(e)
529 }
530 }
531
from_recv_io_error(e: io::Error) -> Error532 fn from_recv_io_error(e: io::Error) -> Error {
533 Self::map_io_error(e, Error::Recv)
534 }
535
from_send_error(e: io::Error) -> Error536 fn from_send_error(e: io::Error) -> Error {
537 Self::map_io_error(e, Error::Send)
538 }
539
from_send_io_buf_error(e: io::Error) -> Error540 fn from_send_io_buf_error(e: io::Error) -> Error {
541 Self::map_io_error(e, Error::SendIoBuf)
542 }
543 }
544