• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Asynchronous sinks
2 //!
3 //! This crate contains the `Sink` trait which allows values to be sent
4 //! asynchronously.
5 
6 #![cfg_attr(not(feature = "std"), no_std)]
7 #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)]
8 // It cannot be included in the published code because this lints have false positives in the minimum required version.
9 #![cfg_attr(test, warn(single_use_lifetimes))]
10 #![doc(test(
11     no_crate_inject,
12     attr(
13         deny(warnings, rust_2018_idioms, single_use_lifetimes),
14         allow(dead_code, unused_assignments, unused_variables)
15     )
16 ))]
17 
18 #[cfg(feature = "alloc")]
19 extern crate alloc;
20 
21 use core::ops::DerefMut;
22 use core::pin::Pin;
23 use core::task::{Context, Poll};
24 
25 /// A `Sink` is a value into which other values can be sent, asynchronously.
26 ///
27 /// Basic examples of sinks include the sending side of:
28 ///
29 /// - Channels
30 /// - Sockets
31 /// - Pipes
32 ///
33 /// In addition to such "primitive" sinks, it's typical to layer additional
34 /// functionality, such as buffering, on top of an existing sink.
35 ///
36 /// Sending to a sink is "asynchronous" in the sense that the value may not be
37 /// sent in its entirety immediately. Instead, values are sent in a two-phase
38 /// way: first by initiating a send, and then by polling for completion. This
39 /// two-phase setup is analogous to buffered writing in synchronous code, where
40 /// writes often succeed immediately, but internally are buffered and are
41 /// *actually* written only upon flushing.
42 ///
43 /// In addition, the `Sink` may be *full*, in which case it is not even possible
44 /// to start the sending process.
45 ///
46 /// As with `Future` and `Stream`, the `Sink` trait is built from a few core
47 /// required methods, and a host of default methods for working in a
48 /// higher-level way. The `Sink::send_all` combinator is of particular
49 /// importance: you can use it to send an entire stream to a sink, which is
50 /// the simplest way to ultimately consume a stream.
51 #[must_use = "sinks do nothing unless polled"]
52 pub trait Sink<Item> {
53     /// The type of value produced by the sink when an error occurs.
54     type Error;
55 
56     /// Attempts to prepare the `Sink` to receive a value.
57     ///
58     /// This method must be called and return `Poll::Ready(Ok(()))` prior to
59     /// each call to `start_send`.
60     ///
61     /// This method returns `Poll::Ready` once the underlying sink is ready to
62     /// receive data. If this method returns `Poll::Pending`, the current task
63     /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
64     /// should be called again.
65     ///
66     /// In most cases, if the sink encounters an error, the sink will
67     /// permanently be unable to receive items.
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>68     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
69 
70     /// Begin the process of sending a value to the sink.
71     /// Each call to this function must be preceded by a successful call to
72     /// `poll_ready` which returned `Poll::Ready(Ok(()))`.
73     ///
74     /// As the name suggests, this method only *begins* the process of sending
75     /// the item. If the sink employs buffering, the item isn't fully processed
76     /// until the buffer is fully flushed. Since sinks are designed to work with
77     /// asynchronous I/O, the process of actually writing out the data to an
78     /// underlying object takes place asynchronously. **You *must* use
79     /// `poll_flush` or `poll_close` in order to guarantee completion of a
80     /// send**.
81     ///
82     /// Implementations of `poll_ready` and `start_send` will usually involve
83     /// flushing behind the scenes in order to make room for new messages.
84     /// It is only necessary to call `poll_flush` if you need to guarantee that
85     /// *all* of the items placed into the `Sink` have been sent.
86     ///
87     /// In most cases, if the sink encounters an error, the sink will
88     /// permanently be unable to receive items.
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>89     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
90 
91     /// Flush any remaining output from this sink.
92     ///
93     /// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this
94     /// value is returned then it is guaranteed that all previous values sent
95     /// via `start_send` have been flushed.
96     ///
97     /// Returns `Poll::Pending` if there is more work left to do, in which
98     /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
99     /// `poll_flush` should be called again.
100     ///
101     /// In most cases, if the sink encounters an error, the sink will
102     /// permanently be unable to receive items.
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>103     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
104 
105     /// Flush any remaining output and close this sink, if necessary.
106     ///
107     /// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink
108     /// has been successfully closed.
109     ///
110     /// Returns `Poll::Pending` if there is more work left to do, in which
111     /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
112     /// `poll_close` should be called again.
113     ///
114     /// If this function encounters an error, the sink should be considered to
115     /// have failed permanently, and no more `Sink` methods should be called.
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>116     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
117 }
118 
119 impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S {
120     type Error = S::Error;
121 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>122     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123         Pin::new(&mut **self).poll_ready(cx)
124     }
125 
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>126     fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
127         Pin::new(&mut **self).start_send(item)
128     }
129 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>130     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131         Pin::new(&mut **self).poll_flush(cx)
132     }
133 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>134     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135         Pin::new(&mut **self).poll_close(cx)
136     }
137 }
138 
139 impl<P, Item> Sink<Item> for Pin<P>
140 where
141     P: DerefMut + Unpin,
142     P::Target: Sink<Item>,
143 {
144     type Error = <P::Target as Sink<Item>>::Error;
145 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>146     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147         self.get_mut().as_mut().poll_ready(cx)
148     }
149 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>150     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
151         self.get_mut().as_mut().start_send(item)
152     }
153 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>154     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155         self.get_mut().as_mut().poll_flush(cx)
156     }
157 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>158     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159         self.get_mut().as_mut().poll_close(cx)
160     }
161 }
162 
163 #[cfg(feature = "alloc")]
164 mod if_alloc {
165     use super::*;
166     use core::convert::Infallible as Never;
167 
168     impl<T> Sink<T> for alloc::vec::Vec<T> {
169         type Error = Never;
170 
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>171         fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
172             Poll::Ready(Ok(()))
173         }
174 
start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>175         fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
176             // TODO: impl<T> Unpin for Vec<T> {}
177             unsafe { self.get_unchecked_mut() }.push(item);
178             Ok(())
179         }
180 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181         fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182             Poll::Ready(Ok(()))
183         }
184 
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>185         fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186             Poll::Ready(Ok(()))
187         }
188     }
189 
190     impl<T> Sink<T> for alloc::collections::VecDeque<T> {
191         type Error = Never;
192 
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>193         fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194             Poll::Ready(Ok(()))
195         }
196 
start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>197         fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
198             // TODO: impl<T> Unpin for Vec<T> {}
199             unsafe { self.get_unchecked_mut() }.push_back(item);
200             Ok(())
201         }
202 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>203         fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204             Poll::Ready(Ok(()))
205         }
206 
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>207         fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
208             Poll::Ready(Ok(()))
209         }
210     }
211 
212     impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> {
213         type Error = S::Error;
214 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>215         fn poll_ready(
216             mut self: Pin<&mut Self>,
217             cx: &mut Context<'_>,
218         ) -> Poll<Result<(), Self::Error>> {
219             Pin::new(&mut **self).poll_ready(cx)
220         }
221 
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>222         fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
223             Pin::new(&mut **self).start_send(item)
224         }
225 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>226         fn poll_flush(
227             mut self: Pin<&mut Self>,
228             cx: &mut Context<'_>,
229         ) -> Poll<Result<(), Self::Error>> {
230             Pin::new(&mut **self).poll_flush(cx)
231         }
232 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>233         fn poll_close(
234             mut self: Pin<&mut Self>,
235             cx: &mut Context<'_>,
236         ) -> Poll<Result<(), Self::Error>> {
237             Pin::new(&mut **self).poll_close(cx)
238         }
239     }
240 }
241