1 use core::fmt;
2 use core::future::Future;
3 use core::marker::PhantomData;
4 use core::mem;
5 use core::ptr::NonNull;
6 use core::sync::atomic::Ordering;
7 use core::task::Waker;
8
9 use alloc::boxed::Box;
10
11 use crate::header::Header;
12 use crate::raw::RawTask;
13 use crate::state::*;
14 use crate::Task;
15
16 mod sealed {
17 use super::*;
18 pub trait Sealed<M> {}
19
20 impl<M, F> Sealed<M> for F where F: Fn(Runnable<M>) {}
21
22 impl<M, F> Sealed<M> for WithInfo<F> where F: Fn(Runnable<M>, ScheduleInfo) {}
23 }
24
/// A builder that creates a new task.
///
/// The type parameter `M` is the type of the metadata value stored in the builder and
/// attached to the spawned task.
#[derive(Debug)]
pub struct Builder<M> {
    /// The metadata associated with the task.
    pub(crate) metadata: M,

    /// Whether or not a panic that occurs in the task should be propagated.
    ///
    /// This field only exists when the `std` feature is enabled.
    #[cfg(feature = "std")]
    pub(crate) propagate_panic: bool,
}
35
36 impl<M: Default> Default for Builder<M> {
default() -> Self37 fn default() -> Self {
38 Builder::new().metadata(M::default())
39 }
40 }
41
/// Additional information handed to a scheduling function alongside the [`Runnable`].
///
/// Unlike the metadata of a [`Runnable`], which is supplied and managed by the caller,
/// the values in this struct are produced by this crate's own implementation.
///
/// # Examples
///
/// ```
/// use async_task::{Runnable, ScheduleInfo, WithInfo};
/// use std::sync::{Arc, Mutex};
///
/// // The future inside the task.
/// let future = async {
///     println!("Hello, world!");
/// };
///
/// // If the task gets woken up while running, it will be sent into this channel.
/// let (s, r) = flume::unbounded();
/// // Otherwise, it will be placed into this slot.
/// let lifo_slot = Arc::new(Mutex::new(None));
/// let schedule = move |runnable: Runnable, info: ScheduleInfo| {
///     if info.woken_while_running {
///         s.send(runnable).unwrap()
///     } else {
///         let last = lifo_slot.lock().unwrap().replace(runnable);
///         if let Some(last) = last {
///             s.send(last).unwrap()
///         }
///     }
/// };
///
/// // Create the actual scheduler to be spawned with some future.
/// let scheduler = WithInfo(schedule);
/// // Create a task with the future and the scheduler.
/// let (runnable, task) = async_task::spawn(future, scheduler);
/// ```
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub struct ScheduleInfo {
    /// `true` when the task was woken up while it was running.
    ///
    /// This is usually the case because the task has yielded itself to the scheduler.
    pub woken_while_running: bool,
}

impl ScheduleInfo {
    /// Builds a `ScheduleInfo` carrying the given `woken_while_running` flag.
    pub(crate) fn new(woken_while_running: bool) -> Self {
        Self {
            woken_while_running,
        }
    }
}
96
/// The trait for scheduling functions.
///
/// The trait is sealed through the private `sealed::Sealed` supertrait. The implementations
/// in this file cover `Fn(Runnable<M>)` closures and [`WithInfo`]-wrapped
/// `Fn(Runnable<M>, ScheduleInfo)` closures.
pub trait Schedule<M = ()>: sealed::Sealed<M> {
    /// The actual scheduling procedure.
    ///
    /// Receives the `runnable` to be scheduled together with the [`ScheduleInfo`] supplied
    /// by the crate.
    fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo);
}
102
103 impl<M, F> Schedule<M> for F
104 where
105 F: Fn(Runnable<M>),
106 {
schedule(&self, runnable: Runnable<M>, _: ScheduleInfo)107 fn schedule(&self, runnable: Runnable<M>, _: ScheduleInfo) {
108 self(runnable)
109 }
110 }
111
/// Wrapper marking a scheduling function that also wants a [`ScheduleInfo`].
///
/// A scheduler can base its strategy on the state of the runnable it receives, for
/// instance on whether the task was woken up while it was running. Wrapping a
/// two-argument closure in `WithInfo` makes that state available to it.
///
/// The contents of [`ScheduleInfo`] are produced by this crate's own implementation, in
/// contrast to the metadata of a [`Runnable`], which is managed by the caller.
///
/// # Examples
///
/// ```
/// use async_task::{ScheduleInfo, WithInfo};
/// use std::sync::{Arc, Mutex};
///
/// // The future inside the task.
/// let future = async {
///     println!("Hello, world!");
/// };
///
/// // If the task gets woken up while running, it will be sent into this channel.
/// let (s, r) = flume::unbounded();
/// // Otherwise, it will be placed into this slot.
/// let lifo_slot = Arc::new(Mutex::new(None));
/// let schedule = move |runnable, info: ScheduleInfo| {
///     if info.woken_while_running {
///         s.send(runnable).unwrap()
///     } else {
///         let last = lifo_slot.lock().unwrap().replace(runnable);
///         if let Some(last) = last {
///             s.send(last).unwrap()
///         }
///     }
/// };
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule));
/// ```
#[derive(Debug)]
pub struct WithInfo<F>(pub F);

