• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/// A future resolving to `Ok(stream)` must flatten into a stream that yields
/// every item of the inner stream, in order, and then terminates.
#[test]
fn successful_future() {
    use futures::executor::block_on_stream;
    use futures::future::{ok, TryFutureExt};
    use futures::stream::{self, StreamExt};

    // Each value is wrapped in `Ok` so the inner stream yields `Result` items,
    // which is what `try_flatten_stream` expects from the future's success value.
    let inner = stream::iter(vec![17, 19]).map(Ok);
    let flattened = ok::<_, bool>(inner).try_flatten_stream();

    // Draining the blocking iterator pulls both items and then observes the
    // end of the stream, so a single comparison covers all three polls.
    let collected: Vec<Result<i32, bool>> = block_on_stream(flattened).collect();
    assert_eq!(vec![Ok(17), Ok(19)], collected);
}
17 
/// A future resolving to `Err(e)` must flatten into a stream that yields that
/// single error and then terminates.
#[test]
fn failed_future() {
    use core::marker::PhantomData;
    use core::pin::Pin;
    use futures::executor::block_on_stream;
    use futures::future::{err, TryFutureExt};
    use futures::stream::Stream;
    use futures::task::{Context, Poll};

    /// Placeholder stream type used only to pin down the future's `Ok` type.
    /// No value of it is ever created in this test, and polling one would panic.
    struct NeverPolled<T, E>(PhantomData<(T, E)>);

    impl<T, E> Stream for NeverPolled<T, E> {
        type Item = Result<T, E>;

        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }
    }

    let mut results = block_on_stream(err::<NeverPolled<bool, u32>, _>(10).try_flatten_stream());

    // First the error produced by the future, then end-of-stream.
    assert_eq!(Some(Err(10)), results.next());
    assert_eq!(None, results.next());
}
45 
/// Compile-time check that the values returned by `try_flatten_stream` and
/// `flatten_sink` satisfy `Stream`, `Sink`, and both bounds together when the
/// future's success type implements both traits.
///
/// Nothing is polled here; the trait methods of the dummy type panic if called.
#[test]
fn assert_impls() {
    use core::marker::PhantomData;
    use core::pin::Pin;
    use futures::future::{ok, TryFutureExt};
    use futures::sink::Sink;
    use futures::stream::Stream;
    use futures::task::{Context, Poll};

    /// Dummy type implementing both `Stream` and `Sink<Item>`.
    struct Duplex<T, E, Item>(PhantomData<(T, E, Item)>);

    impl<T, E, Item> Stream for Duplex<T, E, Item> {
        type Item = Result<T, E>;

        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }
    }

    impl<T, E, Item> Sink<Item> for Duplex<T, E, Item> {
        type Error = E;

        fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }

        fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> {
            panic!()
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            panic!()
        }
    }

    // Bound-checking helpers: they only compile when the argument's type
    // satisfies the stated traits, and do nothing at runtime.
    fn is_stream<S>(_: &S)
    where
        S: Stream,
    {
    }
    fn is_sink<S, Item>(_: &S)
    where
        S: Sink<Item>,
    {
    }
    fn is_stream_and_sink<S, Item>(_: &S)
    where
        S: Stream + Sink<Item>,
    {
    }

    // Builds a fresh dummy value with all type parameters fixed to `()`.
    let make_duplex = || Duplex::<(), (), ()>(PhantomData);

    let via_try_flatten_stream = ok(make_duplex()).try_flatten_stream();
    is_stream(&via_try_flatten_stream);
    is_sink(&via_try_flatten_stream);
    is_stream_and_sink(&via_try_flatten_stream);

    let via_flatten_sink = ok(make_duplex()).flatten_sink();
    is_stream(&via_flatten_sink);
    is_sink(&via_flatten_sink);
    is_stream_and_sink(&via_flatten_sink);
}
93