1 //! Asynchronous sinks 2 //! 3 //! This crate contains the `Sink` trait which allows values to be sent 4 //! asynchronously. 5 6 #![cfg_attr(not(feature = "std"), no_std)] 7 #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)] 8 // It cannot be included in the published code because this lints have false positives in the minimum required version. 9 #![cfg_attr(test, warn(single_use_lifetimes))] 10 #![doc(test( 11 no_crate_inject, 12 attr( 13 deny(warnings, rust_2018_idioms, single_use_lifetimes), 14 allow(dead_code, unused_assignments, unused_variables) 15 ) 16 ))] 17 18 #[cfg(feature = "alloc")] 19 extern crate alloc; 20 21 use core::ops::DerefMut; 22 use core::pin::Pin; 23 use core::task::{Context, Poll}; 24 25 /// A `Sink` is a value into which other values can be sent, asynchronously. 26 /// 27 /// Basic examples of sinks include the sending side of: 28 /// 29 /// - Channels 30 /// - Sockets 31 /// - Pipes 32 /// 33 /// In addition to such "primitive" sinks, it's typical to layer additional 34 /// functionality, such as buffering, on top of an existing sink. 35 /// 36 /// Sending to a sink is "asynchronous" in the sense that the value may not be 37 /// sent in its entirety immediately. Instead, values are sent in a two-phase 38 /// way: first by initiating a send, and then by polling for completion. This 39 /// two-phase setup is analogous to buffered writing in synchronous code, where 40 /// writes often succeed immediately, but internally are buffered and are 41 /// *actually* written only upon flushing. 42 /// 43 /// In addition, the `Sink` may be *full*, in which case it is not even possible 44 /// to start the sending process. 45 /// 46 /// As with `Future` and `Stream`, the `Sink` trait is built from a few core 47 /// required methods, and a host of default methods for working in a 48 /// higher-level way. The `Sink::send_all` combinator is of particular 49 /// importance: you can use it to send an entire stream to a sink, which is 50 /// the simplest way to ultimately consume a stream. 51 #[must_use = "sinks do nothing unless polled"] 52 pub trait Sink<Item> { 53 /// The type of value produced by the sink when an error occurs. 54 type Error; 55 56 /// Attempts to prepare the `Sink` to receive a value. 57 /// 58 /// This method must be called and return `Poll::Ready(Ok(()))` prior to 59 /// each call to `start_send`. 60 /// 61 /// This method returns `Poll::Ready` once the underlying sink is ready to 62 /// receive data. If this method returns `Poll::Pending`, the current task 63 /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready` 64 /// should be called again. 65 /// 66 /// In most cases, if the sink encounters an error, the sink will 67 /// permanently be unable to receive items. poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>68 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; 69 70 /// Begin the process of sending a value to the sink. 71 /// Each call to this function must be preceded by a successful call to 72 /// `poll_ready` which returned `Poll::Ready(Ok(()))`. 73 /// 74 /// As the name suggests, this method only *begins* the process of sending 75 /// the item. If the sink employs buffering, the item isn't fully processed 76 /// until the buffer is fully flushed. Since sinks are designed to work with 77 /// asynchronous I/O, the process of actually writing out the data to an 78 /// underlying object takes place asynchronously. **You *must* use 79 /// `poll_flush` or `poll_close` in order to guarantee completion of a 80 /// send**. 81 /// 82 /// Implementations of `poll_ready` and `start_send` will usually involve 83 /// flushing behind the scenes in order to make room for new messages. 84 /// It is only necessary to call `poll_flush` if you need to guarantee that 85 /// *all* of the items placed into the `Sink` have been sent. 86 /// 87 /// In most cases, if the sink encounters an error, the sink will 88 /// permanently be unable to receive items. start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>89 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; 90 91 /// Flush any remaining output from this sink. 92 /// 93 /// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this 94 /// value is returned then it is guaranteed that all previous values sent 95 /// via `start_send` have been flushed. 96 /// 97 /// Returns `Poll::Pending` if there is more work left to do, in which 98 /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when 99 /// `poll_flush` should be called again. 100 /// 101 /// In most cases, if the sink encounters an error, the sink will 102 /// permanently be unable to receive items. poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>103 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; 104 105 /// Flush any remaining output and close this sink, if necessary. 106 /// 107 /// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink 108 /// has been successfully closed. 109 /// 110 /// Returns `Poll::Pending` if there is more work left to do, in which 111 /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when 112 /// `poll_close` should be called again. 113 /// 114 /// If this function encounters an error, the sink should be considered to 115 /// have failed permanently, and no more `Sink` methods should be called. poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>116 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; 117 } 118 119 impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S { 120 type Error = S::Error; 121 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>122 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 123 Pin::new(&mut **self).poll_ready(cx) 124 } 125 start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>126 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 127 Pin::new(&mut **self).start_send(item) 128 } 129 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>130 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 131 Pin::new(&mut **self).poll_flush(cx) 132 } 133 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>134 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 135 Pin::new(&mut **self).poll_close(cx) 136 } 137 } 138 139 impl<P, Item> Sink<Item> for Pin<P> 140 where 141 P: DerefMut + Unpin, 142 P::Target: Sink<Item>, 143 { 144 type Error = <P::Target as Sink<Item>>::Error; 145 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>146 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 147 self.get_mut().as_mut().poll_ready(cx) 148 } 149 start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>150 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 151 self.get_mut().as_mut().start_send(item) 152 } 153 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>154 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 155 self.get_mut().as_mut().poll_flush(cx) 156 } 157 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>158 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 159 self.get_mut().as_mut().poll_close(cx) 160 } 161 } 162 163 #[cfg(feature = "alloc")] 164 mod if_alloc { 165 use super::*; 166 use core::convert::Infallible as Never; 167 168 impl<T> Sink<T> for alloc::vec::Vec<T> { 169 type Error = Never; 170 poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>171 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 172 Poll::Ready(Ok(())) 173 } 174 start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>175 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { 176 // TODO: impl<T> Unpin for Vec<T> {} 177 unsafe { self.get_unchecked_mut() }.push(item); 178 Ok(()) 179 } 180 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 182 Poll::Ready(Ok(())) 183 } 184 poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>185 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 186 Poll::Ready(Ok(())) 187 } 188 } 189 190 impl<T> Sink<T> for alloc::collections::VecDeque<T> { 191 type Error = Never; 192 poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>193 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 194 Poll::Ready(Ok(())) 195 } 196 start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>197 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { 198 // TODO: impl<T> Unpin for Vec<T> {} 199 unsafe { self.get_unchecked_mut() }.push_back(item); 200 Ok(()) 201 } 202 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>203 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 204 Poll::Ready(Ok(())) 205 } 206 poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>207 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 208 Poll::Ready(Ok(())) 209 } 210 } 211 212 impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> { 213 type Error = S::Error; 214 poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>215 fn poll_ready( 216 mut self: Pin<&mut Self>, 217 cx: &mut Context<'_>, 218 ) -> Poll<Result<(), Self::Error>> { 219 Pin::new(&mut **self).poll_ready(cx) 220 } 221 start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>222 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 223 Pin::new(&mut **self).start_send(item) 224 } 225 poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>226 fn poll_flush( 227 mut self: Pin<&mut Self>, 228 cx: &mut Context<'_>, 229 ) -> Poll<Result<(), Self::Error>> { 230 Pin::new(&mut **self).poll_flush(cx) 231 } 232 poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>233 fn poll_close( 234 mut self: Pin<&mut Self>, 235 cx: &mut Context<'_>, 236 ) -> Poll<Result<(), Self::Error>> { 237 Pin::new(&mut **self).poll_close(cx) 238 } 239 } 240 } 241