• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::redundant_clone)]
2 
3 use crate::future::poll_fn;
4 use crate::park::{Park, Unpark};
5 use crate::runtime::driver::Driver;
6 use crate::sync::Notify;
7 use crate::util::{waker_ref, Wake};
8 
9 use std::sync::{Arc, Mutex};
10 use std::task::Context;
11 use std::task::Poll::{Pending, Ready};
12 use std::{future::Future, sync::PoisonError};
13 
14 #[derive(Debug)]
15 pub(super) struct Shell {
16     driver: Mutex<Option<Driver>>,
17 
18     notify: Notify,
19 
20     /// TODO: don't store this
21     unpark: Arc<Handle>,
22 }
23 
24 #[derive(Debug)]
25 struct Handle(<Driver as Park>::Unpark);
26 
27 impl Shell {
new(driver: Driver) -> Shell28     pub(super) fn new(driver: Driver) -> Shell {
29         let unpark = Arc::new(Handle(driver.unpark()));
30 
31         Shell {
32             driver: Mutex::new(Some(driver)),
33             notify: Notify::new(),
34             unpark,
35         }
36     }
37 
block_on<F>(&self, f: F) -> F::Output where F: Future,38     pub(super) fn block_on<F>(&self, f: F) -> F::Output
39     where
40         F: Future,
41     {
42         let mut enter = crate::runtime::enter(true);
43 
44         pin!(f);
45 
46         loop {
47             if let Some(driver) = &mut self.take_driver() {
48                 return driver.block_on(f);
49             } else {
50                 let notified = self.notify.notified();
51                 pin!(notified);
52 
53                 if let Some(out) = enter
54                     .block_on(poll_fn(|cx| {
55                         if notified.as_mut().poll(cx).is_ready() {
56                             return Ready(None);
57                         }
58 
59                         if let Ready(out) = f.as_mut().poll(cx) {
60                             return Ready(Some(out));
61                         }
62 
63                         Pending
64                     }))
65                     .expect("Failed to `Enter::block_on`")
66                 {
67                     return out;
68                 }
69             }
70         }
71     }
72 
take_driver(&self) -> Option<DriverGuard<'_>>73     fn take_driver(&self) -> Option<DriverGuard<'_>> {
74         let mut lock = self.driver.lock().unwrap();
75         let driver = lock.take()?;
76 
77         Some(DriverGuard {
78             inner: Some(driver),
79             shell: &self,
80         })
81     }
82 }
83 
84 impl Wake for Handle {
85     /// Wake by value
wake(self: Arc<Self>)86     fn wake(self: Arc<Self>) {
87         Wake::wake_by_ref(&self);
88     }
89 
90     /// Wake by reference
wake_by_ref(arc_self: &Arc<Self>)91     fn wake_by_ref(arc_self: &Arc<Self>) {
92         arc_self.0.unpark();
93     }
94 }
95 
96 struct DriverGuard<'a> {
97     inner: Option<Driver>,
98     shell: &'a Shell,
99 }
100 
101 impl DriverGuard<'_> {
block_on<F: Future>(&mut self, f: F) -> F::Output102     fn block_on<F: Future>(&mut self, f: F) -> F::Output {
103         let driver = self.inner.as_mut().unwrap();
104 
105         pin!(f);
106 
107         let waker = waker_ref(&self.shell.unpark);
108         let mut cx = Context::from_waker(&waker);
109 
110         loop {
111             if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
112                 return v;
113             }
114 
115             driver.park().unwrap();
116         }
117     }
118 }
119 
120 impl Drop for DriverGuard<'_> {
drop(&mut self)121     fn drop(&mut self) {
122         if let Some(inner) = self.inner.take() {
123             self.shell
124                 .driver
125                 .lock()
126                 .unwrap_or_else(PoisonError::into_inner)
127                 .replace(inner);
128 
129             self.shell.notify.notify_one();
130         }
131     }
132 }
133