• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 
8 use tokio::sync::broadcast;
9 use tokio_test::task;
10 use tokio_test::{
11     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13 
14 use std::sync::Arc;
15 
16 macro_rules! assert_recv {
17     ($e:expr) => {
18         match $e.try_recv() {
19             Ok(value) => value,
20             Err(e) => panic!("expected recv; got = {:?}", e),
21         }
22     };
23 }
24 
25 macro_rules! assert_empty {
26     ($e:expr) => {
27         match $e.try_recv() {
28             Ok(value) => panic!("expected empty; got = {:?}", value),
29             Err(broadcast::error::TryRecvError::Empty) => {}
30             Err(e) => panic!("expected empty; got = {:?}", e),
31         }
32     };
33 }
34 
35 macro_rules! assert_lagged {
36     ($e:expr, $n:expr) => {
37         match assert_err!($e) {
38             broadcast::error::TryRecvError::Lagged(n) => {
39                 assert_eq!(n, $n);
40             }
41             _ => panic!("did not lag"),
42         }
43     };
44 }
45 
46 macro_rules! assert_closed {
47     ($e:expr) => {
48         match assert_err!($e) {
49             broadcast::error::TryRecvError::Closed => {}
50             _ => panic!("is not closed"),
51         }
52     };
53 }
54 
55 trait AssertSend: Send + Sync {}
56 impl AssertSend for broadcast::Sender<i32> {}
57 impl AssertSend for broadcast::Receiver<i32> {}
58 
59 #[test]
send_try_recv_bounded()60 fn send_try_recv_bounded() {
61     let (tx, mut rx) = broadcast::channel(16);
62 
63     assert_empty!(rx);
64 
65     let n = assert_ok!(tx.send("hello"));
66     assert_eq!(n, 1);
67 
68     let val = assert_recv!(rx);
69     assert_eq!(val, "hello");
70 
71     assert_empty!(rx);
72 }
73 
74 #[test]
send_two_recv()75 fn send_two_recv() {
76     let (tx, mut rx1) = broadcast::channel(16);
77     let mut rx2 = tx.subscribe();
78 
79     assert_empty!(rx1);
80     assert_empty!(rx2);
81 
82     let n = assert_ok!(tx.send("hello"));
83     assert_eq!(n, 2);
84 
85     let val = assert_recv!(rx1);
86     assert_eq!(val, "hello");
87 
88     let val = assert_recv!(rx2);
89     assert_eq!(val, "hello");
90 
91     assert_empty!(rx1);
92     assert_empty!(rx2);
93 }
94 
95 #[test]
send_recv_bounded()96 fn send_recv_bounded() {
97     let (tx, mut rx) = broadcast::channel(16);
98 
99     let mut recv = task::spawn(rx.recv());
100 
101     assert_pending!(recv.poll());
102 
103     assert_ok!(tx.send("hello"));
104 
105     assert!(recv.is_woken());
106     let val = assert_ready_ok!(recv.poll());
107     assert_eq!(val, "hello");
108 }
109 
110 #[test]
send_two_recv_bounded()111 fn send_two_recv_bounded() {
112     let (tx, mut rx1) = broadcast::channel(16);
113     let mut rx2 = tx.subscribe();
114 
115     let mut recv1 = task::spawn(rx1.recv());
116     let mut recv2 = task::spawn(rx2.recv());
117 
118     assert_pending!(recv1.poll());
119     assert_pending!(recv2.poll());
120 
121     assert_ok!(tx.send("hello"));
122 
123     assert!(recv1.is_woken());
124     assert!(recv2.is_woken());
125 
126     let val1 = assert_ready_ok!(recv1.poll());
127     let val2 = assert_ready_ok!(recv2.poll());
128     assert_eq!(val1, "hello");
129     assert_eq!(val2, "hello");
130 
131     drop((recv1, recv2));
132 
133     let mut recv1 = task::spawn(rx1.recv());
134     let mut recv2 = task::spawn(rx2.recv());
135 
136     assert_pending!(recv1.poll());
137 
138     assert_ok!(tx.send("world"));
139 
140     assert!(recv1.is_woken());
141     assert!(!recv2.is_woken());
142 
143     let val1 = assert_ready_ok!(recv1.poll());
144     let val2 = assert_ready_ok!(recv2.poll());
145     assert_eq!(val1, "world");
146     assert_eq!(val2, "world");
147 }
148 
149 #[test]
change_tasks()150 fn change_tasks() {
151     let (tx, mut rx) = broadcast::channel(1);
152 
153     let mut recv = Box::pin(rx.recv());
154 
155     let mut task1 = task::spawn(&mut recv);
156     assert_pending!(task1.poll());
157 
158     let mut task2 = task::spawn(&mut recv);
159     assert_pending!(task2.poll());
160 
161     tx.send("hello").unwrap();
162 
163     assert!(task2.is_woken());
164 }
165 
166 #[test]
send_slow_rx()167 fn send_slow_rx() {
168     let (tx, mut rx1) = broadcast::channel(16);
169     let mut rx2 = tx.subscribe();
170 
171     {
172         let mut recv2 = task::spawn(rx2.recv());
173 
174         {
175             let mut recv1 = task::spawn(rx1.recv());
176 
177             assert_pending!(recv1.poll());
178             assert_pending!(recv2.poll());
179 
180             assert_ok!(tx.send("one"));
181 
182             assert!(recv1.is_woken());
183             assert!(recv2.is_woken());
184 
185             assert_ok!(tx.send("two"));
186 
187             let val = assert_ready_ok!(recv1.poll());
188             assert_eq!(val, "one");
189         }
190 
191         let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
192         assert_eq!(val, "two");
193 
194         let mut recv1 = task::spawn(rx1.recv());
195 
196         assert_pending!(recv1.poll());
197 
198         assert_ok!(tx.send("three"));
199 
200         assert!(recv1.is_woken());
201 
202         let val = assert_ready_ok!(recv1.poll());
203         assert_eq!(val, "three");
204 
205         let val = assert_ready_ok!(recv2.poll());
206         assert_eq!(val, "one");
207     }
208 
209     let val = assert_recv!(rx2);
210     assert_eq!(val, "two");
211 
212     let val = assert_recv!(rx2);
213     assert_eq!(val, "three");
214 }
215 
216 #[test]
drop_rx_while_values_remain()217 fn drop_rx_while_values_remain() {
218     let (tx, mut rx1) = broadcast::channel(16);
219     let mut rx2 = tx.subscribe();
220 
221     assert_ok!(tx.send("one"));
222     assert_ok!(tx.send("two"));
223 
224     assert_recv!(rx1);
225     assert_recv!(rx2);
226 
227     drop(rx2);
228     drop(rx1);
229 }
230 
231 #[test]
lagging_rx()232 fn lagging_rx() {
233     let (tx, mut rx1) = broadcast::channel(2);
234     let mut rx2 = tx.subscribe();
235 
236     assert_ok!(tx.send("one"));
237     assert_ok!(tx.send("two"));
238 
239     assert_eq!("one", assert_recv!(rx1));
240 
241     assert_ok!(tx.send("three"));
242 
243     // Lagged too far
244     let x = dbg!(rx2.try_recv());
245     assert_lagged!(x, 1);
246 
247     // Calling again gets the next value
248     assert_eq!("two", assert_recv!(rx2));
249 
250     assert_eq!("two", assert_recv!(rx1));
251     assert_eq!("three", assert_recv!(rx1));
252 
253     assert_ok!(tx.send("four"));
254     assert_ok!(tx.send("five"));
255 
256     assert_lagged!(rx2.try_recv(), 1);
257 
258     assert_ok!(tx.send("six"));
259 
260     assert_lagged!(rx2.try_recv(), 1);
261 }
262 
263 #[test]
send_no_rx()264 fn send_no_rx() {
265     let (tx, _) = broadcast::channel(16);
266 
267     assert_err!(tx.send("hello"));
268 
269     let mut rx = tx.subscribe();
270 
271     assert_ok!(tx.send("world"));
272 
273     let val = assert_recv!(rx);
274     assert_eq!("world", val);
275 }
276 
277 #[test]
278 #[should_panic]
279 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
zero_capacity()280 fn zero_capacity() {
281     broadcast::channel::<()>(0);
282 }
283 
284 #[test]
285 #[should_panic]
286 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
capacity_too_big()287 fn capacity_too_big() {
288     use std::usize;
289 
290     broadcast::channel::<()>(1 + (usize::MAX >> 1));
291 }
292 
293 #[test]
294 #[cfg(panic = "unwind")]
295 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
panic_in_clone()296 fn panic_in_clone() {
297     use std::panic::{self, AssertUnwindSafe};
298 
299     #[derive(Eq, PartialEq, Debug)]
300     struct MyVal(usize);
301 
302     impl Clone for MyVal {
303         fn clone(&self) -> MyVal {
304             assert_ne!(0, self.0);
305             MyVal(self.0)
306         }
307     }
308 
309     let (tx, mut rx) = broadcast::channel(16);
310 
311     assert_ok!(tx.send(MyVal(0)));
312     assert_ok!(tx.send(MyVal(1)));
313 
314     let res = panic::catch_unwind(AssertUnwindSafe(|| {
315         let _ = rx.try_recv();
316     }));
317 
318     assert_err!(res);
319 
320     let val = assert_recv!(rx);
321     assert_eq!(val, MyVal(1));
322 }
323 
324 #[test]
dropping_tx_notifies_rx()325 fn dropping_tx_notifies_rx() {
326     let (tx, mut rx1) = broadcast::channel::<()>(16);
327     let mut rx2 = tx.subscribe();
328 
329     let tx2 = tx.clone();
330 
331     let mut recv1 = task::spawn(rx1.recv());
332     let mut recv2 = task::spawn(rx2.recv());
333 
334     assert_pending!(recv1.poll());
335     assert_pending!(recv2.poll());
336 
337     drop(tx);
338 
339     assert_pending!(recv1.poll());
340     assert_pending!(recv2.poll());
341 
342     drop(tx2);
343 
344     assert!(recv1.is_woken());
345     assert!(recv2.is_woken());
346 
347     let err = assert_ready_err!(recv1.poll());
348     assert!(is_closed(err));
349 
350     let err = assert_ready_err!(recv2.poll());
351     assert!(is_closed(err));
352 }
353 
354 #[test]
unconsumed_messages_are_dropped()355 fn unconsumed_messages_are_dropped() {
356     let (tx, rx) = broadcast::channel(16);
357 
358     let msg = Arc::new(());
359 
360     assert_ok!(tx.send(msg.clone()));
361 
362     assert_eq!(2, Arc::strong_count(&msg));
363 
364     drop(rx);
365 
366     assert_eq!(1, Arc::strong_count(&msg));
367 }
368 
369 #[test]
single_capacity_recvs()370 fn single_capacity_recvs() {
371     let (tx, mut rx) = broadcast::channel(1);
372 
373     assert_ok!(tx.send(1));
374 
375     assert_eq!(assert_recv!(rx), 1);
376     assert_empty!(rx);
377 }
378 
379 #[test]
single_capacity_recvs_after_drop_1()380 fn single_capacity_recvs_after_drop_1() {
381     let (tx, mut rx) = broadcast::channel(1);
382 
383     assert_ok!(tx.send(1));
384     drop(tx);
385 
386     assert_eq!(assert_recv!(rx), 1);
387     assert_closed!(rx.try_recv());
388 }
389 
390 #[test]
single_capacity_recvs_after_drop_2()391 fn single_capacity_recvs_after_drop_2() {
392     let (tx, mut rx) = broadcast::channel(1);
393 
394     assert_ok!(tx.send(1));
395     assert_ok!(tx.send(2));
396     drop(tx);
397 
398     assert_lagged!(rx.try_recv(), 1);
399     assert_eq!(assert_recv!(rx), 2);
400     assert_closed!(rx.try_recv());
401 }
402 
403 #[test]
dropping_sender_does_not_overwrite()404 fn dropping_sender_does_not_overwrite() {
405     let (tx, mut rx) = broadcast::channel(2);
406 
407     assert_ok!(tx.send(1));
408     assert_ok!(tx.send(2));
409     drop(tx);
410 
411     assert_eq!(assert_recv!(rx), 1);
412     assert_eq!(assert_recv!(rx), 2);
413     assert_closed!(rx.try_recv());
414 }
415 
416 #[test]
lagging_receiver_recovers_after_wrap_closed_1()417 fn lagging_receiver_recovers_after_wrap_closed_1() {
418     let (tx, mut rx) = broadcast::channel(2);
419 
420     assert_ok!(tx.send(1));
421     assert_ok!(tx.send(2));
422     assert_ok!(tx.send(3));
423     drop(tx);
424 
425     assert_lagged!(rx.try_recv(), 1);
426     assert_eq!(assert_recv!(rx), 2);
427     assert_eq!(assert_recv!(rx), 3);
428     assert_closed!(rx.try_recv());
429 }
430 
431 #[test]
lagging_receiver_recovers_after_wrap_closed_2()432 fn lagging_receiver_recovers_after_wrap_closed_2() {
433     let (tx, mut rx) = broadcast::channel(2);
434 
435     assert_ok!(tx.send(1));
436     assert_ok!(tx.send(2));
437     assert_ok!(tx.send(3));
438     assert_ok!(tx.send(4));
439     drop(tx);
440 
441     assert_lagged!(rx.try_recv(), 2);
442     assert_eq!(assert_recv!(rx), 3);
443     assert_eq!(assert_recv!(rx), 4);
444     assert_closed!(rx.try_recv());
445 }
446 
447 #[test]
lagging_receiver_recovers_after_wrap_open()448 fn lagging_receiver_recovers_after_wrap_open() {
449     let (tx, mut rx) = broadcast::channel(2);
450 
451     assert_ok!(tx.send(1));
452     assert_ok!(tx.send(2));
453     assert_ok!(tx.send(3));
454 
455     assert_lagged!(rx.try_recv(), 1);
456     assert_eq!(assert_recv!(rx), 2);
457     assert_eq!(assert_recv!(rx), 3);
458     assert_empty!(rx);
459 }
460 
461 #[test]
receiver_len_with_lagged()462 fn receiver_len_with_lagged() {
463     let (tx, mut rx) = broadcast::channel(3);
464 
465     tx.send(10).unwrap();
466     tx.send(20).unwrap();
467     tx.send(30).unwrap();
468     tx.send(40).unwrap();
469 
470     assert_eq!(rx.len(), 4);
471     assert_eq!(assert_recv!(rx), 10);
472 
473     tx.send(50).unwrap();
474     tx.send(60).unwrap();
475 
476     assert_eq!(rx.len(), 5);
477     assert_lagged!(rx.try_recv(), 1);
478 }
479 
is_closed(err: broadcast::error::RecvError) -> bool480 fn is_closed(err: broadcast::error::RecvError) -> bool {
481     matches!(err, broadcast::error::RecvError::Closed)
482 }
483 
484 #[test]
resubscribe_points_to_tail()485 fn resubscribe_points_to_tail() {
486     let (tx, mut rx) = broadcast::channel(3);
487     tx.send(1).unwrap();
488 
489     let mut rx_resub = rx.resubscribe();
490 
491     // verify we're one behind at the start
492     assert_empty!(rx_resub);
493     assert_eq!(assert_recv!(rx), 1);
494 
495     // verify we do not affect rx
496     tx.send(2).unwrap();
497     assert_eq!(assert_recv!(rx_resub), 2);
498     tx.send(3).unwrap();
499     assert_eq!(assert_recv!(rx), 2);
500     assert_eq!(assert_recv!(rx), 3);
501     assert_empty!(rx);
502 
503     assert_eq!(assert_recv!(rx_resub), 3);
504     assert_empty!(rx_resub);
505 }
506 
507 #[test]
resubscribe_lagged()508 fn resubscribe_lagged() {
509     let (tx, mut rx) = broadcast::channel(1);
510     tx.send(1).unwrap();
511     tx.send(2).unwrap();
512 
513     let mut rx_resub = rx.resubscribe();
514     assert_lagged!(rx.try_recv(), 1);
515     assert_empty!(rx_resub);
516 
517     assert_eq!(assert_recv!(rx), 2);
518     assert_empty!(rx);
519     assert_empty!(rx_resub);
520 }
521 
522 #[test]
resubscribe_to_closed_channel()523 fn resubscribe_to_closed_channel() {
524     let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2);
525     drop(tx);
526 
527     let mut rx_resub = rx.resubscribe();
528     assert_closed!(rx_resub.try_recv());
529 }
530 
531 #[test]
sender_len()532 fn sender_len() {
533     let (tx, mut rx1) = broadcast::channel(4);
534     let mut rx2 = tx.subscribe();
535 
536     assert_eq!(tx.len(), 0);
537     assert!(tx.is_empty());
538 
539     tx.send(1).unwrap();
540     tx.send(2).unwrap();
541     tx.send(3).unwrap();
542 
543     assert_eq!(tx.len(), 3);
544     assert!(!tx.is_empty());
545 
546     assert_recv!(rx1);
547     assert_recv!(rx1);
548 
549     assert_eq!(tx.len(), 3);
550     assert!(!tx.is_empty());
551 
552     assert_recv!(rx2);
553 
554     assert_eq!(tx.len(), 2);
555     assert!(!tx.is_empty());
556 
557     tx.send(4).unwrap();
558     tx.send(5).unwrap();
559     tx.send(6).unwrap();
560 
561     assert_eq!(tx.len(), 4);
562     assert!(!tx.is_empty());
563 }
564 
565 #[test]
566 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
sender_len_random()567 fn sender_len_random() {
568     use rand::Rng;
569 
570     let (tx, mut rx1) = broadcast::channel(16);
571     let mut rx2 = tx.subscribe();
572 
573     for _ in 0..1000 {
574         match rand::thread_rng().gen_range(0..4) {
575             0 => {
576                 let _ = rx1.try_recv();
577             }
578             1 => {
579                 let _ = rx2.try_recv();
580             }
581             _ => {
582                 tx.send(0).unwrap();
583             }
584         }
585 
586         let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
587         assert_eq!(tx.len(), expected_len);
588     }
589 }
590 
591 #[test]
send_in_waker_drop()592 fn send_in_waker_drop() {
593     use futures::task::ArcWake;
594     use std::future::Future;
595     use std::task::Context;
596 
597     struct SendOnDrop(broadcast::Sender<()>);
598 
599     impl Drop for SendOnDrop {
600         fn drop(&mut self) {
601             let _ = self.0.send(());
602         }
603     }
604 
605     impl ArcWake for SendOnDrop {
606         fn wake_by_ref(_arc_self: &Arc<Self>) {}
607     }
608 
609     // Test if there is no deadlock when replacing the old waker.
610 
611     let (tx, mut rx) = broadcast::channel(16);
612 
613     let mut fut = Box::pin(async {
614         let _ = rx.recv().await;
615     });
616 
617     // Store our special waker in the receiving future.
618     let waker = futures::task::waker(Arc::new(SendOnDrop(tx)));
619     let mut cx = Context::from_waker(&waker);
620     assert!(fut.as_mut().poll(&mut cx).is_pending());
621     drop(waker);
622 
623     // Second poll shouldn't deadlock.
624     let mut cx = Context::from_waker(futures::task::noop_waker_ref());
625     let _ = fut.as_mut().poll(&mut cx);
626 
627     // Test if there is no deadlock when calling waker.wake().
628 
629     let (tx, mut rx) = broadcast::channel(16);
630 
631     let mut fut = Box::pin(async {
632         let _ = rx.recv().await;
633     });
634 
635     // Store our special waker in the receiving future.
636     let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone())));
637     let mut cx = Context::from_waker(&waker);
638     assert!(fut.as_mut().poll(&mut cx).is_pending());
639     drop(waker);
640 
641     // Shouldn't deadlock.
642     let _ = tx.send(());
643 }
644