1 use crate::sync::Notify;
2
3 use loom::future::block_on;
4 use loom::sync::Arc;
5 use loom::thread;
6
7 use tokio_test::{assert_pending, assert_ready};
8
9 /// `util::wake_list::NUM_WAKERS`
10 const WAKE_LIST_SIZE: usize = 32;
11
12 #[test]
notify_one()13 fn notify_one() {
14 loom::model(|| {
15 let tx = Arc::new(Notify::new());
16 let rx = tx.clone();
17
18 let th = thread::spawn(move || {
19 block_on(async {
20 rx.notified().await;
21 });
22 });
23
24 tx.notify_one();
25 th.join().unwrap();
26 });
27 }
28
29 #[test]
notify_waiters()30 fn notify_waiters() {
31 loom::model(|| {
32 let notify = Arc::new(Notify::new());
33 let tx = notify.clone();
34 let notified1 = notify.notified();
35 let notified2 = notify.notified();
36
37 let th = thread::spawn(move || {
38 tx.notify_waiters();
39 });
40
41 block_on(async {
42 notified1.await;
43 notified2.await;
44 });
45
46 th.join().unwrap();
47 });
48 }
49
50 #[test]
notify_waiters_and_one()51 fn notify_waiters_and_one() {
52 loom::model(|| {
53 let notify = Arc::new(Notify::new());
54 let tx1 = notify.clone();
55 let tx2 = notify.clone();
56
57 let th1 = thread::spawn(move || {
58 tx1.notify_waiters();
59 });
60
61 let th2 = thread::spawn(move || {
62 tx2.notify_one();
63 });
64
65 let th3 = thread::spawn(move || {
66 let notified = notify.notified();
67
68 block_on(async {
69 notified.await;
70 });
71 });
72
73 th1.join().unwrap();
74 th2.join().unwrap();
75 th3.join().unwrap();
76 });
77 }
78
79 #[test]
notify_multi()80 fn notify_multi() {
81 loom::model(|| {
82 let notify = Arc::new(Notify::new());
83
84 let mut ths = vec![];
85
86 for _ in 0..2 {
87 let notify = notify.clone();
88
89 ths.push(thread::spawn(move || {
90 block_on(async {
91 notify.notified().await;
92 notify.notify_one();
93 })
94 }));
95 }
96
97 notify.notify_one();
98
99 for th in ths.drain(..) {
100 th.join().unwrap();
101 }
102
103 block_on(async {
104 notify.notified().await;
105 });
106 });
107 }
108
109 #[test]
notify_drop()110 fn notify_drop() {
111 use crate::future::poll_fn;
112 use std::future::Future;
113 use std::task::Poll;
114
115 loom::model(|| {
116 let notify = Arc::new(Notify::new());
117 let rx1 = notify.clone();
118 let rx2 = notify.clone();
119
120 let th1 = thread::spawn(move || {
121 let mut recv = Box::pin(rx1.notified());
122
123 block_on(poll_fn(|cx| {
124 if recv.as_mut().poll(cx).is_ready() {
125 rx1.notify_one();
126 }
127 Poll::Ready(())
128 }));
129 });
130
131 let th2 = thread::spawn(move || {
132 block_on(async {
133 rx2.notified().await;
134 // Trigger second notification
135 rx2.notify_one();
136 rx2.notified().await;
137 });
138 });
139
140 notify.notify_one();
141
142 th1.join().unwrap();
143 th2.join().unwrap();
144 });
145 }
146
147 /// Polls two `Notified` futures and checks if poll results are consistent
148 /// with each other. If the first future is notified by a `notify_waiters`
149 /// call, then the second one must be notified as well.
150 #[test]
notify_waiters_poll_consistency()151 fn notify_waiters_poll_consistency() {
152 fn notify_waiters_poll_consistency_variant(poll_setting: [bool; 2]) {
153 let notify = Arc::new(Notify::new());
154 let mut notified = [
155 tokio_test::task::spawn(notify.notified()),
156 tokio_test::task::spawn(notify.notified()),
157 ];
158 for i in 0..2 {
159 if poll_setting[i] {
160 assert_pending!(notified[i].poll());
161 }
162 }
163
164 let tx = notify.clone();
165 let th = thread::spawn(move || {
166 tx.notify_waiters();
167 });
168
169 let res1 = notified[0].poll();
170 let res2 = notified[1].poll();
171
172 // If res1 is ready, then res2 must also be ready.
173 assert!(res1.is_pending() || res2.is_ready());
174
175 th.join().unwrap();
176 }
177
178 // We test different scenarios in which pending futures had or had not
179 // been polled before the call to `notify_waiters`.
180 loom::model(|| notify_waiters_poll_consistency_variant([false, false]));
181 loom::model(|| notify_waiters_poll_consistency_variant([true, false]));
182 loom::model(|| notify_waiters_poll_consistency_variant([false, true]));
183 loom::model(|| notify_waiters_poll_consistency_variant([true, true]));
184 }
185
186 /// Polls two `Notified` futures and checks if poll results are consistent
187 /// with each other. If the first future is notified by a `notify_waiters`
188 /// call, then the second one must be notified as well.
189 ///
190 /// Here we also add other `Notified` futures in between to force the two
191 /// tested futures to end up in different chunks.
192 #[test]
notify_waiters_poll_consistency_many()193 fn notify_waiters_poll_consistency_many() {
194 fn notify_waiters_poll_consistency_many_variant(order: [usize; 2]) {
195 let notify = Arc::new(Notify::new());
196
197 let mut futs = (0..WAKE_LIST_SIZE + 1)
198 .map(|_| tokio_test::task::spawn(notify.notified()))
199 .collect::<Vec<_>>();
200
201 assert_pending!(futs[order[0]].poll());
202 for i in 2..futs.len() {
203 assert_pending!(futs[i].poll());
204 }
205 assert_pending!(futs[order[1]].poll());
206
207 let tx = notify.clone();
208 let th = thread::spawn(move || {
209 tx.notify_waiters();
210 });
211
212 let res1 = futs[0].poll();
213 let res2 = futs[1].poll();
214
215 // If res1 is ready, then res2 must also be ready.
216 assert!(res1.is_pending() || res2.is_ready());
217
218 th.join().unwrap();
219 }
220
221 // We test different scenarios in which futures are polled in different order.
222 loom::model(|| notify_waiters_poll_consistency_many_variant([0, 1]));
223 loom::model(|| notify_waiters_poll_consistency_many_variant([1, 0]));
224 }
225
226 /// Checks if a call to `notify_waiters` is observed as atomic when combined
227 /// with a concurrent call to `notify_one`.
228 #[test]
notify_waiters_is_atomic()229 fn notify_waiters_is_atomic() {
230 fn notify_waiters_is_atomic_variant(tested_fut_index: usize) {
231 let notify = Arc::new(Notify::new());
232
233 let mut futs = (0..WAKE_LIST_SIZE + 1)
234 .map(|_| tokio_test::task::spawn(notify.notified()))
235 .collect::<Vec<_>>();
236
237 for fut in &mut futs {
238 assert_pending!(fut.poll());
239 }
240
241 let tx = notify.clone();
242 let th = thread::spawn(move || {
243 tx.notify_waiters();
244 });
245
246 block_on(async {
247 // If awaiting one of the futures completes, then we should be
248 // able to assume that all pending futures are notified. Therefore
249 // a notification from a subsequent `notify_one` call should not
250 // be consumed by an old future.
251 futs.remove(tested_fut_index).await;
252
253 let mut new_fut = tokio_test::task::spawn(notify.notified());
254 assert_pending!(new_fut.poll());
255
256 notify.notify_one();
257
258 // `new_fut` must consume the notification from `notify_one`.
259 assert_ready!(new_fut.poll());
260 });
261
262 th.join().unwrap();
263 }
264
265 // We test different scenarios in which the tested future is at the beginning
266 // or at the end of the waiters queue used by `Notify`.
267 loom::model(|| notify_waiters_is_atomic_variant(0));
268 loom::model(|| notify_waiters_is_atomic_variant(32));
269 }
270
271 /// Checks if a single call to `notify_waiters` does not get through two `Notified`
272 /// futures created and awaited sequentially like this:
273 /// ```ignore
274 /// notify.notified().await;
275 /// notify.notified().await;
276 /// ```
277 #[test]
notify_waiters_sequential_notified_await()278 fn notify_waiters_sequential_notified_await() {
279 use crate::sync::oneshot;
280
281 loom::model(|| {
282 let notify = Arc::new(Notify::new());
283
284 let (tx_fst, rx_fst) = oneshot::channel();
285 let (tx_snd, rx_snd) = oneshot::channel();
286
287 let receiver = thread::spawn({
288 let notify = notify.clone();
289 move || {
290 block_on(async {
291 // Poll the first `Notified` to put it as the first waiter
292 // in the queue.
293 let mut first_notified = tokio_test::task::spawn(notify.notified());
294 assert_pending!(first_notified.poll());
295
296 // Create additional waiters to force `notify_waiters` to
297 // release the lock at least once.
298 let _task_pile = (0..WAKE_LIST_SIZE + 1)
299 .map(|_| {
300 let mut fut = tokio_test::task::spawn(notify.notified());
301 assert_pending!(fut.poll());
302 fut
303 })
304 .collect::<Vec<_>>();
305
306 // We are ready for the notify_waiters call.
307 tx_fst.send(()).unwrap();
308
309 first_notified.await;
310
311 // Poll the second `Notified` future to try to insert
312 // it to the waiters queue.
313 let mut second_notified = tokio_test::task::spawn(notify.notified());
314 assert_pending!(second_notified.poll());
315
316 // Wait for the `notify_waiters` to end and check if we
317 // are woken up.
318 rx_snd.await.unwrap();
319 assert_pending!(second_notified.poll());
320 });
321 }
322 });
323
324 // Wait for the signal and call `notify_waiters`.
325 block_on(rx_fst).unwrap();
326 notify.notify_waiters();
327 tx_snd.send(()).unwrap();
328
329 receiver.join().unwrap();
330 });
331 }
332