• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::sync::mpsc;
2 
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::sync::Arc;
6 use loom::thread;
7 use tokio_test::assert_ok;
8 
9 #[test]
closing_tx()10 fn closing_tx() {
11     loom::model(|| {
12         let (tx, mut rx) = mpsc::channel(16);
13 
14         thread::spawn(move || {
15             tx.try_send(()).unwrap();
16             drop(tx);
17         });
18 
19         let v = block_on(rx.recv());
20         assert!(v.is_some());
21 
22         let v = block_on(rx.recv());
23         assert!(v.is_none());
24     });
25 }
26 
27 #[test]
closing_unbounded_tx()28 fn closing_unbounded_tx() {
29     loom::model(|| {
30         let (tx, mut rx) = mpsc::unbounded_channel();
31 
32         thread::spawn(move || {
33             tx.send(()).unwrap();
34             drop(tx);
35         });
36 
37         let v = block_on(rx.recv());
38         assert!(v.is_some());
39 
40         let v = block_on(rx.recv());
41         assert!(v.is_none());
42     });
43 }
44 
45 #[test]
closing_bounded_rx()46 fn closing_bounded_rx() {
47     loom::model(|| {
48         let (tx1, rx) = mpsc::channel::<()>(16);
49         let tx2 = tx1.clone();
50         thread::spawn(move || {
51             drop(rx);
52         });
53 
54         block_on(tx1.closed());
55         block_on(tx2.closed());
56     });
57 }
58 
59 #[test]
closing_and_sending()60 fn closing_and_sending() {
61     loom::model(|| {
62         let (tx1, mut rx) = mpsc::channel::<()>(16);
63         let tx1 = Arc::new(tx1);
64         let tx2 = tx1.clone();
65 
66         let th1 = thread::spawn(move || {
67             tx1.try_send(()).unwrap();
68         });
69 
70         let th2 = thread::spawn(move || {
71             block_on(tx2.closed());
72         });
73 
74         let th3 = thread::spawn(move || {
75             let v = block_on(rx.recv());
76             assert!(v.is_some());
77             drop(rx);
78         });
79 
80         assert_ok!(th1.join());
81         assert_ok!(th2.join());
82         assert_ok!(th3.join());
83     });
84 }
85 
86 #[test]
closing_unbounded_rx()87 fn closing_unbounded_rx() {
88     loom::model(|| {
89         let (tx1, rx) = mpsc::unbounded_channel::<()>();
90         let tx2 = tx1.clone();
91         thread::spawn(move || {
92             drop(rx);
93         });
94 
95         block_on(tx1.closed());
96         block_on(tx2.closed());
97     });
98 }
99 
100 #[test]
dropping_tx()101 fn dropping_tx() {
102     loom::model(|| {
103         let (tx, mut rx) = mpsc::channel::<()>(16);
104 
105         for _ in 0..2 {
106             let tx = tx.clone();
107             thread::spawn(move || {
108                 drop(tx);
109             });
110         }
111         drop(tx);
112 
113         let v = block_on(rx.recv());
114         assert!(v.is_none());
115     });
116 }
117 
118 #[test]
dropping_unbounded_tx()119 fn dropping_unbounded_tx() {
120     loom::model(|| {
121         let (tx, mut rx) = mpsc::unbounded_channel::<()>();
122 
123         for _ in 0..2 {
124             let tx = tx.clone();
125             thread::spawn(move || {
126                 drop(tx);
127             });
128         }
129         drop(tx);
130 
131         let v = block_on(rx.recv());
132         assert!(v.is_none());
133     });
134 }
135