• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot;
2 use futures::executor::{block_on, LocalPool};
3 use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
4 use futures::task::LocalSpawn;
5 use std::cell::{Cell, RefCell};
6 use std::rc::Rc;
7 use std::task::Poll;
8 use std::thread;
9 
10 struct CountClone(Rc<Cell<i32>>);
11 
12 impl Clone for CountClone {
clone(&self) -> Self13     fn clone(&self) -> Self {
14         self.0.set(self.0.get() + 1);
15         Self(self.0.clone())
16     }
17 }
18 
send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32)19 fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
20     let (tx, rx) = oneshot::channel::<i32>();
21     let f = rx.shared();
22     let join_handles = (0..threads_number)
23         .map(|_| {
24             let cloned_future = f.clone();
25             thread::spawn(move || {
26                 assert_eq!(block_on(cloned_future).unwrap(), 6);
27             })
28         })
29         .collect::<Vec<_>>();
30 
31     tx.send(6).unwrap();
32 
33     assert_eq!(block_on(f).unwrap(), 6);
34     for join_handle in join_handles {
35         join_handle.join().unwrap();
36     }
37 }
38 
39 #[test]
one_thread()40 fn one_thread() {
41     send_shared_oneshot_and_wait_on_multiple_threads(1);
42 }
43 
44 #[test]
two_threads()45 fn two_threads() {
46     send_shared_oneshot_and_wait_on_multiple_threads(2);
47 }
48 
49 #[test]
many_threads()50 fn many_threads() {
51     send_shared_oneshot_and_wait_on_multiple_threads(1000);
52 }
53 
54 #[test]
drop_on_one_task_ok()55 fn drop_on_one_task_ok() {
56     let (tx, rx) = oneshot::channel::<u32>();
57     let f1 = rx.shared();
58     let f2 = f1.clone();
59 
60     let (tx2, rx2) = oneshot::channel::<u32>();
61 
62     let t1 = thread::spawn(|| {
63         let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ()));
64         drop(block_on(f));
65     });
66 
67     let (tx3, rx3) = oneshot::channel::<u32>();
68 
69     let t2 = thread::spawn(|| {
70         let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ()));
71     });
72 
73     tx2.send(11).unwrap(); // cancel `f1`
74     t1.join().unwrap();
75 
76     tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved.
77     let result = block_on(rx3).unwrap();
78     assert_eq!(result, 42);
79     t2.join().unwrap();
80 }
81 
82 #[test]
drop_in_poll()83 fn drop_in_poll() {
84     let slot1 = Rc::new(RefCell::new(None));
85     let slot2 = slot1.clone();
86 
87     let future1 = future::lazy(move |_| {
88         slot2.replace(None); // Drop future
89         1
90     })
91     .shared();
92 
93     let future2 = LocalFutureObj::new(Box::new(future1.clone()));
94     slot1.replace(Some(future2));
95 
96     assert_eq!(block_on(future1), 1);
97 }
98 
99 #[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
100 #[test]
peek()101 fn peek() {
102     let mut local_pool = LocalPool::new();
103     let spawn = &mut local_pool.spawner();
104 
105     let (tx0, rx0) = oneshot::channel::<i32>();
106     let f1 = rx0.shared();
107     let f2 = f1.clone();
108 
109     // Repeated calls on the original or clone do not change the outcome.
110     for _ in 0..2 {
111         assert!(f1.peek().is_none());
112         assert!(f2.peek().is_none());
113     }
114 
115     // Completing the underlying future has no effect, because the value has not been `poll`ed in.
116     tx0.send(42).unwrap();
117     for _ in 0..2 {
118         assert!(f1.peek().is_none());
119         assert!(f2.peek().is_none());
120     }
121 
122     // Once the Shared has been polled, the value is peekable on the clone.
123     spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
124     local_pool.run();
125     for _ in 0..2 {
126         assert_eq!(*f2.peek().unwrap(), Ok(42));
127     }
128 }
129 
130 #[test]
downgrade()131 fn downgrade() {
132     let (tx, rx) = oneshot::channel::<i32>();
133     let shared = rx.shared();
134     // Since there are outstanding `Shared`s, we can get a `WeakShared`.
135     let weak = shared.downgrade().unwrap();
136     // It should upgrade fine right now.
137     let mut shared2 = weak.upgrade().unwrap();
138 
139     tx.send(42).unwrap();
140     assert_eq!(block_on(shared).unwrap(), 42);
141 
142     // We should still be able to get a new `WeakShared` and upgrade it
143     // because `shared2` is outstanding.
144     assert!(shared2.downgrade().is_some());
145     assert!(weak.upgrade().is_some());
146 
147     assert_eq!(block_on(&mut shared2).unwrap(), 42);
148     // Now that all `Shared`s have been exhausted, we should not be able
149     // to get a new `WeakShared` or upgrade an existing one.
150     assert!(weak.upgrade().is_none());
151     assert!(shared2.downgrade().is_none());
152 }
153 
154 #[test]
dont_clone_in_single_owner_shared_future()155 fn dont_clone_in_single_owner_shared_future() {
156     let counter = CountClone(Rc::new(Cell::new(0)));
157     let (tx, rx) = oneshot::channel();
158 
159     let rx = rx.shared();
160 
161     tx.send(counter).ok().unwrap();
162 
163     assert_eq!(block_on(rx).unwrap().0.get(), 0);
164 }
165 
166 #[test]
dont_do_unnecessary_clones_on_output()167 fn dont_do_unnecessary_clones_on_output() {
168     let counter = CountClone(Rc::new(Cell::new(0)));
169     let (tx, rx) = oneshot::channel();
170 
171     let rx = rx.shared();
172 
173     tx.send(counter).ok().unwrap();
174 
175     assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1);
176     assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
177     assert_eq!(block_on(rx).unwrap().0.get(), 2);
178 }
179 
180 #[test]
shared_future_that_wakes_itself_until_pending_is_returned()181 fn shared_future_that_wakes_itself_until_pending_is_returned() {
182     let proceed = Cell::new(false);
183     let fut = futures::future::poll_fn(|cx| {
184         if proceed.get() {
185             Poll::Ready(())
186         } else {
187             cx.waker().wake_by_ref();
188             Poll::Pending
189         }
190     })
191     .shared();
192 
193     // The join future can only complete if the second future gets a chance to run after the first
194     // has returned pending
195     assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
196 }
197