impl<F> From<F> for WithInfo<F> {
    /// Wraps `value` in a `WithInfo`.
    fn from(value: F) -> Self {
        Self(value)
    }
}
161
162 impl<M, F> Schedule<M> for WithInfo<F>
163 where
164 F: Fn(Runnable<M>, ScheduleInfo),
165 {
schedule(&self, runnable: Runnable<M>, info: ScheduleInfo)166 fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo) {
167 (self.0)(runnable, info)
168 }
169 }
170
171 impl Builder<()> {
172 /// Creates a new task builder.
173 ///
174 /// By default, this task builder has no metadata. Use the [`metadata`] method to
175 /// set the metadata.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use async_task::Builder;
181 ///
182 /// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {});
183 /// ```
new() -> Builder<()>184 pub fn new() -> Builder<()> {
185 Builder {
186 metadata: (),
187 #[cfg(feature = "std")]
188 propagate_panic: false,
189 }
190 }
191
192 /// Adds metadata to the task.
193 ///
194 /// In certain cases, it may be useful to associate some metadata with a task. For instance,
195 /// you may want to associate a name with a task, or a priority for a priority queue. This
196 /// method allows the user to attach arbitrary metadata to a task that is available through
197 /// the [`Runnable`] or the [`Task`].
198 ///
199 /// # Examples
200 ///
201 /// This example creates an executor that associates a "priority" number with each task, and
202 /// then runs the tasks in order of priority.
203 ///
204 /// ```
205 /// use async_task::{Builder, Runnable};
206 /// use once_cell::sync::Lazy;
207 /// use std::cmp;
208 /// use std::collections::BinaryHeap;
209 /// use std::sync::Mutex;
210 ///
211 /// # smol::future::block_on(async {
212 /// /// A wrapper around a `Runnable<usize>` that implements `Ord` so that it can be used in a
213 /// /// priority queue.
214 /// struct TaskWrapper(Runnable<usize>);
215 ///
216 /// impl PartialEq for TaskWrapper {
217 /// fn eq(&self, other: &Self) -> bool {
218 /// self.0.metadata() == other.0.metadata()
219 /// }
220 /// }
221 ///
222 /// impl Eq for TaskWrapper {}
223 ///
224 /// impl PartialOrd for TaskWrapper {
225 /// fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
226 /// Some(self.cmp(other))
227 /// }
228 /// }
229 ///
230 /// impl Ord for TaskWrapper {
231 /// fn cmp(&self, other: &Self) -> cmp::Ordering {
232 /// self.0.metadata().cmp(other.0.metadata())
233 /// }
234 /// }
235 ///
236 /// static EXECUTOR: Lazy<Mutex<BinaryHeap<TaskWrapper>>> = Lazy::new(|| {
237 /// Mutex::new(BinaryHeap::new())
238 /// });
239 ///
240 /// let schedule = |runnable| {
241 /// EXECUTOR.lock().unwrap().push(TaskWrapper(runnable));
242 /// };
243 ///
244 /// // Spawn a few tasks with different priorities.
245 /// let spawn_task = move |priority| {
246 /// let (runnable, task) = Builder::new().metadata(priority).spawn(
247 /// move |_| async move { priority },
248 /// schedule,
249 /// );
250 /// runnable.schedule();
251 /// task
252 /// };
253 ///
254 /// let t1 = spawn_task(1);
255 /// let t2 = spawn_task(2);
256 /// let t3 = spawn_task(3);
257 ///
258 /// // Run the tasks in order of priority.
259 /// let mut metadata_seen = vec![];
260 /// while let Some(TaskWrapper(runnable)) = EXECUTOR.lock().unwrap().pop() {
261 /// metadata_seen.push(*runnable.metadata());
262 /// runnable.run();
263 /// }
264 ///
265 /// assert_eq!(metadata_seen, vec![3, 2, 1]);
266 /// assert_eq!(t1.await, 1);
267 /// assert_eq!(t2.await, 2);
268 /// assert_eq!(t3.await, 3);
269 /// # });
270 /// ```
metadata<M>(self, metadata: M) -> Builder<M>271 pub fn metadata<M>(self, metadata: M) -> Builder<M> {
272 Builder {
273 metadata,
274 #[cfg(feature = "std")]
275 propagate_panic: self.propagate_panic,
276 }
277 }
278 }
279
impl<M> Builder<M> {
    /// Propagates panics that occur in the task.
    ///
    /// When this is `true`, panics that occur in the task will be propagated to the caller of
    /// the [`Task`]. When this is `false`, no special action is taken when a panic occurs in the
    /// task, meaning that the caller of [`Runnable::run`] will observe a panic.
    ///
    /// This is only available when the `std` feature is enabled. By default, this is `false`.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_task::Builder;
    /// use futures_lite::future::poll_fn;
    /// use std::future::Future;
    /// use std::panic;
    /// use std::pin::Pin;
    /// use std::task::{Context, Poll};
    ///
    /// fn did_panic<F: FnOnce()>(f: F) -> bool {
    ///     panic::catch_unwind(panic::AssertUnwindSafe(f)).is_err()
    /// }
    ///
    /// # smol::future::block_on(async {
    /// let (runnable1, mut task1) = Builder::new()
    ///    .propagate_panic(true)
    ///    .spawn(|()| async move { panic!() }, |_| {});
    ///
    /// let (runnable2, mut task2) = Builder::new()
    ///    .propagate_panic(false)
    ///    .spawn(|()| async move { panic!() }, |_| {});
    ///
    /// assert!(!did_panic(|| { runnable1.run(); }));
    /// assert!(did_panic(|| { runnable2.run(); }));
    ///
    /// let waker = poll_fn(|cx| Poll::Ready(cx.waker().clone())).await;
    /// let mut cx = Context::from_waker(&waker);
    /// assert!(did_panic(|| { let _ = Pin::new(&mut task1).poll(&mut cx); }));
    /// assert!(did_panic(|| { let _ = Pin::new(&mut task2).poll(&mut cx); }));
    /// # });
    /// ```
    #[cfg(feature = "std")]
    pub fn propagate_panic(self, propagate_panic: bool) -> Builder<M> {
        Builder {
            metadata: self.metadata,
            propagate_panic,
        }
    }

