• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
2 use crate::runtime::tests::NoopSchedule;
3 
4 use std::collections::VecDeque;
5 use std::future::Future;
6 use std::sync::atomic::{AtomicBool, Ordering};
7 use std::sync::{Arc, Mutex};
8 
/// Observer half of an [`AssertDrop`] pair.
///
/// Shares an atomic flag with the paired [`AssertDrop`] and lets a test check
/// whether that value has been dropped yet.
struct AssertDropHandle {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDropHandle {
    /// Panics unless the paired [`AssertDrop`] has already been dropped.
    ///
    /// `#[track_caller]` makes the panic point at the calling test line.
    #[track_caller]
    fn assert_dropped(&self) {
        assert!(
            self.is_dropped.load(Ordering::SeqCst),
            "expected the tracked value to have been dropped, but it is still alive"
        );
    }

    /// Panics if the paired [`AssertDrop`] has already been dropped.
    ///
    /// `#[track_caller]` makes the panic point at the calling test line.
    #[track_caller]
    fn assert_not_dropped(&self) {
        assert!(
            !self.is_dropped.load(Ordering::SeqCst),
            "expected the tracked value to still be alive, but it was already dropped"
        );
    }
}

/// Value whose destructor records that it ran.
///
/// Move it into a future (or any other owner) and use the paired
/// [`AssertDropHandle`] to observe when that owner is destroyed.
struct AssertDrop {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDrop {
    /// Creates a tracked value together with the handle that observes it.
    /// The shared flag starts out as `false`.
    fn new() -> (Self, AssertDropHandle) {
        let shared = Arc::new(AtomicBool::new(false));
        (
            AssertDrop {
                is_dropped: Arc::clone(&shared),
            },
            // The handle takes the original `Arc`; a second clone would be redundant.
            AssertDropHandle { is_dropped: shared },
        )
    }
}

impl Drop for AssertDrop {
    /// Flips the shared flag so the handle can see the drop.
    fn drop(&mut self) {
        self.is_dropped.store(true, Ordering::SeqCst);
    }
}
45 
// Dropping a `Notified` does not shut the task down by itself: the future is
// only destroyed once the ref-count reaches zero. Here the `Notified` goes
// first and the `JoinHandle` holds the last reference.
#[test]
fn create_drop1() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    drop(notified);
    probe.assert_not_dropped();

    drop(join);
    probe.assert_dropped();
}
64 
// Mirror image of `create_drop1`: the `JoinHandle` is dropped first and the
// future must survive until the `Notified` is dropped as well.
#[test]
fn create_drop2() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    drop(join);
    probe.assert_not_dropped();

