• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "full"), allow(dead_code))]
2 
3 //! Yield points for improved cooperative scheduling.
4 //!
5 //! Documentation for this can be found in the [`tokio::task`] module.
6 //!
//! [`tokio::task`]: crate::task
8 
9 // ```ignore
10 // # use tokio_stream::{Stream, StreamExt};
11 // async fn drop_all<I: Stream + Unpin>(mut input: I) {
12 //     while let Some(_) = input.next().await {
13 //         tokio::coop::proceed().await;
14 //     }
15 // }
16 // ```
17 //
18 // The `proceed` future will coordinate with the executor to make sure that
19 // every so often control is yielded back to the executor so it can run other
20 // tasks.
21 //
22 // # Placing yield points
23 //
24 // Voluntary yield points should be placed _after_ at least some work has been
25 // done. If they are not, a future sufficiently deep in the task hierarchy may
26 // end up _never_ getting to run because of the number of yield points that
27 // inevitably appear before it is reached. In general, you will want yield
28 // points to only appear in "leaf" futures -- those that do not themselves poll
29 // other futures. By doing this, you avoid double-counting each iteration of
30 // the outer future against the cooperating budget.
31 
32 use crate::runtime::context;
33 
/// Opaque allowance of "work" that a task may still perform before it is
/// expected to yield back to the scheduler.
///
/// `Some(n)` is a constrained budget with `n` units left; `None` is an
/// unconstrained budget that never runs out.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Budget(Option<u8>);
38 
/// Outcome of trying to spend one unit of a `Budget`.
pub(crate) struct BudgetDecrement {
    /// `true` when the unit could be spent (or the budget is unconstrained).
    success: bool,
    /// `true` when this decrement used up the last unit of a constrained budget.
    hit_zero: bool,
}
43 
44 impl Budget {
45     /// Budget assigned to a task on each poll.
46     ///
47     /// The value itself is chosen somewhat arbitrarily. It needs to be high
48     /// enough to amortize wakeup and scheduling costs, but low enough that we
49     /// do not starve other tasks for too long. The value also needs to be high
50     /// enough that particularly deep tasks are able to do at least some useful
51     /// work at all.
52     ///
53     /// Note that as more yield points are added in the ecosystem, this value
54     /// will probably also have to be raised.
initial() -> Budget55     const fn initial() -> Budget {
56         Budget(Some(128))
57     }
58 
59     /// Returns an unconstrained budget. Operations will not be limited.
unconstrained() -> Budget60     pub(super) const fn unconstrained() -> Budget {
61         Budget(None)
62     }
63 
has_remaining(self) -> bool64     fn has_remaining(self) -> bool {
65         self.0.map(|budget| budget > 0).unwrap_or(true)
66     }
67 }
68 
69 /// Runs the given closure with a cooperative task budget. When the function
70 /// returns, the budget is reset to the value prior to calling the function.
71 #[inline(always)]
budget<R>(f: impl FnOnce() -> R) -> R72 pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73     with_budget(Budget::initial(), f)
74 }
75 
76 /// Runs the given closure with an unconstrained task budget. When the function returns, the budget
77 /// is reset to the value prior to calling the function.
78 #[inline(always)]
with_unconstrained<R>(f: impl FnOnce() -> R) -> R79 pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
80     with_budget(Budget::unconstrained(), f)
81 }
82 
83 #[inline(always)]
with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R84 fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85     struct ResetGuard {
86         prev: Budget,
87     }
88 
89     impl Drop for ResetGuard {
90         fn drop(&mut self) {
91             let _ = context::budget(|cell| {
92                 cell.set(self.prev);
93             });
94         }
95     }
96 
97     #[allow(unused_variables)]
98     let maybe_guard = context::budget(|cell| {
99         let prev = cell.get();
100         cell.set(budget);
101 
102         ResetGuard { prev }
103     });
104 
105     // The function is called regardless even if the budget is not successfully
106     // set due to the thread-local being destroyed.
107     f()
108 }
109 
110 #[inline(always)]
has_budget_remaining() -> bool111 pub(crate) fn has_budget_remaining() -> bool {
112     // If the current budget cannot be accessed due to the thread-local being
113     // shutdown, then we assume there is budget remaining.
114     context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
115 }
116 
117 cfg_rt_multi_thread! {
118     /// Sets the current task's budget.
119     pub(crate) fn set(budget: Budget) {
120         let _ = context::budget(|cell| cell.set(budget));
121     }
122 }
123 
124 cfg_rt! {
125     /// Forcibly removes the budgeting constraints early.
126     ///
127     /// Returns the remaining budget
128     pub(crate) fn stop() -> Budget {
129         context::budget(|cell| {
130             let prev = cell.get();
131             cell.set(Budget::unconstrained());
132             prev
133         }).unwrap_or(Budget::unconstrained())
134     }
135 }
136 
137 cfg_coop! {
138     use std::cell::Cell;
139     use std::task::{Context, Poll};
140 
141     #[must_use]
142     pub(crate) struct RestoreOnPending(Cell<Budget>);
143 
144     impl RestoreOnPending {
145         pub(crate) fn made_progress(&self) {
146             self.0.set(Budget::unconstrained());
147         }
148     }
149 
150     impl Drop for RestoreOnPending {
151         fn drop(&mut self) {
152             // Don't reset if budget was unconstrained or if we made progress.
153             // They are both represented as the remembered budget being unconstrained.
154             let budget = self.0.get();
155             if !budget.is_unconstrained() {
156                 let _ = context::budget(|cell| {
157                     cell.set(budget);
158                 });
159             }
160         }
161     }
162 
163     /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
164     ///
165     /// When you call this method, the current budget is decremented. However, to ensure that
166     /// progress is made every time a task is polled, the budget is automatically restored to its
167     /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
168     /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
169     /// that the budget empties appropriately.
170     ///
171     /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
172     /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
173     /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
174     /// that progress was made.
175     #[inline]
176     pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
177         context::budget(|cell| {
178             let mut budget = cell.get();
179 
180             let decrement = budget.decrement();
181 
182             if decrement.success {
183                 let restore = RestoreOnPending(Cell::new(cell.get()));
184                 cell.set(budget);
185 
186                 // avoid double counting
187                 if decrement.hit_zero {
188                     inc_budget_forced_yield_count();
189                 }
190 
191                 Poll::Ready(restore)
192             } else {
193                 cx.waker().wake_by_ref();
194                 Poll::Pending
195             }
196         }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
197     }
198 
199     cfg_rt! {
200         cfg_metrics! {
201             #[inline(always)]
202             fn inc_budget_forced_yield_count() {
203                 let _ = context::with_current(|handle| {
204                     handle.scheduler_metrics().inc_budget_forced_yield_count();
205                 });
206             }
207         }
208 
209         cfg_not_metrics! {
210             #[inline(always)]
211             fn inc_budget_forced_yield_count() {}
212         }
213     }
214 
215     cfg_not_rt! {
216         #[inline(always)]
217         fn inc_budget_forced_yield_count() {}
218     }
219 
220     impl Budget {
221         /// Decrements the budget. Returns `true` if successful. Decrementing fails
222         /// when there is not enough remaining budget.
223         fn decrement(&mut self) -> BudgetDecrement {
224             if let Some(num) = &mut self.0 {
225                 if *num > 0 {
226                     *num -= 1;
227 
228                     let hit_zero = *num == 0;
229 
230                     BudgetDecrement { success: true, hit_zero }
231                 } else {
232                     BudgetDecrement { success: false, hit_zero: false }
233                 }
234             } else {
235                 BudgetDecrement { success: true, hit_zero: false }
236             }
237         }
238 
239         fn is_unconstrained(self) -> bool {
240             self.0.is_none()
241         }
242     }
243 }
244 
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the budget currently stored in the context, falling back to an
    /// unconstrained budget when it cannot be accessed.
    fn get() -> Budget {
        let current = context::budget(|cell| cell.get());
        current.unwrap_or(Budget::unconstrained())
    }

    #[test]
    fn budgeting() {
        use futures::future::poll_fn;
        use tokio_test::*;

        // Counter value of a freshly assigned budget.
        let full = Budget::initial().0.unwrap();

        // Polls `poll_proceed` once inside a mock task and expects it to be ready.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Outside of `budget`, nothing is constrained and nothing changes.
        assert_eq!(get().0, None);

        let guard = proceed();

        assert_eq!(get().0, None);
        drop(guard);
        assert_eq!(get().0, None);

        budget(|| {
            assert_eq!(get().0, Some(full));

            let guard = proceed();
            assert_eq!(get().0, Some(full - 1));
            drop(guard);
            // No progress was reported, so the unit is refunded.
            assert_eq!(get().0, Some(full));

            let guard = proceed();
            assert_eq!(get().0, Some(full - 1));
            guard.made_progress();
            drop(guard);
            // Progress _was_ reported, so the unit stays spent.
            assert_eq!(get().0, Some(full - 1));

            let guard = proceed();
            assert_eq!(get().0, Some(full - 2));
            guard.made_progress();
            drop(guard);
            assert_eq!(get().0, Some(full - 2));

            // A nested scope starts over with a full budget...
            budget(|| {
                assert_eq!(get().0, Some(full));

                let guard = proceed();
                assert_eq!(get().0, Some(full - 1));
                guard.made_progress();
                drop(guard);
                assert_eq!(get().0, Some(full - 1));
            });

            // ...and the outer budget is back once the nested scope ends.
            assert_eq!(get().0, Some(full - 2));
        });

        assert_eq!(get().0, None);

        budget(|| {
            let available = get().0.unwrap();

            // Spend every available unit.
            for _ in 0..available {
                let guard = proceed();
                guard.made_progress();
            }

            // With the budget exhausted, the next attempt must yield.
            let mut exhausted = task::spawn(poll_fn(|cx| {
                let guard = ready!(poll_proceed(cx));
                guard.made_progress();
                Poll::Ready(())
            }));

            assert_pending!(exhausted.poll());
        });
    }
}
324