mod queue;
mod shutdown;
mod yield_now;

/// Full runtime loom tests. These are heavy tests and take significant time to
/// run on CI.
///
/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
///
/// In order to speed up the CI, tests are split into groups.
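///
/// A typical quick smoke-test invocation (assuming loom is enabled for the
/// build, e.g. via `RUSTFLAGS="--cfg loom"`) looks like:
///
/// ```text
/// LOOM_MAX_PREEMPTIONS=1 cargo test
/// ```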
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
use crate::{spawn, task};
use tokio_test::assert_ok;

use loom::sync::atomic::{AtomicBool, AtomicUsize};
use loom::sync::Arc;

use pin_project_lite::pin_project;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::task::{ready, Context, Poll};

mod atomic_take {
    use loom::sync::atomic::AtomicBool;
    use std::mem::MaybeUninit;
    use std::sync::atomic::Ordering::SeqCst;

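    /// A cell whose value can be taken at most once, even if `take` races from
    /// several threads. Dropping the cell drops the value if it was never taken.
    ///
    /// Usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let cell = AtomicTake::new("hello");
    /// assert_eq!(cell.take(), Some("hello"));
    /// assert_eq!(cell.take(), None);
    /// ```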
    pub(super) struct AtomicTake<T> {
        inner: MaybeUninit<T>,
        taken: AtomicBool,
    }

    impl<T> AtomicTake<T> {
        pub(super) fn new(value: T) -> Self {
            Self {
                inner: MaybeUninit::new(value),
                taken: AtomicBool::new(false),
            }
        }

        pub(super) fn take(&self) -> Option<T> {
            // safety: Only one thread will see the boolean change from false
            // to true, so that thread is able to take the value.
            match self.taken.fetch_or(true, SeqCst) {
                false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
                true => None,
            }
        }
    }

    impl<T> Drop for AtomicTake<T> {
        fn drop(&mut self) {
            drop(self.take());
        }
    }
}

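/// A cloneable wrapper around a oneshot `Sender`. Only one of the clones can
/// actually send; `assert_send` panics if the sender was already taken.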
#[derive(Clone)]
struct AtomicOneshot<T> {
    value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
}

impl<T> AtomicOneshot<T> {
    fn new(sender: oneshot::Sender<T>) -> Self {
        Self {
            value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
        }
    }

    fn assert_send(&self, value: T) {
        self.value.take().unwrap().send(value);
    }
}

/// Tests are divided into groups to make the runs faster on CI.
mod group_a {
    use super::*;

    #[test]
    fn racy_shutdown() {
        loom::model(|| {
            let pool = mk_pool(1);

            // Here's the case we want to exercise:
            //
            // A worker that still has tasks in its local queue gets sent to
            // the blocking pool (due to `block_in_place`). The blocking pool
            // is shut down and drops the worker, so the worker's shutdown
            // method never gets run.
            //
            // We trigger this by spawning two tasks on one worker, the first
            // of which calls `block_in_place`, and then immediately dropping
            // the pool.

            pool.spawn(track(async {
                crate::task::block_in_place(|| {});
            }));
            pool.spawn(track(async {}));
            drop(pool);
        });
    }

    #[test]
    fn pool_multi_spawn() {
        loom::model(|| {
            let pool = mk_pool(2);
            let c1 = Arc::new(AtomicUsize::new(0));

            let (tx, rx) = oneshot::channel();
            let tx1 = AtomicOneshot::new(tx);

            // Spawn a task
            let c2 = c1.clone();
            let tx2 = tx1.clone();
            pool.spawn(track(async move {
                spawn(track(async move {
                    if 1 == c1.fetch_add(1, Relaxed) {
                        tx1.assert_send(());
                    }
                }));
            }));

            // Spawn a second task
            pool.spawn(track(async move {
                spawn(track(async move {
                    if 1 == c2.fetch_add(1, Relaxed) {
                        tx2.assert_send(());
                    }
                }));
            }));

            rx.recv();
        });
    }

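    /// Spawns a single task that enters `block_in_place` (optionally yielding
    /// afterwards when `first_pending` is set), waits for the blocking section
    /// to run, then drops the pool.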
    fn only_blocking_inner(first_pending: bool) {
        loom::model(move || {
            let pool = mk_pool(1);
            let (block_tx, block_rx) = oneshot::channel();

            pool.spawn(track(async move {
                crate::task::block_in_place(move || {
                    block_tx.send(());
                });
                if first_pending {
                    task::yield_now().await
                }
            }));

            block_rx.recv();
            drop(pool);
        });
    }

    #[test]
    fn only_blocking_without_pending() {
        only_blocking_inner(false)
    }

    #[test]
    fn only_blocking_with_pending() {
        only_blocking_inner(true)
    }
}

mod group_b {
    use super::*;

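    /// Runs one `block_in_place` task alongside `NUM` ordinary tasks on a
    /// single-worker pool, waiting for all of them before shutting down.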
    fn blocking_and_regular_inner(first_pending: bool) {
        const NUM: usize = 3;
        loom::model(move || {
            let pool = mk_pool(1);
            let cnt = Arc::new(AtomicUsize::new(0));

            let (block_tx, block_rx) = oneshot::channel();
            let (done_tx, done_rx) = oneshot::channel();
            let done_tx = AtomicOneshot::new(done_tx);

            pool.spawn(track(async move {
                crate::task::block_in_place(move || {
                    block_tx.send(());
                });
                if first_pending {
                    task::yield_now().await
                }
            }));

            for _ in 0..NUM {
                let cnt = cnt.clone();
                let done_tx = done_tx.clone();

                pool.spawn(track(async move {
                    if NUM == cnt.fetch_add(1, Relaxed) + 1 {
                        done_tx.assert_send(());
                    }
                }));
            }

            done_rx.recv();
            block_rx.recv();

            drop(pool);
        });
    }

    #[test]
    fn blocking_and_regular() {
        blocking_and_regular_inner(false);
    }

    #[test]
    fn blocking_and_regular_with_pending() {
        blocking_and_regular_inner(true);
    }

    #[test]
    fn join_output() {
        loom::model(|| {
            let rt = mk_pool(1);

            rt.block_on(async {
                let t = crate::spawn(track(async { "hello" }));

                let out = assert_ok!(t.await);
                assert_eq!("hello", out.into_inner());
            });
        });
    }

    #[test]
    fn poll_drop_handle_then_drop() {
        loom::model(|| {
            let rt = mk_pool(1);

            rt.block_on(async move {
                let mut t = crate::spawn(track(async { "hello" }));

                poll_fn(|cx| {
                    let _ = Pin::new(&mut t).poll(cx);
                    Poll::Ready(())
                })
                .await;
            });
        })
    }

    #[test]
    fn complete_block_on_under_load() {
        loom::model(|| {
            let pool = mk_pool(1);

            pool.block_on(async {
                // Trigger a re-schedule
                crate::spawn(track(async {
                    for _ in 0..2 {
                        task::yield_now().await;
                    }
                }));

                gated2(true).await
            });
        });
    }

    #[test]
    fn shutdown_with_notification() {
        use crate::sync::oneshot;

        loom::model(|| {
            let rt = mk_pool(2);
            let (done_tx, done_rx) = oneshot::channel::<()>();

            rt.spawn(track(async move {
                let (tx, rx) = oneshot::channel::<()>();

                crate::spawn(async move {
                    crate::task::spawn_blocking(move || {
                        let _ = tx.send(());
                    });

                    let _ = done_rx.await;
                });

                let _ = rx.await;

                let _ = done_tx.send(());
            }));
        });
    }
}

mod group_c {
    use super::*;

    #[test]
    fn pool_shutdown() {
        loom::model(|| {
            let pool = mk_pool(2);

            pool.spawn(track(async move {
                gated2(true).await;
            }));

            pool.spawn(track(async move {
                gated2(false).await;
            }));

            drop(pool);
        });
    }
}

mod group_d {
    use super::*;

    #[test]
    fn pool_multi_notify() {
        loom::model(|| {
            let pool = mk_pool(2);

            let c1 = Arc::new(AtomicUsize::new(0));

            let (done_tx, done_rx) = oneshot::channel();
            let done_tx1 = AtomicOneshot::new(done_tx);
            let done_tx2 = done_tx1.clone();

            // Spawn a task
            let c2 = c1.clone();
            pool.spawn(track(async move {
                multi_gated().await;

                if 1 == c1.fetch_add(1, Relaxed) {
                    done_tx1.assert_send(());
                }
            }));

            // Spawn a second task
            pool.spawn(track(async move {
                multi_gated().await;

                if 1 == c2.fetch_add(1, Relaxed) {
                    done_tx2.assert_send(());
                }
            }));

            done_rx.recv();
        });
    }
}

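/// Builds a multi-thread runtime with `num_threads` workers for use inside a
/// `loom::model` closure.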
fn mk_pool(num_threads: usize) -> Runtime {
    runtime::Builder::new_multi_thread()
        .worker_threads(num_threads)
        // Set the intervals to avoid tuning logic
        .event_interval(2)
        .build()
        .unwrap()
}

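/// Returns a future that stays pending until a gate flag is set and the waker
/// is fired from elsewhere: from a loom thread when `thread` is true, or from
/// a spawned task otherwise.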
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
    use loom::thread;
    use std::sync::Arc;

    let gate = Arc::new(AtomicBool::new(false));
    let mut fired = false;

    poll_fn(move |cx| {
        if !fired {
            let gate = gate.clone();
            let waker = cx.waker().clone();

            if thread {
                thread::spawn(move || {
                    gate.store(true, SeqCst);
                    waker.wake_by_ref();
                });
            } else {
                spawn(track(async move {
                    gate.store(true, SeqCst);
                    waker.wake_by_ref();
                }));
            }

            fired = true;

            return Poll::Pending;
        }

        if gate.load(SeqCst) {
            Poll::Ready("hello world")
        } else {
            Poll::Pending
        }
    })
}

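/// Completes only after a helper task has bumped the shared counter to 2,
/// re-registering with the `AtomicWaker` on every poll so the test exercises
/// repeated wakeups.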
async fn multi_gated() {
    struct Gate {
        waker: loom::future::AtomicWaker,
        count: AtomicUsize,
    }

    let gate = Arc::new(Gate {
        waker: loom::future::AtomicWaker::new(),
        count: AtomicUsize::new(0),
    });

    {
        let gate = gate.clone();
        spawn(track(async move {
            for i in 1..3 {
                gate.count.store(i, SeqCst);
                gate.waker.wake();
            }
        }));
    }

    poll_fn(move |cx| {
        gate.waker.register_by_ref(cx.waker());
        if gate.count.load(SeqCst) < 2 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    })
    .await;
}

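/// Wraps a future in `Track` so both the future and its output are visible to
/// loom's leak checking through the embedded `Arc`.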
fn track<T: Future>(f: T) -> Track<T> {
    Track {
        inner: f,
        arc: Arc::new(()),
    }
}

pin_project! {
    struct Track<T> {
        #[pin]
        inner: T,
        // Arc is used to hook into loom's leak tracking.
        arc: Arc<()>,
    }
}

impl<T> Track<T> {
    fn into_inner(self) -> T {
        self.inner
    }
}

impl<T: Future> Future for Track<T> {
    type Output = Track<T::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();

        Poll::Ready(Track {
            inner: ready!(me.inner.poll(cx)),
            arc: me.arc.clone(),
        })
    }
}