1 use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
2 use crate::runtime::tests::NoopSchedule;
3
4 use std::collections::VecDeque;
5 use std::future::Future;
6 use std::sync::atomic::{AtomicBool, Ordering};
7 use std::sync::{Arc, Mutex};
8
9 struct AssertDropHandle {
10 is_dropped: Arc<AtomicBool>,
11 }
12 impl AssertDropHandle {
13 #[track_caller]
assert_dropped(&self)14 fn assert_dropped(&self) {
15 assert!(self.is_dropped.load(Ordering::SeqCst));
16 }
17
18 #[track_caller]
assert_not_dropped(&self)19 fn assert_not_dropped(&self) {
20 assert!(!self.is_dropped.load(Ordering::SeqCst));
21 }
22 }
23
24 struct AssertDrop {
25 is_dropped: Arc<AtomicBool>,
26 }
27 impl AssertDrop {
new() -> (Self, AssertDropHandle)28 fn new() -> (Self, AssertDropHandle) {
29 let shared = Arc::new(AtomicBool::new(false));
30 (
31 AssertDrop {
32 is_dropped: shared.clone(),
33 },
34 AssertDropHandle {
35 is_dropped: shared.clone(),
36 },
37 )
38 }
39 }
40 impl Drop for AssertDrop {
drop(&mut self)41 fn drop(&mut self) {
42 self.is_dropped.store(true, Ordering::SeqCst);
43 }
44 }
45
46 // A Notified does not shut down on drop, but it is dropped once the ref-count
47 // hits zero.
48 #[test]
create_drop1()49 fn create_drop1() {
50 let (ad, handle) = AssertDrop::new();
51 let (notified, join) = unowned(
52 async {
53 drop(ad);
54 unreachable!()
55 },
56 NoopSchedule,
57 Id::next(),
58 );
59 drop(notified);
60 handle.assert_not_dropped();
61 drop(join);
62 handle.assert_dropped();
63 }
64
65 #[test]
create_drop2()66 fn create_drop2() {
67 let (ad, handle) = AssertDrop::new();
68 let (notified, join) = unowned(
69 async {
70 drop(ad);
71 unreachable!()
72 },
73 NoopSchedule,
74 Id::next(),
75 );
76 drop(join);
77 handle.assert_not_dropped();
78 drop(notified);
79 handle.assert_dropped();
80 }
81
82 #[test]
drop_abort_handle1()83 fn drop_abort_handle1() {
84 let (ad, handle) = AssertDrop::new();
85 let (notified, join) = unowned(
86 async {
87 drop(ad);
88 unreachable!()
89 },
90 NoopSchedule,
91 Id::next(),
92 );
93 let abort = join.abort_handle();
94 drop(join);
95 handle.assert_not_dropped();
96 drop(notified);
97 handle.assert_not_dropped();
98 drop(abort);
99 handle.assert_dropped();
100 }
101
102 #[test]
drop_abort_handle2()103 fn drop_abort_handle2() {
104 let (ad, handle) = AssertDrop::new();
105 let (notified, join) = unowned(
106 async {
107 drop(ad);
108 unreachable!()
109 },
110 NoopSchedule,
111 Id::next(),
112 );
113 let abort = join.abort_handle();
114 drop(notified);
115 handle.assert_not_dropped();
116 drop(abort);
117 handle.assert_not_dropped();
118 drop(join);
119 handle.assert_dropped();
120 }
121
122 // Shutting down through Notified works
123 #[test]
create_shutdown1()124 fn create_shutdown1() {
125 let (ad, handle) = AssertDrop::new();
126 let (notified, join) = unowned(
127 async {
128 drop(ad);
129 unreachable!()
130 },
131 NoopSchedule,
132 Id::next(),
133 );
134 drop(join);
135 handle.assert_not_dropped();
136 notified.shutdown();
137 handle.assert_dropped();
138 }
139
140 #[test]
create_shutdown2()141 fn create_shutdown2() {
142 let (ad, handle) = AssertDrop::new();
143 let (notified, join) = unowned(
144 async {
145 drop(ad);
146 unreachable!()
147 },
148 NoopSchedule,
149 Id::next(),
150 );
151 handle.assert_not_dropped();
152 notified.shutdown();
153 handle.assert_dropped();
154 drop(join);
155 }
156
157 #[test]
unowned_poll()158 fn unowned_poll() {
159 let (task, _) = unowned(async {}, NoopSchedule, Id::next());
160 task.run();
161 }
162
163 #[test]
schedule()164 fn schedule() {
165 with(|rt| {
166 rt.spawn(async {
167 crate::task::yield_now().await;
168 });
169
170 assert_eq!(2, rt.tick());
171 rt.shutdown();
172 })
173 }
174
175 #[test]
shutdown()176 fn shutdown() {
177 with(|rt| {
178 rt.spawn(async {
179 loop {
180 crate::task::yield_now().await;
181 }
182 });
183
184 rt.tick_max(1);
185
186 rt.shutdown();
187 })
188 }
189
190 #[test]
shutdown_immediately()191 fn shutdown_immediately() {
192 with(|rt| {
193 rt.spawn(async {
194 loop {
195 crate::task::yield_now().await;
196 }
197 });
198
199 rt.shutdown();
200 })
201 }
202
203 #[test]
spawn_during_shutdown()204 fn spawn_during_shutdown() {
205 static DID_SPAWN: AtomicBool = AtomicBool::new(false);
206
207 struct SpawnOnDrop(Runtime);
208 impl Drop for SpawnOnDrop {
209 fn drop(&mut self) {
210 DID_SPAWN.store(true, Ordering::SeqCst);
211 self.0.spawn(async {});
212 }
213 }
214
215 with(|rt| {
216 let rt2 = rt.clone();
217 rt.spawn(async move {
218 let _spawn_on_drop = SpawnOnDrop(rt2);
219
220 loop {
221 crate::task::yield_now().await;
222 }
223 });
224
225 rt.tick_max(1);
226 rt.shutdown();
227 });
228
229 assert!(DID_SPAWN.load(Ordering::SeqCst));
230 }
231
with(f: impl FnOnce(Runtime))232 fn with(f: impl FnOnce(Runtime)) {
233 struct Reset;
234
235 impl Drop for Reset {
236 fn drop(&mut self) {
237 let _rt = CURRENT.try_lock().unwrap().take();
238 }
239 }
240
241 let _reset = Reset;
242
243 let rt = Runtime(Arc::new(Inner {
244 owned: OwnedTasks::new(),
245 core: Mutex::new(Core {
246 queue: VecDeque::new(),
247 }),
248 }));
249
250 *CURRENT.try_lock().unwrap() = Some(rt.clone());
251 f(rt)
252 }
253
254 #[derive(Clone)]
255 struct Runtime(Arc<Inner>);
256
257 struct Inner {
258 core: Mutex<Core>,
259 owned: OwnedTasks<Runtime>,
260 }
261
262 struct Core {
263 queue: VecDeque<task::Notified<Runtime>>,
264 }
265
266 static CURRENT: Mutex<Option<Runtime>> = Mutex::new(None);
267
268 impl Runtime {
spawn<T>(&self, future: T) -> JoinHandle<T::Output> where T: 'static + Send + Future, T::Output: 'static + Send,269 fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
270 where
271 T: 'static + Send + Future,
272 T::Output: 'static + Send,
273 {
274 let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
275
276 if let Some(notified) = notified {
277 self.schedule(notified);
278 }
279
280 handle
281 }
282
tick(&self) -> usize283 fn tick(&self) -> usize {
284 self.tick_max(usize::MAX)
285 }
286
tick_max(&self, max: usize) -> usize287 fn tick_max(&self, max: usize) -> usize {
288 let mut n = 0;
289
290 while !self.is_empty() && n < max {
291 let task = self.next_task();
292 n += 1;
293 let task = self.0.owned.assert_owner(task);
294 task.run();
295 }
296
297 n
298 }
299
is_empty(&self) -> bool300 fn is_empty(&self) -> bool {
301 self.0.core.try_lock().unwrap().queue.is_empty()
302 }
303
next_task(&self) -> task::Notified<Runtime>304 fn next_task(&self) -> task::Notified<Runtime> {
305 self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
306 }
307
shutdown(&self)308 fn shutdown(&self) {
309 let mut core = self.0.core.try_lock().unwrap();
310
311 self.0.owned.close_and_shutdown_all();
312
313 while let Some(task) = core.queue.pop_back() {
314 drop(task);
315 }
316
317 drop(core);
318
319 assert!(self.0.owned.is_empty());
320 }
321 }
322
323 impl Schedule for Runtime {
release(&self, task: &Task<Self>) -> Option<Task<Self>>324 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
325 self.0.owned.remove(task)
326 }
327
schedule(&self, task: task::Notified<Self>)328 fn schedule(&self, task: task::Notified<Self>) {
329 self.0.core.try_lock().unwrap().queue.push_back(task);
330 }
331 }
332