• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 
7 use remain::sorted;
8 use thiserror::Error as ThisError;
9 
10 #[cfg_attr(windows, path = "sys/windows/tube.rs")]
11 #[cfg_attr(not(windows), path = "sys/unix/tube.rs")]
12 mod tube;
13 use std::time::Duration;
14 
15 use serde::de::DeserializeOwned;
16 use serde::Deserialize;
17 use serde::Serialize;
18 pub use tube::*;
19 
20 impl Tube {
21     /// Creates a Send/Recv pair of Tubes.
directional_pair() -> Result<(SendTube, RecvTube)>22     pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
23         let (t1, t2) = Self::pair()?;
24         Ok((SendTube(t1), RecvTube(t2)))
25     }
26 
try_clone_send_tube(&self) -> Result<SendTube>27     pub fn try_clone_send_tube(&self) -> Result<SendTube> {
28         // Safe because receiving is only allowed on original Tube.
29         #[allow(deprecated)]
30         let send_end = self.try_clone()?;
31         Ok(SendTube(send_end))
32     }
33 }
34 
35 use crate::AsRawDescriptor;
36 use crate::ReadNotifier;
37 
38 #[derive(Serialize, Deserialize)]
39 #[serde(transparent)]
40 /// A Tube end which can only send messages. Cloneable.
41 pub struct SendTube(Tube);
42 
43 #[allow(dead_code)]
44 impl SendTube {
45     /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>46     pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
47         unimplemented!("To be removed/refactored upstream.");
48     }
49 
send<T: Serialize>(&self, msg: &T) -> Result<()>50     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
51         self.0.send(msg)
52     }
53 
try_clone(&self) -> Result<Self>54     pub fn try_clone(&self) -> Result<Self> {
55         Ok(SendTube(
56             #[allow(deprecated)]
57             self.0.try_clone()?,
58         ))
59     }
60 
61     /// Never call this function, it is for use by cros_async to provide
62     /// directional wrapper types only. Using it in any other context may
63     /// violate concurrency assumptions. (Type splitting across crates has put
64     /// us in a situation where we can't use Rust privacy to enforce this.)
65     #[deprecated]
into_tube(self) -> Tube66     pub fn into_tube(self) -> Tube {
67         self.0
68     }
69 }
70 
71 #[derive(Serialize, Deserialize)]
72 #[serde(transparent)]
73 /// A Tube end which can only recv messages.
74 pub struct RecvTube(Tube);
75 
76 #[allow(dead_code)]
77 impl RecvTube {
recv<T: DeserializeOwned>(&self) -> Result<T>78     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
79         self.0.recv()
80     }
81 
82     /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>83     pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
84         unimplemented!("To be removed/refactored upstream.");
85     }
86 
87     /// Never call this function, it is for use by cros_async to provide
88     /// directional wrapper types only. Using it in any other context may
89     /// violate concurrency assumptions. (Type splitting across crates has put
90     /// us in a situation where we can't use Rust privacy to enforce this.)
91     #[deprecated]
into_tube(self) -> Tube92     pub fn into_tube(self) -> Tube {
93         self.0
94     }
95 }
96 
97 impl ReadNotifier for RecvTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor98     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
99         self.0.get_read_notifier()
100     }
101 }
102 
103 #[sorted]
104 #[derive(ThisError, Debug)]
105 pub enum Error {
106     #[cfg(windows)]
107     #[error("attempt to duplicate descriptor via broker failed")]
108     BrokerDupDescriptor,
109     #[error("failed to clone transport: {0}")]
110     Clone(io::Error),
111     #[error("tube was disconnected")]
112     Disconnected,
113     #[error("failed to duplicate descriptor: {0}")]
114     DupDescriptor(io::Error),
115     #[cfg(windows)]
116     #[error("failed to flush named pipe: {0}")]
117     Flush(io::Error),
118     #[cfg(unix)]
119     #[error("byte framing mode is not supported")]
120     InvalidFramingMode,
121     #[error("failed to serialize/deserialize json from packet: {0}")]
122     Json(serde_json::Error),
123     #[error("cancelled a queued async operation")]
124     OperationCancelled,
125     #[error("failed to crate tube pair: {0}")]
126     Pair(io::Error),
127     #[cfg(windows)]
128     #[error("encountered protobuf error: {0}")]
129     Proto(protobuf::ProtobufError),
130     #[error("failed to receive packet: {0}")]
131     Recv(io::Error),
132     #[error("Received a message with a zero sized body. This should not happen.")]
133     RecvUnexpectedEmptyBody,
134     #[error("failed to send packet: {0}")]
135     Send(crate::platform::Error),
136     #[error("failed to send packet: {0}")]
137     SendIo(io::Error),
138     #[error("failed to write packet to intermediate buffer: {0}")]
139     SendIoBuf(io::Error),
140     #[error("attempted to send too many file descriptors")]
141     SendTooManyFds,
142     #[error("failed to set recv timeout: {0}")]
143     SetRecvTimeout(io::Error),
144     #[error("failed to set send timeout: {0}")]
145     SetSendTimeout(io::Error),
146 }
147 
148 pub type Result<T> = std::result::Result<T, Error>;
149