/// A future that resolves to `Ok(stream)` must flatten into a stream that
/// yields every item of the inner stream and then terminates.
#[test]
fn successful_future() {
    use futures::executor::block_on_stream;
    use futures::future::{ok, TryFutureExt};
    use futures::stream::{self, StreamExt};

    // Wrap each value in `Ok` so the inner stream is a `TryStream` whose
    // error type matches the outer future's error type (`bool`).
    let inner = stream::iter(vec![17, 19]).map(Ok);
    let flattened = ok::<_, bool>(inner).try_flatten_stream();

    // Drive the stream synchronously and check the items in order.
    let mut results = block_on_stream(flattened);
    assert_eq!(Some(Ok(17)), results.next());
    assert_eq!(Some(Ok(19)), results.next());
    assert_eq!(None, results.next());
}
17
/// A future that resolves to `Err(e)` must flatten into a stream that yields
/// that single error and then terminates.
#[test]
fn failed_future() {
    use core::marker::PhantomData;
    use core::pin::Pin;
    use futures::executor::block_on_stream;
    use futures::future::{err, TryFutureExt};
    use futures::stream::Stream;
    use futures::task::{Context, Poll};

    // Stand-in for the "success" stream type. It only exists to give the
    // future an `Ok` type; polling it would panic and fail the test.
    struct PanickingStream<T, E> {
        _marker: PhantomData<(T, E)>,
    }

    impl<T, E> Stream for PanickingStream<T, E> {
        type Item = Result<T, E>;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }
    }

    // The future fails immediately, so no `PanickingStream` is ever produced.
    let flattened = err::<PanickingStream<bool, u32>, _>(10).try_flatten_stream();

    let mut results = block_on_stream(flattened);
    assert_eq!(Some(Err(10)), results.next());
    assert_eq!(None, results.next());
}
45
/// Compile-time check: the adapters returned by `try_flatten_stream` and
/// `flatten_sink` implement both `Stream` and `Sink` when the future's
/// output type implements both.
#[test]
fn assert_impls() {
    use core::marker::PhantomData;
    use core::pin::Pin;
    use futures::sink::Sink;
    use futures::stream::Stream;
    use futures::task::{Context, Poll};
    use futures::future::{ok, TryFutureExt};

    // A type that is both a `Stream` and a `Sink`. Every method panics, but
    // none is called here: the helper functions below have empty bodies.
    struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);

    impl<T, E, Item> Stream for StreamSink<T, E, Item> {
        type Item = Result<T, E>;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }
    }

    impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> {
        type Error = E;

        fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }

        fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
            panic!()
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }
    }

    // Each helper compiles only if its argument satisfies the stated bound.
    fn assert_stream<S>(_: &S)
    where
        S: Stream,
    {
    }

    fn assert_sink<S, Item>(_: &S)
    where
        S: Sink<Item>,
    {
    }

    fn assert_stream_sink<S, Item>(_: &S)
    where
        S: Stream + Sink<Item>,
    {
    }

    // Result of `try_flatten_stream`.
    let flattened_stream = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream();
    assert_stream(&flattened_stream);
    assert_sink(&flattened_stream);
    assert_stream_sink(&flattened_stream);

    // Result of `flatten_sink`.
    let flattened_sink = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink();
    assert_stream(&flattened_sink);
    assert_sink(&flattened_sink);
    assert_stream_sink(&flattened_sink);
}
93