    /// Creates a new task.
    ///
    /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
    /// output.
    ///
    /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
    /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
    /// again.
    ///
    /// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
    /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
    /// should push it into a task queue so that it can be processed later.
    ///
    /// The `future` argument is a closure that receives a reference to the task's metadata and
    /// returns the future to run.
    ///
    /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
    /// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_task::Builder;
    ///
    /// // The future inside the task.
    /// let future = async {
    ///     println!("Hello, world!");
    /// };
    ///
    /// // A function that schedules the task when it gets woken up.
    /// let (s, r) = flume::unbounded();
    /// let schedule = move |runnable| s.send(runnable).unwrap();
    ///
    /// // Create a task with the future and the schedule function.
    /// let (runnable, task) = Builder::new().spawn(|()| future, schedule);
    /// ```
    pub fn spawn<F, Fut, S>(self, future: F, schedule: S) -> (Runnable<M>, Task<Fut::Output, M>)
    where
        F: FnOnce(&M) -> Fut,
        Fut: Future + Send + 'static,
        Fut::Output: Send + 'static,
        S: Schedule<M> + Send + Sync + 'static,
    {
        // SAFETY: the bounds above require `Fut` to be `Send + 'static` and `S` to be
        // `Send + Sync + 'static`, so none of the conditional obligations listed in the
        // `# Safety` section of `spawn_unchecked` apply.
        unsafe { self.spawn_unchecked(future, schedule) }
    }

