1 use crate::sync::mpsc; 2 3 use futures::future::poll_fn; 4 use loom::future::block_on; 5 use loom::sync::Arc; 6 use loom::thread; 7 use tokio_test::assert_ok; 8 9 #[test] closing_tx()10fn closing_tx() { 11 loom::model(|| { 12 let (tx, mut rx) = mpsc::channel(16); 13 14 thread::spawn(move || { 15 tx.try_send(()).unwrap(); 16 drop(tx); 17 }); 18 19 let v = block_on(rx.recv()); 20 assert!(v.is_some()); 21 22 let v = block_on(rx.recv()); 23 assert!(v.is_none()); 24 }); 25 } 26 27 #[test] closing_unbounded_tx()28fn closing_unbounded_tx() { 29 loom::model(|| { 30 let (tx, mut rx) = mpsc::unbounded_channel(); 31 32 thread::spawn(move || { 33 tx.send(()).unwrap(); 34 drop(tx); 35 }); 36 37 let v = block_on(rx.recv()); 38 assert!(v.is_some()); 39 40 let v = block_on(rx.recv()); 41 assert!(v.is_none()); 42 }); 43 } 44 45 #[test] closing_bounded_rx()46fn closing_bounded_rx() { 47 loom::model(|| { 48 let (tx1, rx) = mpsc::channel::<()>(16); 49 let tx2 = tx1.clone(); 50 thread::spawn(move || { 51 drop(rx); 52 }); 53 54 block_on(tx1.closed()); 55 block_on(tx2.closed()); 56 }); 57 } 58 59 #[test] closing_and_sending()60fn closing_and_sending() { 61 loom::model(|| { 62 let (tx1, mut rx) = mpsc::channel::<()>(16); 63 let tx1 = Arc::new(tx1); 64 let tx2 = tx1.clone(); 65 66 let th1 = thread::spawn(move || { 67 tx1.try_send(()).unwrap(); 68 }); 69 70 let th2 = thread::spawn(move || { 71 block_on(tx2.closed()); 72 }); 73 74 let th3 = thread::spawn(move || { 75 let v = block_on(rx.recv()); 76 assert!(v.is_some()); 77 drop(rx); 78 }); 79 80 assert_ok!(th1.join()); 81 assert_ok!(th2.join()); 82 assert_ok!(th3.join()); 83 }); 84 } 85 86 #[test] closing_unbounded_rx()87fn closing_unbounded_rx() { 88 loom::model(|| { 89 let (tx1, rx) = mpsc::unbounded_channel::<()>(); 90 let tx2 = tx1.clone(); 91 thread::spawn(move || { 92 drop(rx); 93 }); 94 95 block_on(tx1.closed()); 96 block_on(tx2.closed()); 97 }); 98 } 99 100 #[test] dropping_tx()101fn dropping_tx() { 102 loom::model(|| { 103 let (tx, mut rx) = mpsc::channel::<()>(16); 104 105 for _ in 0..2 { 106 let tx = tx.clone(); 107 thread::spawn(move || { 108 drop(tx); 109 }); 110 } 111 drop(tx); 112 113 let v = block_on(rx.recv()); 114 assert!(v.is_none()); 115 }); 116 } 117 118 #[test] dropping_unbounded_tx()119fn dropping_unbounded_tx() { 120 loom::model(|| { 121 let (tx, mut rx) = mpsc::unbounded_channel::<()>(); 122 123 for _ in 0..2 { 124 let tx = tx.clone(); 125 thread::spawn(move || { 126 drop(tx); 127 }); 128 } 129 drop(tx); 130 131 let v = block_on(rx.recv()); 132 assert!(v.is_none()); 133 }); 134 } 135