/// Full runtime loom tests. These are heavy tests and take significant time to
/// run on CI.
///
/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
///
/// In order to speed up the CI, the tests are split into groups (see the
/// `group_*` modules below).
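///
/// For reference, a "quick" run is typically invoked along these lines (the
/// exact feature flags and test filter are illustrative and depend on the
/// local setup):
///
/// ```text
/// LOOM_MAX_PREEMPTIONS=1 RUSTFLAGS="--cfg loom" cargo test --lib --release loom_pool
/// ```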
use crate::future::poll_fn;
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
use crate::{spawn, task};
use tokio_test::assert_ok;

use loom::sync::atomic::{AtomicBool, AtomicUsize};
use loom::sync::Arc;

use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::task::{Context, Poll};

mod atomic_take {
    use loom::sync::atomic::AtomicBool;
    use std::mem::MaybeUninit;
    use std::sync::atomic::Ordering::SeqCst;

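    /// A cell holding a value that can be taken at most once, from any
    /// thread. It is used below to wrap a oneshot sender so that several
    /// clones of a handle can race to consume it.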
    pub(super) struct AtomicTake<T> {
        inner: MaybeUninit<T>,
        taken: AtomicBool,
    }

    impl<T> AtomicTake<T> {
        pub(super) fn new(value: T) -> Self {
            Self {
                inner: MaybeUninit::new(value),
                taken: AtomicBool::new(false),
            }
        }

        pub(super) fn take(&self) -> Option<T> {
            // safety: Only one thread will see the boolean change from false
            // to true, so that thread is able to take the value.
            match self.taken.fetch_or(true, SeqCst) {
                false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
                true => None,
            }
        }
    }

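    // If the value was never taken, take and drop it here so it is not
    // leaked.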
    impl<T> Drop for AtomicTake<T> {
        fn drop(&mut self) {
            drop(self.take());
        }
    }
}

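/// A clonable handle around a oneshot sender. Whichever clone calls
/// `assert_send` first takes and consumes the sender; a second call panics.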
#[derive(Clone)]
struct AtomicOneshot<T> {
    value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
}
impl<T> AtomicOneshot<T> {
    fn new(sender: oneshot::Sender<T>) -> Self {
        Self {
            value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
        }
    }

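    // Takes the sender and sends `value`, panicking if another clone has
    // already taken it.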
    fn assert_send(&self, value: T) {
        self.value.take().unwrap().send(value);
    }
}

/// Tests are divided into groups to make the runs faster on CI.
mod group_a {
    use super::*;

    #[test]
    fn racy_shutdown() {
        loom::model(|| {
            let pool = mk_pool(1);

            // Here's the case we want to exercise:
            //
            // A worker that still has tasks in its local queue gets sent to
            // the blocking pool (due to `block_in_place`). The blocking pool
            // is then shut down, so it drops the worker, and the worker's
            // shutdown method never gets run.
            //
            // We trigger this by spawning two tasks on one worker, the first
            // of which calls `block_in_place`, and then immediately dropping
            // the pool.

            pool.spawn(track(async {
                crate::task::block_in_place(|| {});
            }));
            pool.spawn(track(async {}));
            drop(pool);
        });
    }

    #[test]
    fn pool_multi_spawn() {
        loom::model(|| {
            let pool = mk_pool(2);
            let c1 = Arc::new(AtomicUsize::new(0));

            let (tx, rx) = oneshot::channel();
            let tx1 = AtomicOneshot::new(tx);

            // Spawn a task
            let c2 = c1.clone();
            let tx2 = tx1.clone();
            pool.spawn(track(async move {
                spawn(track(async move {
                    if 1 == c1.fetch_add(1, Relaxed) {
                        tx1.assert_send(());
                    }
                }));
            }));

            // Spawn a second task
            pool.spawn(track(async move {
                spawn(track(async move {
                    if 1 == c2.fetch_add(1, Relaxed) {
                        tx2.assert_send(());
                    }
                }));
            }));

            rx.recv();
        });
    }

    fn only_blocking_inner(first_pending: bool) {
        loom::model(move || {
            let pool = mk_pool(1);
            let (block_tx, block_rx) = oneshot::channel();

            pool.spawn(track(async move {
                crate::task::block_in_place(move || {
                    block_tx.send(());
                });
                if first_pending {
                    task::yield_now().await
                }
            }));

            block_rx.recv();
            drop(pool);
        });
    }

    #[test]
    fn only_blocking_without_pending() {
        only_blocking_inner(false)
    }

    #[test]
    fn only_blocking_with_pending() {
        only_blocking_inner(true)
    }
}

mod group_b {
    use super::*;

    fn blocking_and_regular_inner(first_pending: bool) {
        const NUM: usize = 3;
        loom::model(move || {
            let pool = mk_pool(1);
            let cnt = Arc::new(AtomicUsize::new(0));

            let (block_tx, block_rx) = oneshot::channel();
            let (done_tx, done_rx) = oneshot::channel();
            let done_tx = AtomicOneshot::new(done_tx);

            pool.spawn(track(async move {
                crate::task::block_in_place(move || {
                    block_tx.send(());
                });
                if first_pending {
                    task::yield_now().await
                }
            }));

            for _ in 0..NUM {
                let cnt = cnt.clone();
                let done_tx = done_tx.clone();

                pool.spawn(track(async move {
                    if NUM == cnt.fetch_add(1, Relaxed) + 1 {
                        done_tx.assert_send(());
                    }
                }));
            }

            done_rx.recv();
            block_rx.recv();

            drop(pool);
        });
    }

    #[test]
    fn blocking_and_regular() {
        blocking_and_regular_inner(false);
    }

    #[test]
    fn blocking_and_regular_with_pending() {
        blocking_and_regular_inner(true);
    }

    #[test]
    fn join_output() {
        loom::model(|| {
            let rt = mk_pool(1);

            rt.block_on(async {
                let t = crate::spawn(track(async { "hello" }));

                let out = assert_ok!(t.await);
                assert_eq!("hello", out.into_inner());
            });
        });
    }

    #[test]
    fn poll_drop_handle_then_drop() {
        loom::model(|| {
            let rt = mk_pool(1);

            rt.block_on(async move {
                let mut t = crate::spawn(track(async { "hello" }));

                poll_fn(|cx| {
                    let _ = Pin::new(&mut t).poll(cx);
                    Poll::Ready(())
                })
                .await;
            });
        })
    }

    #[test]
    fn complete_block_on_under_load() {
        loom::model(|| {
            let pool = mk_pool(1);

            pool.block_on(async {
                // Trigger a re-schedule
                crate::spawn(track(async {
                    for _ in 0..2 {
                        task::yield_now().await;
                    }
                }));

                gated2(true).await
            });
        });
    }

    #[test]
    fn shutdown_with_notification() {
        use crate::sync::oneshot;

        loom::model(|| {
            let rt = mk_pool(2);
            let (done_tx, done_rx) = oneshot::channel::<()>();

            rt.spawn(track(async move {
                let (tx, rx) = oneshot::channel::<()>();

                crate::spawn(async move {
                    crate::task::spawn_blocking(move || {
                        let _ = tx.send(());
                    });

                    let _ = done_rx.await;
                });

                let _ = rx.await;

                let _ = done_tx.send(());
            }));
        });
    }
}

mod group_c {
    use super::*;

    #[test]
    fn pool_shutdown() {
        loom::model(|| {
            let pool = mk_pool(2);

            pool.spawn(track(async move {
                gated2(true).await;
            }));

            pool.spawn(track(async move {
                gated2(false).await;
            }));

            drop(pool);
        });
    }
}

mod group_d {
    use super::*;

    #[test]
    fn pool_multi_notify() {
        loom::model(|| {
            let pool = mk_pool(2);

            let c1 = Arc::new(AtomicUsize::new(0));

            let (done_tx, done_rx) = oneshot::channel();
            let done_tx1 = AtomicOneshot::new(done_tx);
            let done_tx2 = done_tx1.clone();

            // Spawn a task
            let c2 = c1.clone();
            pool.spawn(track(async move {
                gated().await;
                gated().await;

                if 1 == c1.fetch_add(1, Relaxed) {
                    done_tx1.assert_send(());
                }
            }));

            // Spawn a second task
            pool.spawn(track(async move {
                gated().await;
                gated().await;

                if 1 == c2.fetch_add(1, Relaxed) {
                    done_tx2.assert_send(());
                }
            }));

            done_rx.recv();
        });
    }
}

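// Builds a multi-threaded runtime with the requested number of worker threads.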
fn mk_pool(num_threads: usize) -> Runtime {
    runtime::Builder::new_multi_thread()
        .worker_threads(num_threads)
        .build()
        .unwrap()
}

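// Shorthand for `gated2(false)`: the wakeup comes from a task spawned onto the
// runtime rather than from a dedicated thread.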
fn gated() -> impl Future<Output = &'static str> {
    gated2(false)
}

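// Returns a future that completes only after a wakeup from elsewhere. On the
// first poll it spawns either a loom thread (`thread == true`) or a runtime
// task (`thread == false`) that opens the gate and wakes the waker; the future
// then resolves once it observes the gate as open.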
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
    use loom::thread;
    use std::sync::Arc;

    let gate = Arc::new(AtomicBool::new(false));
    let mut fired = false;

    poll_fn(move |cx| {
        if !fired {
            let gate = gate.clone();
            let waker = cx.waker().clone();

            if thread {
                thread::spawn(move || {
                    gate.store(true, SeqCst);
                    waker.wake_by_ref();
                });
            } else {
                spawn(track(async move {
                    gate.store(true, SeqCst);
                    waker.wake_by_ref();
                }));
            }

            fired = true;

            return Poll::Pending;
        }

        if gate.load(SeqCst) {
            Poll::Ready("hello world")
        } else {
            Poll::Pending
        }
    })
}

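// Wraps a future in `Track` so that loom's leak checking (via the `Arc` held
// inside) can detect whether the task or its output is leaked.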
fn track<T: Future>(f: T) -> Track<T> {
    Track {
        inner: f,
        arc: Arc::new(()),
    }
}

pin_project! {
    struct Track<T> {
        #[pin]
        inner: T,
        // Arc is used to hook into loom's leak tracking.
        arc: Arc<()>,
    }
}

impl<T> Track<T> {
    fn into_inner(self) -> T {
        self.inner
    }
}

impl<T: Future> Future for Track<T> {
    type Output = Track<T::Output>;

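    // Poll the inner future and, once it completes, wrap the output in a new
    // `Track` that shares the `Arc`, extending leak tracking to the output.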
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();

        Poll::Ready(Track {
            inner: ready!(me.inner.poll(cx)),
            arc: me.arc.clone(),
        })
    }
}