    /// Creates a new thread-local task.
    ///
    /// This function is the same as [`spawn()`], except it does not require [`Send`] on `future`.
    /// If the [`Runnable`] is used or dropped on another thread, a panic will occur.
    ///
    /// This function is only available when the `std` feature for this crate is enabled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_task::{Builder, Runnable};
    /// use flume::{Receiver, Sender};
    /// use std::rc::Rc;
    ///
    /// thread_local! {
    ///     // A queue that holds scheduled tasks.
    ///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
    /// }
    ///
    /// // Make a non-Send future.
    /// let msg: Rc<str> = "Hello, world!".into();
    /// let future = async move {
    ///     println!("{}", msg);
    /// };
    ///
    /// // A function that schedules the task when it gets woken up.
    /// let s = QUEUE.with(|(s, _)| s.clone());
    /// let schedule = move |runnable| s.send(runnable).unwrap();
    ///
    /// // Create a task with the future and the schedule function.
    /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule);
    /// ```
    #[cfg(feature = "std")]
    pub fn spawn_local<F, Fut, S>(
        self,
        future: F,
        schedule: S,
    ) -> (Runnable<M>, Task<Fut::Output, M>)
    where
        F: FnOnce(&M) -> Fut,
        Fut: Future + 'static,
        Fut::Output: 'static,
        S: Schedule<M> + Send + Sync + 'static,
    {
        use std::mem::ManuallyDrop;
        use std::pin::Pin;
        use std::task::{Context, Poll};
        use std::thread::{self, ThreadId};

        // Returns the id of the calling thread, cached in a thread-local. If the thread-local
        // cannot be accessed (`try_with` returns an error), the id is queried directly.
        #[inline]
        fn thread_id() -> ThreadId {
            thread_local! {
                static ID: ThreadId = thread::current().id();
            }
            ID.try_with(|id| *id)
                .unwrap_or_else(|_| thread::current().id())
        }

        // A future wrapper that remembers the thread it was created on and panics when it is
        // polled or dropped on any other thread.
        struct Checked<F> {
            // Id of the thread that created this wrapper.
            id: ThreadId,
            // The wrapped future. `ManuallyDrop` keeps it from being dropped automatically, so
            // the `Drop` impl below decides whether it is dropped at all.
            inner: ManuallyDrop<F>,
        }

        impl<F> Drop for Checked<F> {
            fn drop(&mut self) {
                // The check runs before the inner future is touched: if it panics, the
                // `ManuallyDrop::drop` call below is never reached.
                assert!(
                    self.id == thread_id(),
                    "local task dropped by a thread that didn't spawn it"
                );
                // SAFETY: this is the only place `inner` is dropped, and it is not used again
                // after this call.
                unsafe {
                    ManuallyDrop::drop(&mut self.inner);
                }
            }
        }

        impl<F: Future> Future for Checked<F> {
            type Output = F::Output;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                assert!(
                    self.id == thread_id(),
                    "local task polled by a thread that didn't spawn it"
                );
                // SAFETY: this is a pin projection onto the `inner` field. The wrapper never
                // moves `inner` out; it is only polled here and dropped in place in `Drop`.
                unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
            }
        }

        // Wrap the future into one that checks which thread it's on.
        let future = move |meta| {
            let future = future(meta);

            Checked {
                id: thread_id(),
                inner: ManuallyDrop::new(future),
            }
        };

        // SAFETY: `schedule` is `Send + Sync + 'static` and the future is `'static`. The future
        // may not be `Send`, but `Checked` panics instead of polling or dropping it on a thread
        // other than the one that created it.
        unsafe { self.spawn_unchecked(future, schedule) }
    }

    /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
    ///
    /// This function is the same as [`spawn()`], except it does not require [`Send`], [`Sync`],
    /// and `'static` on `future` and `schedule`.
    ///
    /// # Safety
    ///
    /// - If `Fut` is not [`Send`], its [`Runnable`] must be used and dropped on the original
    ///   thread.
    /// - If `Fut` is not `'static`, borrowed non-metadata variables must outlive its [`Runnable`].
    /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
    ///   must be used and dropped on the original thread.
    /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
    ///   [`Runnable`]'s [`Waker`].
    ///
    /// # Examples
    ///
    /// ```
    /// use async_task::Builder;
    ///
    /// // The future inside the task.
    /// let future = async {
    ///     println!("Hello, world!");
    /// };
    ///
    /// // If the task gets woken up, it will be sent into this channel.
    /// let (s, r) = flume::unbounded();
    /// let schedule = move |runnable| s.send(runnable).unwrap();
    ///
    /// // Create a task with the future and the schedule function.
    /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) };
    /// ```
    pub unsafe fn spawn_unchecked<'a, F, Fut, S>(
        self,
        future: F,
        schedule: S,
    ) -> (Runnable<M>, Task<Fut::Output, M>)
    where
        F: FnOnce(&'a M) -> Fut,
        Fut: Future + 'a,
        S: Schedule<M>,
        M: 'a,
    {
        // Allocate large futures on the heap.
        let ptr = if mem::size_of::<Fut>() >= 2048 {
            // The closure now returns the pinned box rather than `Fut`, which is why the first
            // type parameter of `RawTask` below is left to inference.
            let future = |meta| {
                let future = future(meta);
                Box::pin(future)
            };

            RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self)
        } else {
            RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, self)
        };

        // Both handles are built from the same pointer returned by `allocate`.
        let runnable = Runnable {
            ptr,
            _marker: PhantomData,
        };
        let task = Task {
            ptr,
            _marker: PhantomData,
        };
        (runnable, task)
    }
}
538
539 /// Creates a new task.
540 ///
541 /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
542 /// output.
543 ///
544 /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
545 /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
546 /// again.
547 ///
548 /// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
549 /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
550 /// should push it into a task queue so that it can be processed later.
551 ///
552 /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
553 /// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
554 ///
555 /// # Examples
556 ///
557 /// ```
558 /// // The future inside the task.
559 /// let future = async {
560 /// println!("Hello, world!");
561 /// };
562 ///
563 /// // A function that schedules the task when it gets woken up.
564 /// let (s, r) = flume::unbounded();
565 /// let schedule = move |runnable| s.send(runnable).unwrap();
566 ///
567 /// // Create a task with the future and the schedule function.
568 /// let (runnable, task) = async_task::spawn(future, schedule);
569 /// ```
spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) where F: Future + Send + 'static, F::Output: Send + 'static, S: Schedule + Send + Sync + 'static,570 pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
571 where
572 F: Future + Send + 'static,
573 F::Output: Send + 'static,
574 S: Schedule + Send + Sync + 'static,
575 {
576 unsafe { spawn_unchecked(future, schedule) }
577 }
578
/// Creates a new thread-local task.
///
/// Works like [`spawn()`], but `future` does not have to be [`Send`]. In exchange, using or
/// dropping the [`Runnable`] on a different thread causes a panic.
///
/// Only available when the `std` feature of this crate is enabled.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use flume::{Receiver, Sender};
/// use std::rc::Rc;
///
/// thread_local! {
///     // A queue that holds scheduled tasks.
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
/// }
///
/// // Make a non-Send future.
/// let msg: Rc<str> = "Hello, world!".into();
/// let future = async move {
///     println!("{}", msg);
/// };
///
/// // A function that schedules the task when it gets woken up.
/// let s = QUEUE.with(|(s, _)| s.clone());
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task::spawn_local(future, schedule);
/// ```
#[cfg(feature = "std")]
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Schedule + Send + Sync + 'static,
{
    // A default builder has `()` metadata, which the closure simply ignores.
    let builder = Builder::new();
    builder.spawn_local(move |()| future, schedule)
}
620
621 /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
622 ///
623 /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
624 /// `'static` on `future` and `schedule`.
625 ///
626 /// # Safety
627 ///
628 /// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
629 /// thread.
630 /// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
631 /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
632 /// must be used and dropped on the original thread.
633 /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
634 /// [`Runnable`]'s [`Waker`].
635 ///
636 /// # Examples
637 ///
638 /// ```
639 /// // The future inside the task.
640 /// let future = async {
641 /// println!("Hello, world!");
642 /// };
643 ///
644 /// // If the task gets woken up, it will be sent into this channel.
645 /// let (s, r) = flume::unbounded();
646 /// let schedule = move |runnable| s.send(runnable).unwrap();
647 ///
648 /// // Create a task with the future and the schedule function.
649 /// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
650 /// ```
spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) where F: Future, S: Schedule,651 pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
652 where
653 F: Future,
654 S: Schedule,
655 {
656 Builder::new().spawn_unchecked(move |()| future, schedule)
657 }
658
/// A handle to a runnable task.
///
/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
/// scheduled for running.
///
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
/// again.
///
/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
/// awaiting the [`Task`] after that will result in a panic.
///
/// The type parameter `M` is the type of the task's metadata and defaults to `()`.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// // A simple executor.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
///     let (sender, receiver) = flume::unbounded::<Runnable>();
///     thread::spawn(|| {
///         for runnable in receiver {
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
///         }
///     });
///     sender
/// });
///
/// // Create a task with a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task and await its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable<M = ()> {
    /// A pointer to the heap-allocated task.
    ///
    /// The methods of this type cast it to a `*const Header<M>` to reach the task's header.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic type `M`.
    pub(crate) _marker: PhantomData<M>,
}
704
// `Runnable` stores a `NonNull<()>`, which is neither `Send` nor `Sync`, so both traits are
// implemented by hand. They are only provided when the metadata type `M` is `Send + Sync`.
// NOTE(review): these impls put no bound on the future behind the pointer. `spawn` requires
// it to be `Send`, `spawn_local` wraps it in a future that panics off-thread, and
// `spawn_unchecked` leaves the obligation to its caller — confirm that covers every way a
// `Runnable` can be constructed.
unsafe impl<M: Send + Sync> Send for Runnable<M> {}
unsafe impl<M: Send + Sync> Sync for Runnable<M> {}

