//! Tests for `futures_util::lock::BiLock`.
use futures::executor::block_on;
use futures::future;
use futures::future::Future;
use futures::stream;
use futures::stream::StreamExt;
use futures::task;
use futures::task::{Context, Poll};
use futures_util::lock::BiLock;
use std::pin::Pin;
use std::thread;
6 
// mod support;
// use support::*;
9 
/// Single-task smoke test for `BiLock`.
///
/// Creates a lock around the value `1` and, from inside a lazily evaluated
/// future, checks that:
///
/// * the first half acquires the lock immediately and can read and write the
///   protected value;
/// * while that guard is alive, polling either half reports `Pending`;
/// * once the guard is dropped, both halves can acquire the lock again and
///   observe the updated value;
/// * `reunite` hands back the inner value.
///
/// The future is polled exactly once with a no-op waker and must complete on
/// that first poll.
#[test]
fn smoke() {
    let mut future = future::lazy(|cx| {
        let (a, b) = BiLock::new(1);

        {
            let mut lock = match a.poll_lock(cx) {
                Poll::Ready(l) => l,
                Poll::Pending => panic!("poll not ready"),
            };
            assert_eq!(*lock, 1);
            *lock = 2;

            // The guard above is still alive, so neither half may get in.
            assert!(b.poll_lock(cx).is_pending());
            assert!(a.poll_lock(cx).is_pending());
        }

        // Each temporary guard is dropped at the end of its statement, so the
        // lock is free again for the next poll.
        assert!(b.poll_lock(cx).is_ready());
        assert!(a.poll_lock(cx).is_ready());

        {
            let lock = match b.poll_lock(cx) {
                Poll::Ready(l) => l,
                Poll::Pending => panic!("poll not ready"),
            };
            // The write made through `a` is visible through `b`.
            assert_eq!(*lock, 2);
        }

        assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);

        Ok::<(), ()>(())
    });

    // Nothing in this test needs to be woken, so a no-op waker is enough.
    let mut cx = Context::from_waker(task::noop_waker_ref());
    match Pin::new(&mut future).poll(&mut cx) {
        Poll::Ready(result) => result.expect("failure in poll"),
        Poll::Pending => panic!("poll not ready"),
    }
}
48 
/// Two-thread contention test for `BiLock`.
///
/// One half of the lock is driven on a spawned thread by the hand-written
/// `Increment` future, the other on the test thread by a stream fold that
/// awaits `lock()`. Each side increments the shared counter `N` times, so the
/// final value seen through either half, and returned by `reunite`, must be
/// `2 * N`.
#[test]
fn concurrent() {
    const N: usize = 10000;
    let (a, b) = BiLock::new(0);

    let a = Increment { a: Some(a), remaining: N };
    let b = stream::iter(0..N).fold(b, |b, _n| async move {
        {
            // Scope the guard so the lock is released before `b` is moved out.
            let mut guard = b.lock().await;
            *guard += 1;
        }
        b
    });

    let t1 = thread::spawn(move || block_on(a));
    let b = block_on(b);
    let a = t1.join().expect("incrementing thread panicked");

    // Both sides have finished, so the lock is uncontended and a no-op waker
    // is sufficient for the final checks.
    let mut cx = Context::from_waker(task::noop_waker_ref());

    match a.poll_lock(&mut cx) {
        Poll::Ready(l) => assert_eq!(*l, 2 * N),
        Poll::Pending => panic!("poll not ready"),
    }
    match b.poll_lock(&mut cx) {
        Poll::Ready(l) => assert_eq!(*l, 2 * N),
        Poll::Pending => panic!("poll not ready"),
    }

    assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);

    /// Future that increments the locked counter `remaining` times and then
    /// resolves to the `BiLock` half it was given.
    struct Increment {
        /// Number of increments still to perform.
        remaining: usize,
        /// The lock half; taken out (left as `None`) when the future completes.
        a: Option<BiLock<usize>>,
    }

    impl Future for Increment {
        type Output = BiLock<usize>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
            // `Increment` is `Unpin`, so a plain `&mut` can be taken; this
            // also lets the two fields be borrowed independently below.
            let this = self.get_mut();

            while this.remaining > 0 {
                let half = this
                    .a
                    .as_ref()
                    .expect("Increment polled after completion");
                match half.poll_lock(cx) {
                    // The guard is dropped at the end of this arm, releasing
                    // the lock before the next iteration.
                    Poll::Ready(mut guard) => *guard += 1,
                    // `poll_lock` was given `cx`; return and wait to be
                    // polled again.
                    Poll::Pending => return Poll::Pending,
                }
                this.remaining -= 1;
            }

            Poll::Ready(
                this.a
                    .take()
                    .expect("Increment polled after completion"),
            )
        }
    }
}
103