• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 mod queue;
2 mod shutdown;
3 mod yield_now;
4 
5 /// Full runtime loom tests. These are heavy tests and take significant time to
6 /// run on CI.
7 ///
8 /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
9 ///
10 /// In order to speed up the CI, the tests are divided into `group_*` modules.
11 use crate::runtime::tests::loom_oneshot as oneshot;
12 use crate::runtime::{self, Runtime};
13 use crate::{spawn, task};
14 use tokio_test::assert_ok;
15 
16 use loom::sync::atomic::{AtomicBool, AtomicUsize};
17 use loom::sync::Arc;
18 
19 use pin_project_lite::pin_project;
20 use std::future::{poll_fn, Future};
21 use std::pin::Pin;
22 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
23 use std::task::{ready, Context, Poll};
24 
25 mod atomic_take {
26     use loom::sync::atomic::AtomicBool;
27     use std::mem::MaybeUninit;
28     use std::sync::atomic::Ordering::SeqCst;
29 
30     pub(super) struct AtomicTake<T> {
31         inner: MaybeUninit<T>,
32         taken: AtomicBool,
33     }
34 
35     impl<T> AtomicTake<T> {
new(value: T) -> Self36         pub(super) fn new(value: T) -> Self {
37             Self {
38                 inner: MaybeUninit::new(value),
39                 taken: AtomicBool::new(false),
40             }
41         }
42 
take(&self) -> Option<T>43         pub(super) fn take(&self) -> Option<T> {
44             // safety: Only one thread will see the boolean change from false
45             // to true, so that thread is able to take the value.
46             match self.taken.fetch_or(true, SeqCst) {
47                 false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
48                 true => None,
49             }
50         }
51     }
52 
53     impl<T> Drop for AtomicTake<T> {
drop(&mut self)54         fn drop(&mut self) {
55             drop(self.take());
56         }
57     }
58 }
59 
60 #[derive(Clone)]
61 struct AtomicOneshot<T> {
62     value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
63 }
64 impl<T> AtomicOneshot<T> {
new(sender: oneshot::Sender<T>) -> Self65     fn new(sender: oneshot::Sender<T>) -> Self {
66         Self {
67             value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
68         }
69     }
70 
assert_send(&self, value: T)71     fn assert_send(&self, value: T) {
72         self.value.take().unwrap().send(value);
73     }
74 }
75 
76 /// Tests are divided into groups to make the runs faster on CI.
77 mod group_a {
78     use super::*;
79 
80     #[test]
racy_shutdown()81     fn racy_shutdown() {
82         loom::model(|| {
83             let pool = mk_pool(1);
84 
85             // here's the case we want to exercise:
86             //
87             // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
88             // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
89             // shutdown method never gets run.
90             //
91             // we do this by spawning two tasks on one worker, the first of which does block_in_place,
92             // and then immediately drop the pool.
93 
94             pool.spawn(track(async {
95                 crate::task::block_in_place(|| {});
96             }));
97             pool.spawn(track(async {}));
98             drop(pool);
99         });
100     }
101 
102     #[test]
pool_multi_spawn()103     fn pool_multi_spawn() {
104         loom::model(|| {
105             let pool = mk_pool(2);
106             let c1 = Arc::new(AtomicUsize::new(0));
107 
108             let (tx, rx) = oneshot::channel();
109             let tx1 = AtomicOneshot::new(tx);
110 
111             // Spawn a task
112             let c2 = c1.clone();
113             let tx2 = tx1.clone();
114             pool.spawn(track(async move {
115                 spawn(track(async move {
116                     if 1 == c1.fetch_add(1, Relaxed) {
117                         tx1.assert_send(());
118                     }
119                 }));
120             }));
121 
122             // Spawn a second task
123             pool.spawn(track(async move {
124                 spawn(track(async move {
125                     if 1 == c2.fetch_add(1, Relaxed) {
126                         tx2.assert_send(());
127                     }
128                 }));
129             }));
130 
131             rx.recv();
132         });
133     }
134 
only_blocking_inner(first_pending: bool)135     fn only_blocking_inner(first_pending: bool) {
136         loom::model(move || {
137             let pool = mk_pool(1);
138             let (block_tx, block_rx) = oneshot::channel();
139 
140             pool.spawn(track(async move {
141                 crate::task::block_in_place(move || {
142                     block_tx.send(());
143                 });
144                 if first_pending {
145                     task::yield_now().await
146                 }
147             }));
148 
149             block_rx.recv();
150             drop(pool);
151         });
152     }
153 
154     #[test]
only_blocking_without_pending()155     fn only_blocking_without_pending() {
156         only_blocking_inner(false)
157     }
158 
159     #[test]
only_blocking_with_pending()160     fn only_blocking_with_pending() {
161         only_blocking_inner(true)
162     }
163 }
164 
165 mod group_b {
166     use super::*;
167 
blocking_and_regular_inner(first_pending: bool)168     fn blocking_and_regular_inner(first_pending: bool) {
169         const NUM: usize = 3;
170         loom::model(move || {
171             let pool = mk_pool(1);
172             let cnt = Arc::new(AtomicUsize::new(0));
173 
174             let (block_tx, block_rx) = oneshot::channel();
175             let (done_tx, done_rx) = oneshot::channel();
176             let done_tx = AtomicOneshot::new(done_tx);
177 
178             pool.spawn(track(async move {
179                 crate::task::block_in_place(move || {
180                     block_tx.send(());
181                 });
182                 if first_pending {
183                     task::yield_now().await
184                 }
185             }));
186 
187             for _ in 0..NUM {
188                 let cnt = cnt.clone();
189                 let done_tx = done_tx.clone();
190 
191                 pool.spawn(track(async move {
192                     if NUM == cnt.fetch_add(1, Relaxed) + 1 {
193                         done_tx.assert_send(());
194                     }
195                 }));
196             }
197 
198             done_rx.recv();
199             block_rx.recv();
200 
201             drop(pool);
202         });
203     }
204 
205     #[test]
blocking_and_regular()206     fn blocking_and_regular() {
207         blocking_and_regular_inner(false);
208     }
209 
210     #[test]
blocking_and_regular_with_pending()211     fn blocking_and_regular_with_pending() {
212         blocking_and_regular_inner(true);
213     }
214 
215     #[test]
join_output()216     fn join_output() {
217         loom::model(|| {
218             let rt = mk_pool(1);
219 
220             rt.block_on(async {
221                 let t = crate::spawn(track(async { "hello" }));
222 
223                 let out = assert_ok!(t.await);
224                 assert_eq!("hello", out.into_inner());
225             });
226         });
227     }
228 
229     #[test]
poll_drop_handle_then_drop()230     fn poll_drop_handle_then_drop() {
231         loom::model(|| {
232             let rt = mk_pool(1);
233 
234             rt.block_on(async move {
235                 let mut t = crate::spawn(track(async { "hello" }));
236 
237                 poll_fn(|cx| {
238                     let _ = Pin::new(&mut t).poll(cx);
239                     Poll::Ready(())
240                 })
241                 .await;
242             });
243         })
244     }
245 
246     #[test]
complete_block_on_under_load()247     fn complete_block_on_under_load() {
248         loom::model(|| {
249             let pool = mk_pool(1);
250 
251             pool.block_on(async {
252                 // Trigger a re-schedule
253                 crate::spawn(track(async {
254                     for _ in 0..2 {
255                         task::yield_now().await;
256                     }
257                 }));
258 
259                 gated2(true).await
260             });
261         });
262     }
263 
264     #[test]
shutdown_with_notification()265     fn shutdown_with_notification() {
266         use crate::sync::oneshot;
267 
268         loom::model(|| {
269             let rt = mk_pool(2);
270             let (done_tx, done_rx) = oneshot::channel::<()>();
271 
272             rt.spawn(track(async move {
273                 let (tx, rx) = oneshot::channel::<()>();
274 
275                 crate::spawn(async move {
276                     crate::task::spawn_blocking(move || {
277                         let _ = tx.send(());
278                     });
279 
280                     let _ = done_rx.await;
281                 });
282 
283                 let _ = rx.await;
284 
285                 let _ = done_tx.send(());
286             }));
287         });
288     }
289 }
290 
mod group_c {
    use super::*;