    drop(notified);
    probe.assert_dropped();
}
81 
// An abort handle keeps the future alive: with the `JoinHandle` and the
// `Notified` both gone, the future is only destroyed when the abort handle is
// dropped last.
#[test]
fn drop_abort_handle1() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());
    let abort = join.abort_handle();

    drop(join);
    probe.assert_not_dropped();

    drop(notified);
    probe.assert_not_dropped();

    drop(abort);
    probe.assert_dropped();
}
101 
// Same three handles as `drop_abort_handle1`, released in a different order:
// `Notified`, then the abort handle, and finally the `JoinHandle`. The future
// must be destroyed only by the final drop.
#[test]
fn drop_abort_handle2() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());
    let abort = join.abort_handle();

    drop(notified);
    probe.assert_not_dropped();

    drop(abort);
    probe.assert_not_dropped();

    drop(join);
    probe.assert_dropped();
}
121 
// Shutting a task down through its `Notified` destroys the future. In this
// variant the `JoinHandle` has already been dropped before the shutdown.
#[test]
fn create_shutdown1() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    drop(join);
    probe.assert_not_dropped();

    notified.shutdown();
    probe.assert_dropped();
}
139 
// Shutting down through the `Notified` destroys the future even while the
// `JoinHandle` is still alive; the handle is only dropped afterwards.
#[test]
fn create_shutdown2() {
    let (guard, probe) = AssertDrop::new();

    // The task is never run in this test, so the body must never execute.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    probe.assert_not_dropped();

    notified.shutdown();
    probe.assert_dropped();

    drop(join);
}
156 
// Runs an unowned task built from an empty future. The join handle is
// discarded through the `_` pattern and is therefore never bound to a name.
#[test]
fn unowned_poll() {
    let future = async {};
    let (notified, _) = unowned(future, NoopSchedule, Id::next());
    notified.run();
}
162 
// Spawns a task that yields once and expects `tick` to report exactly two
// task runs before the runtime is shut down.
#[test]
fn schedule() {
    with(|runtime| {
        runtime.spawn(async {
            crate::task::yield_now().await;
        });

        let runs = runtime.tick();
        assert_eq!(2, runs);

        runtime.shutdown();
    })
}
174 
// Shuts the runtime down after a never-ending task has been run once.
// `Runtime::shutdown` itself asserts that no owned tasks remain.
#[test]
fn shutdown() {
    with(|runtime| {
        runtime.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        // Run at most one queued task before shutting down.
        runtime.tick_max(1);

        runtime.shutdown();
    })
}
189 
// Shuts the runtime down without ever running the spawned, never-ending task.
// `Runtime::shutdown` itself asserts that no owned tasks remain.
#[test]
fn shutdown_immediately() {
    with(|runtime| {
        runtime.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        runtime.shutdown();
    })
}
202 
// A future that tries to spawn onto the runtime from its own destructor must
// not break shutdown. The task is run once so the guard exists inside the
// future; shutdown then has to destroy it, which triggers the spawn attempt.
#[test]
fn spawn_during_shutdown() {
    // Set by the guard's destructor, immediately before it calls `spawn`.
    static DID_SPAWN: AtomicBool = AtomicBool::new(false);

    // Guard that spawns an empty task on the wrapped runtime when dropped.
    struct SpawnOnDrop(Runtime);

    impl Drop for SpawnOnDrop {
        fn drop(&mut self) {
            DID_SPAWN.store(true, Ordering::SeqCst);
            let SpawnOnDrop(runtime) = self;
            runtime.spawn(async {});
        }
    }

    with(|runtime| {
        let spawner = runtime.clone();
        runtime.spawn(async move {
            let _guard = SpawnOnDrop(spawner);

            loop {
                crate::task::yield_now().await;
            }
        });

        runtime.tick_max(1);
        runtime.shutdown();
    });

    // The destructor must have run by the time `with` returns.
    assert!(DID_SPAWN.load(Ordering::SeqCst));
}
231 
with(f: impl FnOnce(Runtime))232 fn with(f: impl FnOnce(Runtime)) {
233     struct Reset;
234 
235     impl Drop for Reset {
236         fn drop(&mut self) {
237             let _rt = CURRENT.try_lock().unwrap().take();
238         }
239     }
240 
241     let _reset = Reset;
242 
243     let rt = Runtime(Arc::new(Inner {
244         owned: OwnedTasks::new(),
245         core: Mutex::new(Core {
246             queue: VecDeque::new(),
247         }),
248     }));
249 
250     *CURRENT.try_lock().unwrap() = Some(rt.clone());
251     f(rt)
252 }
253 
254 #[derive(Clone)]
255 struct Runtime(Arc<Inner>);
256 
257 struct Inner {
258     core: Mutex<Core>,
259     owned: OwnedTasks<Runtime>,
260 }
261 
262 struct Core {
263     queue: VecDeque<task::Notified<Runtime>>,
264 }
265 
266 static CURRENT: Mutex<Option<Runtime>> = Mutex::new(None);
267 
268 impl Runtime {
spawn<T>(&self, future: T) -> JoinHandle<T::Output> where T: 'static + Send + Future, T::Output: 'static + Send,269     fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
270     where
271         T: 'static + Send + Future,
272         T::Output: 'static + Send,
273     {
274         let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
275 
276         if let Some(notified) = notified {
277             self.schedule(notified);
278         }
279 
280         handle
281     }
282 
tick(&self) -> usize283     fn tick(&self) -> usize {
284         self.tick_max(usize::MAX)
285     }
286 
tick_max(&self, max: usize) -> usize287     fn tick_max(&self, max: usize) -> usize {
288         let mut n = 0;
289 
290         while !self.is_empty() && n < max {
291             let task = self.next_task();
292             n += 1;
293             let task = self.0.owned.assert_owner(task);
294             task.run();
295         }
296 
297         n
298     }
299 
is_empty(&self) -> bool300     fn is_empty(&self) -> bool {
301         self.0.core.try_lock().unwrap().queue.is_empty()
302     }
303 
next_task(&self) -> task::Notified<Runtime>304     fn next_task(&self) -> task::Notified<Runtime> {
305         self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
306     }
307 
shutdown(&self)308     fn shutdown(&self) {
309         let mut core = self.0.core.try_lock().unwrap();
310 
311         self.0.owned.close_and_shutdown_all();
312 
313         while let Some(task) = core.queue.pop_back() {
314             drop(task);
315         }
316 
317         drop(core);
318 
319         assert!(self.0.owned.is_empty());
320     }
321 }
322 
323 impl Schedule for Runtime {
release(&self, task: &Task<Self>) -> Option<Task<Self>>324     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
325         self.0.owned.remove(task)
326     }
327 
schedule(&self, task: task::Notified<Self>)328     fn schedule(&self, task: task::Notified<Self>) {
329         self.0.core.try_lock().unwrap().queue.push_back(task);
330     }
331 }
332