1 //! Tests for the after channel flavor.
2
3 #![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
4
5 use std::sync::atomic::AtomicUsize;
6 use std::sync::atomic::Ordering;
7 use std::thread;
8 use std::time::{Duration, Instant};
9
10 use crossbeam_channel::{after, select, Select, TryRecvError};
11 use crossbeam_utils::thread::scope;
12
ms(ms: u64) -> Duration13 fn ms(ms: u64) -> Duration {
14 Duration::from_millis(ms)
15 }
16
17 #[test]
fire()18 fn fire() {
19 let start = Instant::now();
20 let r = after(ms(50));
21
22 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
23 thread::sleep(ms(100));
24
25 let fired = r.try_recv().unwrap();
26 assert!(start < fired);
27 assert!(fired - start >= ms(50));
28
29 let now = Instant::now();
30 assert!(fired < now);
31 assert!(now - fired >= ms(50));
32
33 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
34
35 select! {
36 recv(r) -> _ => panic!(),
37 default => {}
38 }
39
40 select! {
41 recv(r) -> _ => panic!(),
42 recv(after(ms(200))) -> _ => {}
43 }
44 }
45
46 #[test]
capacity()47 fn capacity() {
48 const COUNT: usize = 10;
49
50 for i in 0..COUNT {
51 let r = after(ms(i as u64));
52 assert_eq!(r.capacity(), Some(1));
53 }
54 }
55
56 #[test]
len_empty_full()57 fn len_empty_full() {
58 let r = after(ms(50));
59
60 assert_eq!(r.len(), 0);
61 assert!(r.is_empty());
62 assert!(!r.is_full());
63
64 thread::sleep(ms(100));
65
66 assert_eq!(r.len(), 1);
67 assert!(!r.is_empty());
68 assert!(r.is_full());
69
70 r.try_recv().unwrap();
71
72 assert_eq!(r.len(), 0);
73 assert!(r.is_empty());
74 assert!(!r.is_full());
75 }
76
77 #[test]
try_recv()78 fn try_recv() {
79 let r = after(ms(200));
80 assert!(r.try_recv().is_err());
81
82 thread::sleep(ms(100));
83 assert!(r.try_recv().is_err());
84
85 thread::sleep(ms(200));
86 assert!(r.try_recv().is_ok());
87 assert!(r.try_recv().is_err());
88
89 thread::sleep(ms(200));
90 assert!(r.try_recv().is_err());
91 }
92
93 #[test]
recv()94 fn recv() {
95 let start = Instant::now();
96 let r = after(ms(50));
97
98 let fired = r.recv().unwrap();
99 assert!(start < fired);
100 assert!(fired - start >= ms(50));
101
102 let now = Instant::now();
103 assert!(fired < now);
104 assert!(now - fired < fired - start);
105
106 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107 }
108
109 #[test]
recv_timeout()110 fn recv_timeout() {
111 let start = Instant::now();
112 let r = after(ms(200));
113
114 assert!(r.recv_timeout(ms(100)).is_err());
115 let now = Instant::now();
116 assert!(now - start >= ms(100));
117 assert!(now - start <= ms(150));
118
119 let fired = r.recv_timeout(ms(200)).unwrap();
120 assert!(fired - start >= ms(200));
121 assert!(fired - start <= ms(250));
122
123 assert!(r.recv_timeout(ms(200)).is_err());
124 let now = Instant::now();
125 assert!(now - start >= ms(400));
126 assert!(now - start <= ms(450));
127
128 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
129 }
130
131 #[test]
recv_two()132 fn recv_two() {
133 let r1 = after(ms(50));
134 let r2 = after(ms(50));
135
136 scope(|scope| {
137 scope.spawn(|_| {
138 select! {
139 recv(r1) -> _ => {}
140 recv(r2) -> _ => {}
141 }
142 });
143 scope.spawn(|_| {
144 select! {
145 recv(r1) -> _ => {}
146 recv(r2) -> _ => {}
147 }
148 });
149 })
150 .unwrap();
151 }
152
153 #[test]
recv_race()154 fn recv_race() {
155 select! {
156 recv(after(ms(50))) -> _ => {}
157 recv(after(ms(100))) -> _ => panic!(),
158 }
159
160 select! {
161 recv(after(ms(100))) -> _ => panic!(),
162 recv(after(ms(50))) -> _ => {}
163 }
164 }
165
166 #[test]
stress_default()167 fn stress_default() {
168 const COUNT: usize = 10;
169
170 for _ in 0..COUNT {
171 select! {
172 recv(after(ms(0))) -> _ => {}
173 default => panic!(),
174 }
175 }
176
177 for _ in 0..COUNT {
178 select! {
179 recv(after(ms(100))) -> _ => panic!(),
180 default => {}
181 }
182 }
183 }
184
185 #[test]
select()186 fn select() {
187 const THREADS: usize = 4;
188 const COUNT: usize = 1000;
189 const TIMEOUT_MS: u64 = 100;
190
191 let v = (0..COUNT)
192 .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
193 .collect::<Vec<_>>();
194 let hits = AtomicUsize::new(0);
195
196 scope(|scope| {
197 for _ in 0..THREADS {
198 scope.spawn(|_| {
199 let v: Vec<&_> = v.iter().collect();
200
201 loop {
202 let timeout = after(ms(TIMEOUT_MS));
203 let mut sel = Select::new();
204 for r in &v {
205 sel.recv(r);
206 }
207 let oper_timeout = sel.recv(&timeout);
208
209 let oper = sel.select();
210 match oper.index() {
211 i if i == oper_timeout => {
212 oper.recv(&timeout).unwrap();
213 break;
214 }
215 i => {
216 oper.recv(v[i]).unwrap();
217 hits.fetch_add(1, Ordering::SeqCst);
218 }
219 }
220 }
221 });
222 }
223 })
224 .unwrap();
225
226 assert_eq!(hits.load(Ordering::SeqCst), COUNT);
227 }
228
229 #[test]
ready()230 fn ready() {
231 const THREADS: usize = 4;
232 const COUNT: usize = 1000;
233 const TIMEOUT_MS: u64 = 100;
234
235 let v = (0..COUNT)
236 .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
237 .collect::<Vec<_>>();
238 let hits = AtomicUsize::new(0);
239
240 scope(|scope| {
241 for _ in 0..THREADS {
242 scope.spawn(|_| {
243 let v: Vec<&_> = v.iter().collect();
244
245 loop {
246 let timeout = after(ms(TIMEOUT_MS));
247 let mut sel = Select::new();
248 for r in &v {
249 sel.recv(r);
250 }
251 let oper_timeout = sel.recv(&timeout);
252
253 loop {
254 let i = sel.ready();
255 if i == oper_timeout {
256 timeout.try_recv().unwrap();
257 return;
258 } else if v[i].try_recv().is_ok() {
259 hits.fetch_add(1, Ordering::SeqCst);
260 break;
261 }
262 }
263 }
264 });
265 }
266 })
267 .unwrap();
268
269 assert_eq!(hits.load(Ordering::SeqCst), COUNT);
270 }
271
272 #[test]
stress_clone()273 fn stress_clone() {
274 const RUNS: usize = 1000;
275 const THREADS: usize = 10;
276 const COUNT: usize = 50;
277
278 for i in 0..RUNS {
279 let r = after(ms(i as u64));
280
281 scope(|scope| {
282 for _ in 0..THREADS {
283 scope.spawn(|_| {
284 let r = r.clone();
285 let _ = r.try_recv();
286
287 for _ in 0..COUNT {
288 drop(r.clone());
289 thread::yield_now();
290 }
291 });
292 }
293 })
294 .unwrap();
295 }
296 }
297
298 #[test]
fairness()299 fn fairness() {
300 const COUNT: usize = 1000;
301
302 for &dur in &[0, 1] {
303 let mut hits = [0usize; 2];
304
305 for _ in 0..COUNT {
306 select! {
307 recv(after(ms(dur))) -> _ => hits[0] += 1,
308 recv(after(ms(dur))) -> _ => hits[1] += 1,
309 }
310 }
311
312 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
313 }
314 }
315
316 #[test]
fairness_duplicates()317 fn fairness_duplicates() {
318 const COUNT: usize = 1000;
319
320 for &dur in &[0, 1] {
321 let mut hits = [0usize; 5];
322
323 for _ in 0..COUNT {
324 let r = after(ms(dur));
325 select! {
326 recv(r) -> _ => hits[0] += 1,
327 recv(r) -> _ => hits[1] += 1,
328 recv(r) -> _ => hits[2] += 1,
329 recv(r) -> _ => hits[3] += 1,
330 recv(r) -> _ => hits[4] += 1,
331 }
332 }
333
334 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
335 }
336 }
337