1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io; 6 7 use remain::sorted; 8 use thiserror::Error as ThisError; 9 10 #[cfg_attr(windows, path = "sys/windows/tube.rs")] 11 #[cfg_attr(not(windows), path = "sys/unix/tube.rs")] 12 mod tube; 13 use std::time::Duration; 14 15 use serde::de::DeserializeOwned; 16 use serde::Deserialize; 17 use serde::Serialize; 18 pub use tube::*; 19 20 impl Tube { 21 /// Creates a Send/Recv pair of Tubes. directional_pair() -> Result<(SendTube, RecvTube)>22 pub fn directional_pair() -> Result<(SendTube, RecvTube)> { 23 let (t1, t2) = Self::pair()?; 24 Ok((SendTube(t1), RecvTube(t2))) 25 } 26 try_clone_send_tube(&self) -> Result<SendTube>27 pub fn try_clone_send_tube(&self) -> Result<SendTube> { 28 // Safe because receiving is only allowed on original Tube. 29 #[allow(deprecated)] 30 let send_end = self.try_clone()?; 31 Ok(SendTube(send_end)) 32 } 33 } 34 35 use crate::AsRawDescriptor; 36 use crate::ReadNotifier; 37 38 #[derive(Serialize, Deserialize)] 39 #[serde(transparent)] 40 /// A Tube end which can only send messages. Cloneable. 41 pub struct SendTube(Tube); 42 43 #[allow(dead_code)] 44 impl SendTube { 45 /// TODO(b/145998747, b/184398671): this method should be removed. set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>46 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> { 47 unimplemented!("To be removed/refactored upstream."); 48 } 49 send<T: Serialize>(&self, msg: &T) -> Result<()>50 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { 51 self.0.send(msg) 52 } 53 try_clone(&self) -> Result<Self>54 pub fn try_clone(&self) -> Result<Self> { 55 Ok(SendTube( 56 #[allow(deprecated)] 57 self.0.try_clone()?, 58 )) 59 } 60 61 /// Never call this function, it is for use by cros_async to provide 62 /// directional wrapper types only. Using it in any other context may 63 /// violate concurrency assumptions. (Type splitting across crates has put 64 /// us in a situation where we can't use Rust privacy to enforce this.) 65 #[deprecated] into_tube(self) -> Tube66 pub fn into_tube(self) -> Tube { 67 self.0 68 } 69 } 70 71 #[derive(Serialize, Deserialize)] 72 #[serde(transparent)] 73 /// A Tube end which can only recv messages. 74 pub struct RecvTube(Tube); 75 76 #[allow(dead_code)] 77 impl RecvTube { recv<T: DeserializeOwned>(&self) -> Result<T>78 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { 79 self.0.recv() 80 } 81 82 /// TODO(b/145998747, b/184398671): this method should be removed. set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>83 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> { 84 unimplemented!("To be removed/refactored upstream."); 85 } 86 87 /// Never call this function, it is for use by cros_async to provide 88 /// directional wrapper types only. Using it in any other context may 89 /// violate concurrency assumptions. (Type splitting across crates has put 90 /// us in a situation where we can't use Rust privacy to enforce this.) 91 #[deprecated] into_tube(self) -> Tube92 pub fn into_tube(self) -> Tube { 93 self.0 94 } 95 } 96 97 impl ReadNotifier for RecvTube { get_read_notifier(&self) -> &dyn AsRawDescriptor98 fn get_read_notifier(&self) -> &dyn AsRawDescriptor { 99 self.0.get_read_notifier() 100 } 101 } 102 103 #[sorted] 104 #[derive(ThisError, Debug)] 105 pub enum Error { 106 #[cfg(windows)] 107 #[error("attempt to duplicate descriptor via broker failed")] 108 BrokerDupDescriptor, 109 #[error("failed to clone transport: {0}")] 110 Clone(io::Error), 111 #[error("tube was disconnected")] 112 Disconnected, 113 #[error("failed to duplicate descriptor: {0}")] 114 DupDescriptor(io::Error), 115 #[cfg(windows)] 116 #[error("failed to flush named pipe: {0}")] 117 Flush(io::Error), 118 #[cfg(unix)] 119 #[error("byte framing mode is not supported")] 120 InvalidFramingMode, 121 #[error("failed to serialize/deserialize json from packet: {0}")] 122 Json(serde_json::Error), 123 #[error("cancelled a queued async operation")] 124 OperationCancelled, 125 #[error("failed to crate tube pair: {0}")] 126 Pair(io::Error), 127 #[cfg(windows)] 128 #[error("encountered protobuf error: {0}")] 129 Proto(protobuf::ProtobufError), 130 #[error("failed to receive packet: {0}")] 131 Recv(io::Error), 132 #[error("Received a message with a zero sized body. This should not happen.")] 133 RecvUnexpectedEmptyBody, 134 #[error("failed to send packet: {0}")] 135 Send(crate::platform::Error), 136 #[error("failed to send packet: {0}")] 137 SendIo(io::Error), 138 #[error("failed to write packet to intermediate buffer: {0}")] 139 SendIoBuf(io::Error), 140 #[error("attempted to send too many file descriptors")] 141 SendTooManyFds, 142 #[error("failed to set recv timeout: {0}")] 143 SetRecvTimeout(io::Error), 144 #[error("failed to set send timeout: {0}")] 145 SetSendTimeout(io::Error), 146 } 147 148 pub type Result<T> = std::result::Result<T, Error>; 149