// With the `std` feature enabled, `Runnable` is declared unwind-safe for every `M`.
#[cfg(feature = "std")]
impl<M> std::panic::UnwindSafe for Runnable<M> {}
#[cfg(feature = "std")]
impl<M> std::panic::RefUnwindSafe for Runnable<M> {}
712
713 impl<M> Runnable<M> {
714 /// Get the metadata associated with this task.
715 ///
716 /// Tasks can be created with a metadata object associated with them; by default, this
717 /// is a `()` value. See the [`Builder::metadata()`] method for more information.
metadata(&self) -> &M718 pub fn metadata(&self) -> &M {
719 &self.header().metadata
720 }
721
722 /// Schedules the task.
723 ///
724 /// This is a convenience method that passes the [`Runnable`] to the schedule function.
725 ///
726 /// # Examples
727 ///
728 /// ```
729 /// // A function that schedules the task when it gets woken up.
730 /// let (s, r) = flume::unbounded();
731 /// let schedule = move |runnable| s.send(runnable).unwrap();
732 ///
733 /// // Create a task with a simple future and the schedule function.
734 /// let (runnable, task) = async_task::spawn(async {}, schedule);
735 ///
736 /// // Schedule the task.
737 /// assert_eq!(r.len(), 0);
738 /// runnable.schedule();
739 /// assert_eq!(r.len(), 1);
740 /// ```
schedule(self)741 pub fn schedule(self) {
742 let ptr = self.ptr.as_ptr();
743 let header = ptr as *const Header<M>;
744 mem::forget(self);
745
746 unsafe {
747 ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
748 }
749 }
750
751 /// Runs the task by polling its future.
752 ///
753 /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
754 /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
755 /// [`Runnable`] vanishes until the task is woken.
756 /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
757 /// it woke itself and then gave the control back to the executor.
758 ///
759 /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
760 /// this method simply destroys the task.
761 ///
762 /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
763 /// after that will also result in a panic.
764 ///
765 /// # Examples
766 ///
767 /// ```
768 /// // A function that schedules the task when it gets woken up.
769 /// let (s, r) = flume::unbounded();
770 /// let schedule = move |runnable| s.send(runnable).unwrap();
771 ///
772 /// // Create a task with a simple future and the schedule function.
773 /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
774 ///
775 /// // Run the task and check its output.
776 /// runnable.run();
777 /// assert_eq!(smol::future::block_on(task), 3);
778 /// ```
run(self) -> bool779 pub fn run(self) -> bool {
780 let ptr = self.ptr.as_ptr();
781 let header = ptr as *const Header<M>;
782 mem::forget(self);
783
784 unsafe { ((*header).vtable.run)(ptr) }
785 }
786
787 /// Returns a waker associated with this task.
788 ///
789 /// # Examples
790 ///
791 /// ```
792 /// use smol::future;
793 ///
794 /// // A function that schedules the task when it gets woken up.
795 /// let (s, r) = flume::unbounded();
796 /// let schedule = move |runnable| s.send(runnable).unwrap();
797 ///
798 /// // Create a task with a simple future and the schedule function.
799 /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
800 ///
801 /// // Take a waker and run the task.
802 /// let waker = runnable.waker();
803 /// runnable.run();
804 ///
805 /// // Reschedule the task by waking it.
806 /// assert_eq!(r.len(), 0);
807 /// waker.wake();
808 /// assert_eq!(r.len(), 1);
809 /// ```
waker(&self) -> Waker810 pub fn waker(&self) -> Waker {
811 let ptr = self.ptr.as_ptr();
812 let header = ptr as *const Header<M>;
813
814 unsafe {
815 let raw_waker = ((*header).vtable.clone_waker)(ptr);
816 Waker::from_raw(raw_waker)
817 }
818 }
819
header(&self) -> &Header<M>820 fn header(&self) -> &Header<M> {
821 unsafe { &*(self.ptr.as_ptr() as *const Header<M>) }
822 }
823 }
824
// Dropping a `Runnable` cancels its task: the state is marked closed (unless the task is
// already completed or closed), the future is dropped, the scheduled flag is cleared, a
// registered awaiter is notified, and finally this handle's task reference is released.
impl<M> Drop for Runnable<M> {
    fn drop(&mut self) {
        let ptr = self.ptr.as_ptr();
        let header = self.header();

        unsafe {
            let mut state = header.state.load(Ordering::Acquire);

            loop {
                // If the task has been completed or closed, it can't be canceled.
                if state & (COMPLETED | CLOSED) != 0 {
                    break;
                }

                // Mark the task as closed.
                //
                // `compare_exchange_weak` can fail spuriously or because the state changed in
                // the meantime; either way the loop retries with the value it observed.
                match header.state.compare_exchange_weak(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            }

            // Drop the future.
            (header.vtable.drop_future)(ptr);

            // Mark the task as unscheduled.
            //
            // `fetch_and` returns the previous state, so the `AWAITER` check below looks at
            // the flags as they were just before `SCHEDULED` was cleared.
            let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel);

            // Notify the awaiter that the future has been dropped.
            if state & AWAITER != 0 {
                (*header).notify(None);
            }

            // Drop the task reference.
            //
            // This is the last use of `header` and `ptr` in this function.
            (header.vtable.drop_ref)(ptr);
        }
    }
}
867
868 impl<M: fmt::Debug> fmt::Debug for Runnable<M> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result869 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
870 let ptr = self.ptr.as_ptr();
871 let header = ptr as *const Header<M>;
872
873 f.debug_struct("Runnable")
874 .field("header", unsafe { &(*header) })
875 .finish()
876 }
877 }
878