1 use futures::future::{self, FutureExt, TryFutureExt}; 2 use futures_test::future::FutureTestExt; 3 use std::sync::mpsc; 4 5 #[test] basic_future_combinators()6fn basic_future_combinators() { 7 let (tx1, rx) = mpsc::channel(); 8 let tx2 = tx1.clone(); 9 let tx3 = tx1.clone(); 10 11 let fut = future::ready(1) 12 .then(move |x| { 13 tx1.send(x).unwrap(); // Send 1 14 tx1.send(2).unwrap(); // Send 2 15 future::ready(3) 16 }).map(move |x| { 17 tx2.send(x).unwrap(); // Send 3 18 tx2.send(4).unwrap(); // Send 4 19 5 20 }).map(move |x| { 21 tx3.send(x).unwrap(); // Send 5 22 }); 23 24 assert!(rx.try_recv().is_err()); // Not started yet 25 fut.run_in_background(); // Start it 26 for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it 27 assert!(rx.recv().is_err()); // Should be done 28 } 29 30 #[test] basic_try_future_combinators()31fn basic_try_future_combinators() { 32 let (tx1, rx) = mpsc::channel(); 33 let tx2 = tx1.clone(); 34 let tx3 = tx1.clone(); 35 let tx4 = tx1.clone(); 36 let tx5 = tx1.clone(); 37 let tx6 = tx1.clone(); 38 let tx7 = tx1.clone(); 39 let tx8 = tx1.clone(); 40 let tx9 = tx1.clone(); 41 let tx10 = tx1.clone(); 42 43 let fut = future::ready(Ok(1)) 44 .and_then(move |x: i32| { 45 tx1.send(x).unwrap(); // Send 1 46 tx1.send(2).unwrap(); // Send 2 47 future::ready(Ok(3)) 48 }) 49 .or_else(move |x: i32| { 50 tx2.send(x).unwrap(); // Should not run 51 tx2.send(-1).unwrap(); 52 future::ready(Ok(-1)) 53 }) 54 .map_ok(move |x: i32| { 55 tx3.send(x).unwrap(); // Send 3 56 tx3.send(4).unwrap(); // Send 4 57 5 58 }) 59 .map_err(move |x: i32| { 60 tx4.send(x).unwrap(); // Should not run 61 tx4.send(-1).unwrap(); 62 -1 63 }) 64 .map(move |x: Result<i32, i32>| { 65 tx5.send(x.unwrap()).unwrap(); // Send 5 66 tx5.send(6).unwrap(); // Send 6 67 Err(7) // Now return errors! 68 }) 69 .and_then(move |x: i32| { 70 tx6.send(x).unwrap(); // Should not run 71 tx6.send(-1).unwrap(); 72 future::ready(Err(-1)) 73 }) 74 .or_else(move |x: i32| { 75 tx7.send(x).unwrap(); // Send 7 76 tx7.send(8).unwrap(); // Send 8 77 future::ready(Err(9)) 78 }) 79 .map_ok(move |x: i32| { 80 tx8.send(x).unwrap(); // Should not run 81 tx8.send(-1).unwrap(); 82 -1 83 }) 84 .map_err(move |x: i32| { 85 tx9.send(x).unwrap(); // Send 9 86 tx9.send(10).unwrap(); // Send 10 87 11 88 }) 89 .map(move |x: Result<i32, i32>| { 90 tx10.send(x.err().unwrap()).unwrap(); // Send 11 91 tx10.send(12).unwrap(); // Send 12 92 }); 93 94 assert!(rx.try_recv().is_err()); // Not started yet 95 fut.run_in_background(); // Start it 96 for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it 97 assert!(rx.recv().is_err()); // Should be done 98 } 99