• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tests for the after channel flavor.
2 
3 #![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
4 
5 use std::sync::atomic::AtomicUsize;
6 use std::sync::atomic::Ordering;
7 use std::thread;
8 use std::time::{Duration, Instant};
9 
10 use crossbeam_channel::{after, select, Select, TryRecvError};
11 use crossbeam_utils::thread::scope;
12 
ms(ms: u64) -> Duration13 fn ms(ms: u64) -> Duration {
14     Duration::from_millis(ms)
15 }
16 
17 #[test]
fire()18 fn fire() {
19     let start = Instant::now();
20     let r = after(ms(50));
21 
22     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
23     thread::sleep(ms(100));
24 
25     let fired = r.try_recv().unwrap();
26     assert!(start < fired);
27     assert!(fired - start >= ms(50));
28 
29     let now = Instant::now();
30     assert!(fired < now);
31     assert!(now - fired >= ms(50));
32 
33     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
34 
35     select! {
36         recv(r) -> _ => panic!(),
37         default => {}
38     }
39 
40     select! {
41         recv(r) -> _ => panic!(),
42         recv(after(ms(200))) -> _ => {}
43     }
44 }
45 
46 #[test]
capacity()47 fn capacity() {
48     const COUNT: usize = 10;
49 
50     for i in 0..COUNT {
51         let r = after(ms(i as u64));
52         assert_eq!(r.capacity(), Some(1));
53     }
54 }
55 
56 #[test]
len_empty_full()57 fn len_empty_full() {
58     let r = after(ms(50));
59 
60     assert_eq!(r.len(), 0);
61     assert!(r.is_empty());
62     assert!(!r.is_full());
63 
64     thread::sleep(ms(100));
65 
66     assert_eq!(r.len(), 1);
67     assert!(!r.is_empty());
68     assert!(r.is_full());
69 
70     r.try_recv().unwrap();
71 
72     assert_eq!(r.len(), 0);
73     assert!(r.is_empty());
74     assert!(!r.is_full());
75 }
76 
77 #[test]
try_recv()78 fn try_recv() {
79     let r = after(ms(200));
80     assert!(r.try_recv().is_err());
81 
82     thread::sleep(ms(100));
83     assert!(r.try_recv().is_err());
84 
85     thread::sleep(ms(200));
86     assert!(r.try_recv().is_ok());
87     assert!(r.try_recv().is_err());
88 
89     thread::sleep(ms(200));
90     assert!(r.try_recv().is_err());
91 }
92 
93 #[test]
recv()94 fn recv() {
95     let start = Instant::now();
96     let r = after(ms(50));
97 
98     let fired = r.recv().unwrap();
99     assert!(start < fired);
100     assert!(fired - start >= ms(50));
101 
102     let now = Instant::now();
103     assert!(fired < now);
104     assert!(now - fired < fired - start);
105 
106     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107 }
108 
109 #[test]
recv_timeout()110 fn recv_timeout() {
111     let start = Instant::now();
112     let r = after(ms(200));
113 
114     assert!(r.recv_timeout(ms(100)).is_err());
115     let now = Instant::now();
116     assert!(now - start >= ms(100));
117     assert!(now - start <= ms(150));
118 
119     let fired = r.recv_timeout(ms(200)).unwrap();
120     assert!(fired - start >= ms(200));
121     assert!(fired - start <= ms(250));
122 
123     assert!(r.recv_timeout(ms(200)).is_err());
124     let now = Instant::now();
125     assert!(now - start >= ms(400));
126     assert!(now - start <= ms(450));
127 
128     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
129 }
130 
131 #[test]
recv_two()132 fn recv_two() {
133     let r1 = after(ms(50));
134     let r2 = after(ms(50));
135 
136     scope(|scope| {
137         scope.spawn(|_| {
138             select! {
139                 recv(r1) -> _ => {}
140                 recv(r2) -> _ => {}
141             }
142         });
143         scope.spawn(|_| {
144             select! {
145                 recv(r1) -> _ => {}
146                 recv(r2) -> _ => {}
147             }
148         });
149     })
150     .unwrap();
151 }
152 
153 #[test]
recv_race()154 fn recv_race() {
155     select! {
156         recv(after(ms(50))) -> _ => {}
157         recv(after(ms(100))) -> _ => panic!(),
158     }
159 
160     select! {
161         recv(after(ms(100))) -> _ => panic!(),
162         recv(after(ms(50))) -> _ => {}
163     }
164 }
165 
166 #[test]
stress_default()167 fn stress_default() {
168     const COUNT: usize = 10;
169 
170     for _ in 0..COUNT {
171         select! {
172             recv(after(ms(0))) -> _ => {}
173             default => panic!(),
174         }
175     }
176 
177     for _ in 0..COUNT {
178         select! {
179             recv(after(ms(100))) -> _ => panic!(),
180             default => {}
181         }
182     }
183 }
184 
185 #[test]
select()186 fn select() {
187     const THREADS: usize = 4;
188     const COUNT: usize = 1000;
189     const TIMEOUT_MS: u64 = 100;
190 
191     let v = (0..COUNT)
192         .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
193         .collect::<Vec<_>>();
194     let hits = AtomicUsize::new(0);
195 
196     scope(|scope| {
197         for _ in 0..THREADS {
198             scope.spawn(|_| {
199                 let v: Vec<&_> = v.iter().collect();
200 
201                 loop {
202                     let timeout = after(ms(TIMEOUT_MS));
203                     let mut sel = Select::new();
204                     for r in &v {
205                         sel.recv(r);
206                     }
207                     let oper_timeout = sel.recv(&timeout);
208 
209                     let oper = sel.select();
210                     match oper.index() {
211                         i if i == oper_timeout => {
212                             oper.recv(&timeout).unwrap();
213                             break;
214                         }
215                         i => {
216                             oper.recv(v[i]).unwrap();
217                             hits.fetch_add(1, Ordering::SeqCst);
218                         }
219                     }
220                 }
221             });
222         }
223     })
224     .unwrap();
225 
226     assert_eq!(hits.load(Ordering::SeqCst), COUNT);
227 }
228 
229 #[test]
ready()230 fn ready() {
231     const THREADS: usize = 4;
232     const COUNT: usize = 1000;
233     const TIMEOUT_MS: u64 = 100;
234 
235     let v = (0..COUNT)
236         .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
237         .collect::<Vec<_>>();
238     let hits = AtomicUsize::new(0);
239 
240     scope(|scope| {
241         for _ in 0..THREADS {
242             scope.spawn(|_| {
243                 let v: Vec<&_> = v.iter().collect();
244 
245                 loop {
246                     let timeout = after(ms(TIMEOUT_MS));
247                     let mut sel = Select::new();
248                     for r in &v {
249                         sel.recv(r);
250                     }
251                     let oper_timeout = sel.recv(&timeout);
252 
253                     loop {
254                         let i = sel.ready();
255                         if i == oper_timeout {
256                             timeout.try_recv().unwrap();
257                             return;
258                         } else if v[i].try_recv().is_ok() {
259                             hits.fetch_add(1, Ordering::SeqCst);
260                             break;
261                         }
262                     }
263                 }
264             });
265         }
266     })
267     .unwrap();
268 
269     assert_eq!(hits.load(Ordering::SeqCst), COUNT);
270 }
271 
272 #[test]
stress_clone()273 fn stress_clone() {
274     const RUNS: usize = 1000;
275     const THREADS: usize = 10;
276     const COUNT: usize = 50;
277 
278     for i in 0..RUNS {
279         let r = after(ms(i as u64));
280 
281         scope(|scope| {
282             for _ in 0..THREADS {
283                 scope.spawn(|_| {
284                     let r = r.clone();
285                     let _ = r.try_recv();
286 
287                     for _ in 0..COUNT {
288                         drop(r.clone());
289                         thread::yield_now();
290                     }
291                 });
292             }
293         })
294         .unwrap();
295     }
296 }
297 
298 #[test]
fairness()299 fn fairness() {
300     const COUNT: usize = 1000;
301 
302     for &dur in &[0, 1] {
303         let mut hits = [0usize; 2];
304 
305         for _ in 0..COUNT {
306             select! {
307                 recv(after(ms(dur))) -> _ => hits[0] += 1,
308                 recv(after(ms(dur))) -> _ => hits[1] += 1,
309             }
310         }
311 
312         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
313     }
314 }
315 
316 #[test]
fairness_duplicates()317 fn fairness_duplicates() {
318     const COUNT: usize = 1000;
319 
320     for &dur in &[0, 1] {
321         let mut hits = [0usize; 5];
322 
323         for _ in 0..COUNT {
324             let r = after(ms(dur));
325             select! {
326                 recv(r) -> _ => hits[0] += 1,
327                 recv(r) -> _ => hits[1] += 1,
328                 recv(r) -> _ => hits[2] += 1,
329                 recv(r) -> _ => hits[3] += 1,
330                 recv(r) -> _ => hits[4] += 1,
331             }
332         }
333 
334         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
335     }
336 }
337