1 //! Tests for the zero channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16 Duration::from_millis(ms)
17 }
18
19 #[test]
smoke()20 fn smoke() {
21 let (s, r) = bounded(0);
22 assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
23 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
24 }
25
26 #[test]
capacity()27 fn capacity() {
28 let (s, r) = bounded::<()>(0);
29 assert_eq!(s.capacity(), Some(0));
30 assert_eq!(r.capacity(), Some(0));
31 }
32
33 #[test]
len_empty_full()34 fn len_empty_full() {
35 let (s, r) = bounded(0);
36
37 assert_eq!(s.len(), 0);
38 assert!(s.is_empty());
39 assert!(s.is_full());
40 assert_eq!(r.len(), 0);
41 assert!(r.is_empty());
42 assert!(r.is_full());
43
44 scope(|scope| {
45 scope.spawn(|_| s.send(0).unwrap());
46 scope.spawn(|_| r.recv().unwrap());
47 })
48 .unwrap();
49
50 assert_eq!(s.len(), 0);
51 assert!(s.is_empty());
52 assert!(s.is_full());
53 assert_eq!(r.len(), 0);
54 assert!(r.is_empty());
55 assert!(r.is_full());
56 }
57
58 #[test]
try_recv()59 fn try_recv() {
60 let (s, r) = bounded(0);
61
62 scope(|scope| {
63 scope.spawn(move |_| {
64 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
65 thread::sleep(ms(1500));
66 assert_eq!(r.try_recv(), Ok(7));
67 thread::sleep(ms(500));
68 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
69 });
70 scope.spawn(move |_| {
71 thread::sleep(ms(1000));
72 s.send(7).unwrap();
73 });
74 })
75 .unwrap();
76 }
77
78 #[test]
recv()79 fn recv() {
80 let (s, r) = bounded(0);
81
82 scope(|scope| {
83 scope.spawn(move |_| {
84 assert_eq!(r.recv(), Ok(7));
85 thread::sleep(ms(1000));
86 assert_eq!(r.recv(), Ok(8));
87 thread::sleep(ms(1000));
88 assert_eq!(r.recv(), Ok(9));
89 assert_eq!(r.recv(), Err(RecvError));
90 });
91 scope.spawn(move |_| {
92 thread::sleep(ms(1500));
93 s.send(7).unwrap();
94 s.send(8).unwrap();
95 s.send(9).unwrap();
96 });
97 })
98 .unwrap();
99 }
100
101 #[test]
recv_timeout()102 fn recv_timeout() {
103 let (s, r) = bounded::<i32>(0);
104
105 scope(|scope| {
106 scope.spawn(move |_| {
107 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
108 assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
109 assert_eq!(
110 r.recv_timeout(ms(1000)),
111 Err(RecvTimeoutError::Disconnected)
112 );
113 });
114 scope.spawn(move |_| {
115 thread::sleep(ms(1500));
116 s.send(7).unwrap();
117 });
118 })
119 .unwrap();
120 }
121
122 #[test]
try_send()123 fn try_send() {
124 let (s, r) = bounded(0);
125
126 scope(|scope| {
127 scope.spawn(move |_| {
128 assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
129 thread::sleep(ms(1500));
130 assert_eq!(s.try_send(8), Ok(()));
131 thread::sleep(ms(500));
132 assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9)));
133 });
134 scope.spawn(move |_| {
135 thread::sleep(ms(1000));
136 assert_eq!(r.recv(), Ok(8));
137 });
138 })
139 .unwrap();
140 }
141
142 #[test]
send()143 fn send() {
144 let (s, r) = bounded(0);
145
146 scope(|scope| {
147 scope.spawn(move |_| {
148 s.send(7).unwrap();
149 thread::sleep(ms(1000));
150 s.send(8).unwrap();
151 thread::sleep(ms(1000));
152 s.send(9).unwrap();
153 });
154 scope.spawn(move |_| {
155 thread::sleep(ms(1500));
156 assert_eq!(r.recv(), Ok(7));
157 assert_eq!(r.recv(), Ok(8));
158 assert_eq!(r.recv(), Ok(9));
159 });
160 })
161 .unwrap();
162 }
163
164 #[test]
send_timeout()165 fn send_timeout() {
166 let (s, r) = bounded(0);
167
168 scope(|scope| {
169 scope.spawn(move |_| {
170 assert_eq!(
171 s.send_timeout(7, ms(1000)),
172 Err(SendTimeoutError::Timeout(7))
173 );
174 assert_eq!(s.send_timeout(8, ms(1000)), Ok(()));
175 assert_eq!(
176 s.send_timeout(9, ms(1000)),
177 Err(SendTimeoutError::Disconnected(9))
178 );
179 });
180 scope.spawn(move |_| {
181 thread::sleep(ms(1500));
182 assert_eq!(r.recv(), Ok(8));
183 });
184 })
185 .unwrap();
186 }
187
188 #[test]
len()189 fn len() {
190 #[cfg(miri)]
191 const COUNT: usize = 50;
192 #[cfg(not(miri))]
193 const COUNT: usize = 25_000;
194
195 let (s, r) = bounded(0);
196
197 assert_eq!(s.len(), 0);
198 assert_eq!(r.len(), 0);
199
200 scope(|scope| {
201 scope.spawn(|_| {
202 for i in 0..COUNT {
203 assert_eq!(r.recv(), Ok(i));
204 assert_eq!(r.len(), 0);
205 }
206 });
207
208 scope.spawn(|_| {
209 for i in 0..COUNT {
210 s.send(i).unwrap();
211 assert_eq!(s.len(), 0);
212 }
213 });
214 })
215 .unwrap();
216
217 assert_eq!(s.len(), 0);
218 assert_eq!(r.len(), 0);
219 }
220
221 #[test]
disconnect_wakes_sender()222 fn disconnect_wakes_sender() {
223 let (s, r) = bounded(0);
224
225 scope(|scope| {
226 scope.spawn(move |_| {
227 assert_eq!(s.send(()), Err(SendError(())));
228 });
229 scope.spawn(move |_| {
230 thread::sleep(ms(1000));
231 drop(r);
232 });
233 })
234 .unwrap();
235 }
236
237 #[test]
disconnect_wakes_receiver()238 fn disconnect_wakes_receiver() {
239 let (s, r) = bounded::<()>(0);
240
241 scope(|scope| {
242 scope.spawn(move |_| {
243 assert_eq!(r.recv(), Err(RecvError));
244 });
245 scope.spawn(move |_| {
246 thread::sleep(ms(1000));
247 drop(s);
248 });
249 })
250 .unwrap();
251 }
252
253 #[test]
spsc()254 fn spsc() {
255 #[cfg(miri)]
256 const COUNT: usize = 50;
257 #[cfg(not(miri))]
258 const COUNT: usize = 100_000;
259
260 let (s, r) = bounded(0);
261
262 scope(|scope| {
263 scope.spawn(move |_| {
264 for i in 0..COUNT {
265 assert_eq!(r.recv(), Ok(i));
266 }
267 assert_eq!(r.recv(), Err(RecvError));
268 });
269 scope.spawn(move |_| {
270 for i in 0..COUNT {
271 s.send(i).unwrap();
272 }
273 });
274 })
275 .unwrap();
276 }
277
278 #[test]
mpmc()279 fn mpmc() {
280 #[cfg(miri)]
281 const COUNT: usize = 50;
282 #[cfg(not(miri))]
283 const COUNT: usize = 25_000;
284 const THREADS: usize = 4;
285
286 let (s, r) = bounded::<usize>(0);
287 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
288
289 scope(|scope| {
290 for _ in 0..THREADS {
291 scope.spawn(|_| {
292 for _ in 0..COUNT {
293 let n = r.recv().unwrap();
294 v[n].fetch_add(1, Ordering::SeqCst);
295 }
296 });
297 }
298 for _ in 0..THREADS {
299 scope.spawn(|_| {
300 for i in 0..COUNT {
301 s.send(i).unwrap();
302 }
303 });
304 }
305 })
306 .unwrap();
307
308 for c in v {
309 assert_eq!(c.load(Ordering::SeqCst), THREADS);
310 }
311 }
312
313 #[test]
stress_oneshot()314 fn stress_oneshot() {
315 #[cfg(miri)]
316 const COUNT: usize = 50;
317 #[cfg(not(miri))]
318 const COUNT: usize = 10_000;
319
320 for _ in 0..COUNT {
321 let (s, r) = bounded(1);
322
323 scope(|scope| {
324 scope.spawn(|_| r.recv().unwrap());
325 scope.spawn(|_| s.send(0).unwrap());
326 })
327 .unwrap();
328 }
329 }
330
331 #[test]
stress_iter()332 fn stress_iter() {
333 #[cfg(miri)]
334 const COUNT: usize = 50;
335 #[cfg(not(miri))]
336 const COUNT: usize = 1000;
337
338 let (request_s, request_r) = bounded(0);
339 let (response_s, response_r) = bounded(0);
340
341 scope(|scope| {
342 scope.spawn(move |_| {
343 let mut count = 0;
344 loop {
345 for x in response_r.try_iter() {
346 count += x;
347 if count == COUNT {
348 return;
349 }
350 }
351 let _ = request_s.try_send(());
352 }
353 });
354
355 for _ in request_r.iter() {
356 if response_s.send(1).is_err() {
357 break;
358 }
359 }
360 })
361 .unwrap();
362 }
363
364 #[test]
stress_timeout_two_threads()365 fn stress_timeout_two_threads() {
366 const COUNT: usize = 100;
367
368 let (s, r) = bounded(0);
369
370 scope(|scope| {
371 scope.spawn(|_| {
372 for i in 0..COUNT {
373 if i % 2 == 0 {
374 thread::sleep(ms(50));
375 }
376 loop {
377 if let Ok(()) = s.send_timeout(i, ms(10)) {
378 break;
379 }
380 }
381 }
382 });
383
384 scope.spawn(|_| {
385 for i in 0..COUNT {
386 if i % 2 == 0 {
387 thread::sleep(ms(50));
388 }
389 loop {
390 if let Ok(x) = r.recv_timeout(ms(10)) {
391 assert_eq!(x, i);
392 break;
393 }
394 }
395 }
396 });
397 })
398 .unwrap();
399 }
400
401 #[test]
drops()402 fn drops() {
403 #[cfg(miri)]
404 const RUNS: usize = 20;
405 #[cfg(not(miri))]
406 const RUNS: usize = 100;
407 #[cfg(miri)]
408 const STEPS: usize = 100;
409 #[cfg(not(miri))]
410 const STEPS: usize = 10_000;
411
412 static DROPS: AtomicUsize = AtomicUsize::new(0);
413
414 #[derive(Debug, PartialEq)]
415 struct DropCounter;
416
417 impl Drop for DropCounter {
418 fn drop(&mut self) {
419 DROPS.fetch_add(1, Ordering::SeqCst);
420 }
421 }
422
423 let mut rng = thread_rng();
424
425 for _ in 0..RUNS {
426 let steps = rng.gen_range(0..STEPS);
427
428 DROPS.store(0, Ordering::SeqCst);
429 let (s, r) = bounded::<DropCounter>(0);
430
431 scope(|scope| {
432 scope.spawn(|_| {
433 for _ in 0..steps {
434 r.recv().unwrap();
435 }
436 });
437
438 scope.spawn(|_| {
439 for _ in 0..steps {
440 s.send(DropCounter).unwrap();
441 }
442 });
443 })
444 .unwrap();
445
446 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
447 drop(s);
448 drop(r);
449 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
450 }
451 }
452
453 #[test]
fairness()454 fn fairness() {
455 #[cfg(miri)]
456 const COUNT: usize = 50;
457 #[cfg(not(miri))]
458 const COUNT: usize = 10_000;
459
460 let (s1, r1) = bounded::<()>(0);
461 let (s2, r2) = bounded::<()>(0);
462
463 scope(|scope| {
464 scope.spawn(|_| {
465 let mut hits = [0usize; 2];
466 for _ in 0..COUNT {
467 select! {
468 recv(r1) -> _ => hits[0] += 1,
469 recv(r2) -> _ => hits[1] += 1,
470 }
471 }
472 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
473 });
474
475 let mut hits = [0usize; 2];
476 for _ in 0..COUNT {
477 select! {
478 send(s1, ()) -> _ => hits[0] += 1,
479 send(s2, ()) -> _ => hits[1] += 1,
480 }
481 }
482 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
483 })
484 .unwrap();
485 }
486
487 #[test]
fairness_duplicates()488 fn fairness_duplicates() {
489 #[cfg(miri)]
490 const COUNT: usize = 100;
491 #[cfg(not(miri))]
492 const COUNT: usize = 10_000;
493
494 let (s, r) = bounded::<()>(0);
495
496 scope(|scope| {
497 scope.spawn(|_| {
498 let mut hits = [0usize; 5];
499 for _ in 0..COUNT {
500 select! {
501 recv(r) -> _ => hits[0] += 1,
502 recv(r) -> _ => hits[1] += 1,
503 recv(r) -> _ => hits[2] += 1,
504 recv(r) -> _ => hits[3] += 1,
505 recv(r) -> _ => hits[4] += 1,
506 }
507 }
508 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
509 });
510
511 let mut hits = [0usize; 5];
512 for _ in 0..COUNT {
513 select! {
514 send(s, ()) -> _ => hits[0] += 1,
515 send(s, ()) -> _ => hits[1] += 1,
516 send(s, ()) -> _ => hits[2] += 1,
517 send(s, ()) -> _ => hits[3] += 1,
518 send(s, ()) -> _ => hits[4] += 1,
519 }
520 }
521 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
522 })
523 .unwrap();
524 }
525
526 #[test]
recv_in_send()527 fn recv_in_send() {
528 let (s, r) = bounded(0);
529
530 scope(|scope| {
531 scope.spawn(|_| {
532 thread::sleep(ms(100));
533 r.recv()
534 });
535
536 scope.spawn(|_| {
537 thread::sleep(ms(500));
538 s.send(()).unwrap();
539 });
540
541 select! {
542 send(s, r.recv().unwrap()) -> _ => {}
543 }
544 })
545 .unwrap();
546 }
547
548 #[test]
channel_through_channel()549 fn channel_through_channel() {
550 #[cfg(miri)]
551 const COUNT: usize = 50;
552 #[cfg(not(miri))]
553 const COUNT: usize = 1000;
554
555 type T = Box<dyn Any + Send>;
556
557 let (s, r) = bounded::<T>(0);
558
559 scope(|scope| {
560 scope.spawn(move |_| {
561 let mut s = s;
562
563 for _ in 0..COUNT {
564 let (new_s, new_r) = bounded(0);
565 let new_r: T = Box::new(Some(new_r));
566
567 s.send(new_r).unwrap();
568 s = new_s;
569 }
570 });
571
572 scope.spawn(move |_| {
573 let mut r = r;
574
575 for _ in 0..COUNT {
576 r = r
577 .recv()
578 .unwrap()
579 .downcast_mut::<Option<Receiver<T>>>()
580 .unwrap()
581 .take()
582 .unwrap()
583 }
584 });
585 })
586 .unwrap();
587 }
588