    /// Drops a two-worker runtime right after spawning two gated tasks, one
    /// gated on a loom thread and one gated on another spawned task.
    #[test]
    fn pool_shutdown() {
        loom::model(|| {
            let rt = mk_pool(2);

            // Gate opened from a separate loom thread.
            rt.spawn(track(async move {
                gated2(true).await;
            }));

            // Gate opened from a spawned task.
            rt.spawn(track(async move {
                gated2(false).await;
            }));

            drop(rt);
        });
    }
}
311 
mod group_d {
    use super::*;

    /// Two tasks each wait on `multi_gated`; the one that bumps the shared
    /// counter second signals the model thread.
    #[test]
    fn pool_multi_notify() {
        loom::model(|| {
            let rt = mk_pool(2);

            let hits_a = Arc::new(AtomicUsize::new(0));

            let (finished_tx, finished_rx) = oneshot::channel();
            let signal_a = AtomicOneshot::new(finished_tx);
            let signal_b = signal_a.clone();

            // First task
            let hits_b = hits_a.clone();
            rt.spawn(track(async move {
                multi_gated().await;

                let previous = hits_a.fetch_add(1, Relaxed);
                if previous == 1 {
                    signal_a.assert_send(());
                }
            }));

            // Second task
            rt.spawn(track(async move {
                multi_gated().await;

                let previous = hits_b.fetch_add(1, Relaxed);
                if previous == 1 {
                    signal_b.assert_send(());
                }
            }));

            finished_rx.recv();
        });
    }
}
349 
mk_pool(num_threads: usize) -> Runtime350 fn mk_pool(num_threads: usize) -> Runtime {
351     runtime::Builder::new_multi_thread()
352         .worker_threads(num_threads)
353         // Set the intervals to avoid tuning logic
354         .event_interval(2)
355         .build()
356         .unwrap()
357 }
358 
gated2(thread: bool) -> impl Future<Output = &'static str>359 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
360     use loom::thread;
361     use std::sync::Arc;
362 
363     let gate = Arc::new(AtomicBool::new(false));
364     let mut fired = false;
365 
366     poll_fn(move |cx| {
367         if !fired {
368             let gate = gate.clone();
369             let waker = cx.waker().clone();
370 
371             if thread {
372                 thread::spawn(move || {
373                     gate.store(true, SeqCst);
374                     waker.wake_by_ref();
375                 });
376             } else {
377                 spawn(track(async move {
378                     gate.store(true, SeqCst);
379                     waker.wake_by_ref();
380                 }));
381             }
382 
383             fired = true;
384 
385             return Poll::Pending;
386         }
387 
388         if gate.load(SeqCst) {
389             Poll::Ready("hello world")
390         } else {
391             Poll::Pending
392         }
393     })
394 }
395 
multi_gated()396 async fn multi_gated() {
397     struct Gate {
398         waker: loom::future::AtomicWaker,
399         count: AtomicUsize,
400     }
401 
402     let gate = Arc::new(Gate {
403         waker: loom::future::AtomicWaker::new(),
404         count: AtomicUsize::new(0),
405     });
406 
407     {
408         let gate = gate.clone();
409         spawn(track(async move {
410             for i in 1..3 {
411                 gate.count.store(i, SeqCst);
412                 gate.waker.wake();
413             }
414         }));
415     }
416 
417     poll_fn(move |cx| {
418         gate.waker.register_by_ref(cx.waker());
419         if gate.count.load(SeqCst) < 2 {
420             Poll::Pending
421         } else {
422             Poll::Ready(())
423         }
424     })
425     .await;
426 }
427 
track<T: Future>(f: T) -> Track<T>428 fn track<T: Future>(f: T) -> Track<T> {
429     Track {
430         inner: f,
431         arc: Arc::new(()),
432     }
433 }
434 
435 pin_project! {
436     struct Track<T> {
437         #[pin]
438         inner: T,
439         // Arc is used to hook into loom's leak tracking.
440         arc: Arc<()>,
441     }
442 }
443 
444 impl<T> Track<T> {
into_inner(self) -> T445     fn into_inner(self) -> T {
446         self.inner
447     }
448 }
449 
450 impl<T: Future> Future for Track<T> {
451     type Output = Track<T::Output>;
452 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>453     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
454         let me = self.project();
455 
456         Poll::Ready(Track {
457             inner: ready!(me.inner.poll(cx)),
458             arc: me.arc.clone(),
459         })
460     }
461 }
462