use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::sync::notify::Notify;
use crate::util::linked_list::{Link, LinkedList};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P: Park> {
    /// Inner state guarded by a mutex that is shared
    /// between all `block_on` calls.
    inner: Mutex<Option<Inner<P>>>,

    /// Notifier for waking up other threads to steal the
    /// parker.
    notify: Notify,

    /// Sendable task spawner
    spawner: Spawner,
}
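
// A minimal usage sketch (illustrative only; `park` is a hypothetical value
// implementing `Park`):
//
//     let scheduler = BasicScheduler::new(park);
//     let out = scheduler.block_on(async { 42 });
//
// Several threads may call `block_on` concurrently: one steals the `Inner`
// state (and with it the parker) and drives the scheduler, while the others
// wait on `notify` until the parker can be stolen again.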

/// The inner scheduler that owns the task queue and the main parker P.
struct Inner<P: Park> {
    /// Scheduler run queue
    ///
    /// When the scheduler is executed, the queue is removed from `self` and
    /// moved into `Context`.
    ///
    /// This indirection is to allow `BasicScheduler` to be `Send`.
    tasks: Option<Tasks>,

    /// Sendable task spawner
    spawner: Spawner,

    /// Current tick
    tick: u8,

    /// Thread park handle
    park: P,
}

#[derive(Clone)]
pub(crate) struct Spawner {
    shared: Arc<Shared>,
}

struct Tasks {
    /// Collection of all active tasks spawned onto this executor.
    owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,

    /// Local run queue.
    ///
    /// Tasks notified from the current thread are pushed into this queue.
    queue: VecDeque<task::Notified<Arc<Shared>>>,
}

/// A remote scheduler entry.
///
/// These are filled in by remote threads sending instructions to the scheduler.
enum Entry {
    /// A remote thread wants to spawn a task.
    Schedule(task::Notified<Arc<Shared>>),
    /// A remote thread wants a task to be released by the scheduler. We only
    /// have access to its header.
    Release(NonNull<task::Header>),
}
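
// For example: when a task must be released from a thread other than the
// scheduler's own (say, because its last handle was dropped there), that
// thread cannot touch the scheduler's `owned` list directly. Instead it
// enqueues an `Entry::Release` carrying the task's header pointer and unparks
// the scheduler thread (see `Schedule::release` below), which then removes
// the task from `owned`.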

// Safety: Used correctly, the task header is "thread safe". Ultimately the
// task is owned by the current thread executor, for which this instruction is
// being sent.
unsafe impl Send for Entry {}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    queue: Mutex<VecDeque<Entry>>,

    /// Unpark the blocked thread
    unpark: Box<dyn Unpark>,

    /// Indicates whether the blocked-on thread was woken.
    woken: AtomicBool,
}

/// Thread-local context.
struct Context {
    /// Shared scheduler state
    shared: Arc<Shared>,

    /// Local queue
    tasks: RefCell<Tasks>,
}

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
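
// Worked example: with `REMOTE_FIRST_INTERVAL = 31`, ticks 0, 31, 62, 93, ...
// (whenever `tick % 31 == 0`) poll the remote queue before the local queue,
// so tasks scheduled from other threads are not starved by a busy local
// queue; on every other tick the local queue is polled first.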

// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

impl<P: Park> BasicScheduler<P> {
    pub(crate) fn new(park: P) -> BasicScheduler<P> {
        let unpark = Box::new(park.unpark());

        let spawner = Spawner {
            shared: Arc::new(Shared {
                queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
                unpark: unpark as Box<dyn Unpark>,
                woken: AtomicBool::new(false),
            }),
        };

        let inner = Mutex::new(Some(Inner {
            tasks: Some(Tasks {
                owned: LinkedList::new(),
                queue: VecDeque::with_capacity(INITIAL_CAPACITY),
            }),
            spawner: spawner.clone(),
            tick: 0,
            park,
        }));

        BasicScheduler {
            inner,
            notify: Notify::new(),
            spawner,
        }
    }

    pub(crate) fn spawner(&self) -> &Spawner {
        &self.spawner
    }

    pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
        pin!(future);

        // Attempt to steal the dedicated parker and block on the future
        // there; otherwise, wait on a notification that the parker is
        // available, or until the future completes.
        loop {
            if let Some(inner) = &mut self.take_inner() {
                return inner.block_on(future);
            } else {
                let mut enter = crate::runtime::enter(false);

                let notified = self.notify.notified();
                pin!(notified);

                if let Some(out) = enter
                    .block_on(poll_fn(|cx| {
                        if notified.as_mut().poll(cx).is_ready() {
                            return Ready(None);
                        }

                        if let Ready(out) = future.as_mut().poll(cx) {
                            return Ready(Some(out));
                        }

                        Pending
                    }))
                    .expect("Failed to `Enter::block_on`")
                {
                    return out;
                }
            }
        }
    }

    fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
        let inner = self.inner.lock().take()?;

        Some(InnerGuard {
            inner: Some(inner),
            basic_scheduler: &self,
        })
    }
}

impl<P: Park> Inner<P> {
    /// Block on the future provided and drive the runtime's driver.
    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
        enter(self, |scheduler, context| {
            let _enter = crate::runtime::enter(false);
            let waker = scheduler.spawner.waker_ref();
            let mut cx = std::task::Context::from_waker(&waker);
            let mut polled = false;

            pin!(future);

            'outer: loop {
                if scheduler.spawner.was_woken() || !polled {
                    polled = true;
                    if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
                        return v;
                    }
                }

                for _ in 0..MAX_TASKS_PER_TICK {
                    // Get and increment the current tick
                    let tick = scheduler.tick;
                    scheduler.tick = scheduler.tick.wrapping_add(1);

                    let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
                        scheduler.spawner.pop().or_else(|| {
                            context
                                .tasks
                                .borrow_mut()
                                .queue
                                .pop_front()
                                .map(Entry::Schedule)
                        })
                    } else {
                        context
                            .tasks
                            .borrow_mut()
                            .queue
                            .pop_front()
                            .map(Entry::Schedule)
                            .or_else(|| scheduler.spawner.pop())
                    };

                    let entry = match entry {
                        Some(entry) => entry,
                        None => {
                            // Park until the thread is signaled
                            scheduler.park.park().expect("failed to park");

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    match entry {
                        Entry::Schedule(task) => crate::coop::budget(|| task.run()),
                        Entry::Release(ptr) => {
                            // Safety: this header pointer is only ever
                            // provided internally by the runtime, so we know
                            // that it is a valid (in particular, *allocated*)
                            // header that is part of the linked list.
                            unsafe {
                                let removed = context.tasks.borrow_mut().owned.remove(ptr);

                                // TODO: This seems like it should hold, because
                                // there doesn't seem to be an avenue for anyone
                                // else to fiddle with the owned tasks
                                // collection *after* a remote thread has marked
                                // it as released, and at that point, the only
                                // location at which it can be removed is here
                                // or in the Drop implementation of the
                                // scheduler.
                                debug_assert!(removed.is_some());
                            }
                        }
                    }
                }

                // Yield to the park; this drives the timer and pulls any
                // pending I/O events.
                scheduler
                    .park
                    .park_timeout(Duration::from_millis(0))
                    .expect("failed to park");
            }
        })
    }
}

/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local.
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
    F: FnOnce(&mut Inner<P>, &Context) -> R,
    P: Park,
{
    // Ensures the run queue is placed back in the `BasicScheduler` instance
    // once `block_on` returns.
    struct Guard<'a, P: Park> {
        context: Option<Context>,
        scheduler: &'a mut Inner<P>,
    }

    impl<P: Park> Drop for Guard<'_, P> {
        fn drop(&mut self) {
            let Context { tasks, .. } = self.context.take().expect("context missing");
            self.scheduler.tasks = Some(tasks.into_inner());
        }
    }

    // Remove `tasks` from `self` and place it in a `Context`.
    let tasks = scheduler.tasks.take().expect("invalid state");

    let guard = Guard {
        context: Some(Context {
            shared: scheduler.spawner.shared.clone(),
            tasks: RefCell::new(tasks),
        }),
        scheduler,
    };

    let context = guard.context.as_ref().unwrap();
    let scheduler = &mut *guard.scheduler;

    CURRENT.set(context, || f(scheduler, context))
}

impl<P: Park> Drop for BasicScheduler<P> {
    fn drop(&mut self) {
        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let mut inner = match self.inner.lock().take() {
            Some(inner) => inner,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
        };

        enter(&mut inner, |scheduler, context| {
            // Loop required here to ensure borrow is dropped between iterations
            #[allow(clippy::while_let_loop)]
            loop {
                let task = match context.tasks.borrow_mut().owned.pop_back() {
                    Some(task) => task,
                    None => break,
                };

                task.shutdown();
            }

            // Drain local queue
            for task in context.tasks.borrow_mut().queue.drain(..) {
                task.shutdown();
            }

            // Drain remote queue
            for entry in scheduler.spawner.shared.queue.lock().drain(..) {
                match entry {
                    Entry::Schedule(task) => {
                        task.shutdown();
                    }
                    Entry::Release(..) => {
                        // Do nothing, each entry in the linked list was *just*
                        // dropped by the scheduler above.
                    }
                }
            }

            assert!(context.tasks.borrow().owned.is_empty());
        });
    }
}

impl<P: Park> fmt::Debug for BasicScheduler<P> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("BasicScheduler").finish()
    }
}

// ===== impl Spawner =====

impl Spawner {
    /// Spawns a future onto the basic scheduler
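    ///
    /// A minimal sketch of the call pattern (illustrative only, not a
    /// doctest; `spawner` and `scheduler` are hypothetical bindings to a
    /// `Spawner` and its `BasicScheduler`):
    ///
    /// ```ignore
    /// let handle = spawner.spawn(async { 1 + 1 });
    /// // The `JoinHandle` is itself a future resolving to the task output.
    /// let out = scheduler.block_on(handle).unwrap();
    /// ```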
    pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (task, handle) = task::joinable(future);
        self.shared.schedule(task);
        handle
    }

    fn pop(&self) -> Option<Entry> {
        self.shared.queue.lock().pop_front()
    }

    fn waker_ref(&self) -> WakerRef<'_> {
        // clear the woken bit
        self.shared.woken.swap(false, AcqRel);
        waker_ref(&self.shared)
    }

    fn was_woken(&self) -> bool {
        self.shared.woken.load(Acquire)
    }
}

impl fmt::Debug for Spawner {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Spawner").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Shared> {
    fn bind(task: Task<Self>) -> Arc<Shared> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");
            cx.tasks.borrow_mut().owned.push_front(task);
            cx.shared.clone()
        })
    }

    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        CURRENT.with(|maybe_cx| {
            let ptr = NonNull::from(task.header());

            if let Some(cx) = maybe_cx {
                // safety: the task is inserted in the list in `bind`.
                unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
            } else {
                self.queue.lock().push_back(Entry::Release(ptr));
                self.unpark.unpark();
                // Returning `None` here prevents the task plumbing from being
                // freed. It is then up to the scheduler through the queue we
                // just added to, or its Drop impl to free the task.
                None
            }
        })
    }

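    // Note on dispatch: when `schedule` is called from the scheduler's own
    // thread (the `Arc::ptr_eq` check below matches the current context),
    // the task goes straight onto the local queue; otherwise, e.g. when a
    // waker fires on another thread, it goes onto the remote queue and the
    // scheduler thread is unparked.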
    fn schedule(&self, task: task::Notified<Self>) {
        CURRENT.with(|maybe_cx| match maybe_cx {
            Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
                cx.tasks.borrow_mut().queue.push_back(task);
            }
            _ => {
                self.queue.lock().push_back(Entry::Schedule(task));
                self.unpark.unpark();
            }
        });
    }
}
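
// Note on the waker protocol: `Shared` is the waker for the `block_on`
// future. `wake_by_ref` sets the `woken` flag (`Release` store) and unparks
// the scheduler thread; `Inner::block_on` reads the flag back through
// `Spawner::was_woken` (`Acquire` load) to decide whether to re-poll the
// future, and `Spawner::waker_ref` clears it (`AcqRel` swap) before each
// poll.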
impl Wake for Shared {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self)
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.woken.store(true, Release);
        arc_self.unpark.unpark();
    }
}

// ===== InnerGuard =====

/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler`, even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
    inner: Option<Inner<P>>,
    basic_scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
        // The only time inner gets set to `None` is if we have dropped
        // already so this unwrap is safe.
        self.inner.as_mut().unwrap().block_on(future)
    }
}

impl<P: Park> Drop for InnerGuard<'_, P> {
    fn drop(&mut self) {
        if let Some(scheduler) = self.inner.take() {
            let mut lock = self.basic_scheduler.inner.lock();

            // Replace old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            lock.replace(scheduler);

            // Wake up other possible threads that could steal
            // the dedicated parker P.
            self.basic_scheduler.notify.notify_one()
        }
    }
}