• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Runs `!Send` futures on the current thread.
2 use crate::loom::sync::{Arc, Mutex};
3 use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
4 use crate::sync::AtomicWaker;
5 use crate::util::VecDequeCell;
6 
7 use std::cell::Cell;
8 use std::collections::VecDeque;
9 use std::fmt;
10 use std::future::Future;
11 use std::marker::PhantomData;
12 use std::pin::Pin;
13 use std::task::Poll;
14 
15 use pin_project_lite::pin_project;
16 
17 cfg_rt! {
18     /// A set of tasks which are executed on the same thread.
19     ///
20     /// In some cases, it is necessary to run one or more futures that do not
21     /// implement [`Send`] and thus are unsafe to send between threads. In these
22     /// cases, a [local task set] may be used to schedule one or more `!Send`
23     /// futures to run together on the same thread.
24     ///
25     /// For example, the following code will not compile:
26     ///
27     /// ```rust,compile_fail
28     /// use std::rc::Rc;
29     ///
30     /// #[tokio::main]
31     /// async fn main() {
32     ///     // `Rc` does not implement `Send`, and thus may not be sent between
33     ///     // threads safely.
34     ///     let unsend_data = Rc::new("my unsend data...");
35     ///
36     ///     let unsend_data = unsend_data.clone();
37     ///     // Because the `async` block here moves `unsend_data`, the future is `!Send`.
38     ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
39     ///     // will not compile.
40     ///     tokio::spawn(async move {
41     ///         println!("{}", unsend_data);
42     ///         // ...
43     ///     }).await.unwrap();
44     /// }
45     /// ```
46     ///
47     /// # Use with `run_until`
48     ///
49     /// To spawn `!Send` futures, we can use a local task set to schedule them
50     /// on the thread calling [`Runtime::block_on`]. When running inside of the
51     /// local task set, we can use [`task::spawn_local`], which can spawn
52     /// `!Send` futures. For example:
53     ///
54     /// ```rust
55     /// use std::rc::Rc;
56     /// use tokio::task;
57     ///
58     /// #[tokio::main]
59     /// async fn main() {
60     ///     let unsend_data = Rc::new("my unsend data...");
61     ///
62     ///     // Construct a local task set that can run `!Send` futures.
63     ///     let local = task::LocalSet::new();
64     ///
65     ///     // Run the local task set.
66     ///     local.run_until(async move {
67     ///         let unsend_data = unsend_data.clone();
68     ///         // `spawn_local` ensures that the future is spawned on the local
69     ///         // task set.
70     ///         task::spawn_local(async move {
71     ///             println!("{}", unsend_data);
72     ///             // ...
73     ///         }).await.unwrap();
74     ///     }).await;
75     /// }
76     /// ```
77     /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
78     /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
79     /// cannot be used inside a task spawned with `tokio::spawn`.
80     ///
81     /// ## Awaiting a `LocalSet`
82     ///
83     /// Additionally, a `LocalSet` itself implements `Future`, completing when
84     /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
85     /// several futures on a `LocalSet` and drive the whole set until they
86     /// complete. For example,
87     ///
88     /// ```rust
89     /// use tokio::{task, time};
90     /// use std::rc::Rc;
91     ///
92     /// #[tokio::main]
93     /// async fn main() {
94     ///     let unsend_data = Rc::new("world");
95     ///     let local = task::LocalSet::new();
96     ///
97     ///     let unsend_data2 = unsend_data.clone();
98     ///     local.spawn_local(async move {
99     ///         // ...
100     ///         println!("hello {}", unsend_data2)
101     ///     });
102     ///
103     ///     local.spawn_local(async move {
104     ///         time::sleep(time::Duration::from_millis(100)).await;
105     ///         println!("goodbye {}", unsend_data)
106     ///     });
107     ///
108     ///     // ...
109     ///
110     ///     local.await;
111     /// }
112     /// ```
113     /// **Note:** Awaiting a `LocalSet` can only be done inside
114     /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
115     /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
116     /// `tokio::spawn`.
117     ///
118     /// ## Use inside `tokio::spawn`
119     ///
120     /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
121     /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
122     /// something else. The solution is to create the `LocalSet` somewhere else,
123     /// and communicate with it using an [`mpsc`] channel.
124     ///
125     /// The following example puts the `LocalSet` inside a new thread.
126     /// ```
127     /// use tokio::runtime::Builder;
128     /// use tokio::sync::{mpsc, oneshot};
129     /// use tokio::task::LocalSet;
130     ///
131     /// // This struct describes the task you want to spawn. Here we include
132     /// // some simple examples. The oneshot channel allows sending a response
133     /// // to the spawner.
134     /// #[derive(Debug)]
135     /// enum Task {
136     ///     PrintNumber(u32),
137     ///     AddOne(u32, oneshot::Sender<u32>),
138     /// }
139     ///
140     /// #[derive(Clone)]
141     /// struct LocalSpawner {
142     ///    send: mpsc::UnboundedSender<Task>,
143     /// }
144     ///
145     /// impl LocalSpawner {
146     ///     pub fn new() -> Self {
147     ///         let (send, mut recv) = mpsc::unbounded_channel();
148     ///
149     ///         let rt = Builder::new_current_thread()
150     ///             .enable_all()
151     ///             .build()
152     ///             .unwrap();
153     ///
154     ///         std::thread::spawn(move || {
155     ///             let local = LocalSet::new();
156     ///
157     ///             local.spawn_local(async move {
158     ///                 while let Some(new_task) = recv.recv().await {
159     ///                     tokio::task::spawn_local(run_task(new_task));
160     ///                 }
    ///                 // If the while loop returns, then all the LocalSpawner
    ///                 // objects have been dropped.
163     ///             });
164     ///
165     ///             // This will return once all senders are dropped and all
166     ///             // spawned tasks have returned.
167     ///             rt.block_on(local);
168     ///         });
169     ///
170     ///         Self {
171     ///             send,
172     ///         }
173     ///     }
174     ///
175     ///     pub fn spawn(&self, task: Task) {
176     ///         self.send.send(task).expect("Thread with LocalSet has shut down.");
177     ///     }
178     /// }
179     ///
180     /// // This task may do !Send stuff. We use printing a number as an example,
181     /// // but it could be anything.
182     /// //
183     /// // The Task struct is an enum to support spawning many different kinds
184     /// // of operations.
185     /// async fn run_task(task: Task) {
186     ///     match task {
187     ///         Task::PrintNumber(n) => {
188     ///             println!("{}", n);
189     ///         },
190     ///         Task::AddOne(n, response) => {
191     ///             // We ignore failures to send the response.
192     ///             let _ = response.send(n + 1);
193     ///         },
194     ///     }
195     /// }
196     ///
197     /// #[tokio::main]
198     /// async fn main() {
199     ///     let spawner = LocalSpawner::new();
200     ///
201     ///     let (send, response) = oneshot::channel();
202     ///     spawner.spawn(Task::AddOne(10, send));
203     ///     let eleven = response.await.unwrap();
204     ///     assert_eq!(eleven, 11);
205     /// }
206     /// ```
207     ///
208     /// [`Send`]: trait@std::marker::Send
209     /// [local task set]: struct@LocalSet
210     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
211     /// [`task::spawn_local`]: fn@spawn_local
212     /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Scheduler tick counter. `next_task` bumps it (wrapping) on every
        /// call and uses it to decide which run queue to look at first.
        tick: Cell<u8>,

        /// State published through the `CURRENT` scoped thread-local while
        /// this set is being driven.
        context: Context,

        /// Raw-pointer marker so that this type is not `Send`.
        _not_send: PhantomData<*const ()>,
    }
223 }
224 
/// Per-`LocalSet` state that is made available through the `CURRENT` scoped
/// thread-local while the set is being driven.
struct Context {
    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,

    /// Local run queue. `Shared::schedule` pushes here when it is called from
    /// inside this context, and `LocalSet::next_task` pops from it.
    queue: VecDequeCell<task::Notified<Arc<Shared>>>,

    /// State shared between threads.
    shared: Arc<Shared>,
}
236 
/// `LocalSet` state shared between threads.
struct Shared {
    /// Remote run queue, used when a task is scheduled from outside the set's
    /// own context. Dropping the `LocalSet` takes the queue and leaves `None`
    /// behind, after which `schedule` no longer pushes anything.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Waker of whatever is currently driving the `LocalSet`. It is
    /// registered on every poll and woken when work is pushed to the remote
    /// queue or spawned through `LocalSet::spawn_local`.
    waker: AtomicWaker,
}
245 
pin_project! {
    /// Future awaited by `LocalSet::run_until`: it polls `future` inside the
    /// local set's context and ticks the set's scheduler while `future` is
    /// still pending.
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose queued tasks are driven while `future` is pending.
        local_set: &'a LocalSet,
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
254 
// Scoped thread-local holding the `Context` of the `LocalSet` that is
// currently being driven. It is installed by `LocalSet::with` and consulted by
// `spawn_local_inner`, `Shared::schedule` and `Schedule::release`.
scoped_thread_local!(static CURRENT: Context);
256 
257 cfg_rt! {
258     /// Spawns a `!Send` future on the local task set.
259     ///
    /// The spawned future will be run on the same thread that called `spawn_local`.
261     /// This may only be called from the context of a local task set.
262     ///
263     /// # Panics
264     ///
265     /// - This function panics if called outside of a local task set.
266     ///
267     /// # Examples
268     ///
269     /// ```rust
270     /// use std::rc::Rc;
271     /// use tokio::task;
272     ///
273     /// #[tokio::main]
274     /// async fn main() {
275     ///     let unsend_data = Rc::new("my unsend data...");
276     ///
277     ///     let local = task::LocalSet::new();
278     ///
279     ///     // Run the local task set.
280     ///     local.run_until(async move {
281     ///         let unsend_data = unsend_data.clone();
282     ///         task::spawn_local(async move {
283     ///             println!("{}", unsend_data);
284     ///             // ...
285     ///         }).await.unwrap();
286     ///     }).await;
287     /// }
288     /// ```
    #[cfg_attr(tokio_track_caller, track_caller)]
    pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        // Unnamed variant of `spawn_local_inner`; the panic raised when no
        // `LocalSet` context is active originates there.
        spawn_local_inner(future, None)
    }
297 
298     pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
299     where F: Future + 'static,
300           F::Output: 'static
301     {
302         let future = crate::util::trace::task(future, "local", name);
303         CURRENT.with(|maybe_cx| {
304             let cx = maybe_cx
305                 .expect("`spawn_local` called from outside of a `task::LocalSet`");
306 
307             let (handle, notified) = cx.owned.bind(future, cx.shared.clone());
308 
309             if let Some(notified) = notified {
310                 cx.shared.schedule(notified);
311             }
312 
313             handle
314         })
315     }
316 }
317 
/// Initial capacity of both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked first: whenever the tick counter is
/// a multiple of this value, `next_task` tries the remote queue before the
/// local one.
const REMOTE_FIRST_INTERVAL: u8 = 31;
326 
327 impl LocalSet {
328     /// Returns a new local task set.
new() -> LocalSet329     pub fn new() -> LocalSet {
330         LocalSet {
331             tick: Cell::new(0),
332             context: Context {
333                 owned: LocalOwnedTasks::new(),
334                 queue: VecDequeCell::with_capacity(INITIAL_CAPACITY),
335                 shared: Arc::new(Shared {
336                     queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
337                     waker: AtomicWaker::new(),
338                 }),
339             },
340             _not_send: PhantomData,
341         }
342     }
343 
344     /// Spawns a `!Send` task onto the local task set.
345     ///
346     /// This task is guaranteed to be run on the current thread.
347     ///
348     /// Unlike the free function [`spawn_local`], this method may be used to
349     /// spawn local tasks when the task set is _not_ running. For example:
350     /// ```rust
351     /// use tokio::task;
352     ///
353     /// #[tokio::main]
354     /// async fn main() {
355     ///     let local = task::LocalSet::new();
356     ///
357     ///     // Spawn a future on the local set. This future will be run when
358     ///     // we call `run_until` to drive the task set.
359     ///     local.spawn_local(async {
360     ///        // ...
361     ///     });
362     ///
363     ///     // Run the local task set.
364     ///     local.run_until(async move {
365     ///         // ...
366     ///     }).await;
367     ///
    ///     // When `run_until` finishes, we can spawn _more_ futures, which will
    ///     // run in subsequent calls to `run_until`.
370     ///     local.spawn_local(async {
371     ///        // ...
372     ///     });
373     ///
374     ///     local.run_until(async move {
375     ///         // ...
376     ///     }).await;
377     /// }
378     /// ```
379     /// [`spawn_local`]: fn@spawn_local
380     #[cfg_attr(tokio_track_caller, track_caller)]
spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,381     pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
382     where
383         F: Future + 'static,
384         F::Output: 'static,
385     {
386         let future = crate::util::trace::task(future, "local", None);
387 
388         let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone());
389 
390         if let Some(notified) = notified {
391             self.context.shared.schedule(notified);
392         }
393 
394         self.context.shared.waker.wake();
395         handle
396     }
397 
398     /// Runs a future to completion on the provided runtime, driving any local
399     /// futures spawned on this task set on the current thread.
400     ///
401     /// This runs the given future on the runtime, blocking until it is
402     /// complete, and yielding its resolved result. Any tasks or timers which
403     /// the future spawns internally will be executed on the runtime. The future
    /// may also call [`spawn_local`] to spawn additional local futures on the
    /// current thread.
406     ///
407     /// This method should not be called from an asynchronous context.
408     ///
409     /// # Panics
410     ///
411     /// This function panics if the executor is at capacity, if the provided
412     /// future panics, or if called within an asynchronous execution context.
413     ///
414     /// # Notes
415     ///
416     /// Since this function internally calls [`Runtime::block_on`], and drives
417     /// futures in the local task set inside that call to `block_on`, the local
418     /// futures may not use [in-place blocking]. If a blocking call needs to be
419     /// issued from a local task, the [`spawn_blocking`] API may be used instead.
420     ///
421     /// For example, this will panic:
422     /// ```should_panic
423     /// use tokio::runtime::Runtime;
424     /// use tokio::task;
425     ///
426     /// let rt  = Runtime::new().unwrap();
427     /// let local = task::LocalSet::new();
428     /// local.block_on(&rt, async {
429     ///     let join = task::spawn_local(async {
430     ///         let blocking_result = task::block_in_place(|| {
431     ///             // ...
432     ///         });
433     ///         // ...
434     ///     });
435     ///     join.await.unwrap();
436     /// })
437     /// ```
438     /// This, however, will not panic:
439     /// ```
440     /// use tokio::runtime::Runtime;
441     /// use tokio::task;
442     ///
443     /// let rt  = Runtime::new().unwrap();
444     /// let local = task::LocalSet::new();
445     /// local.block_on(&rt, async {
446     ///     let join = task::spawn_local(async {
447     ///         let blocking_result = task::spawn_blocking(|| {
448     ///             // ...
449     ///         }).await;
450     ///         // ...
451     ///     });
452     ///     join.await.unwrap();
453     /// })
454     /// ```
455     ///
456     /// [`spawn_local`]: fn@spawn_local
457     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
458     /// [in-place blocking]: fn@crate::task::block_in_place
459     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
    #[cfg(feature = "rt")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
    where
        F: Future,
    {
        // Wrap `future` with `run_until` so this set's tasks are driven
        // alongside it, then block the calling thread on the runtime.
        rt.block_on(self.run_until(future))
    }
468 
469     /// Runs a future to completion on the local set, returning its output.
470     ///
471     /// This returns a future that runs the given future with a local set,
472     /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
473     /// Any local futures spawned on the local set will be driven in the
    /// background until the future passed to `run_until` completes. When the future
    /// passed to `run_until` finishes, any local futures which have not completed
476     /// will remain on the local set, and will be driven on subsequent calls to
477     /// `run_until` or when [awaiting the local set] itself.
478     ///
479     /// # Examples
480     ///
481     /// ```rust
482     /// use tokio::task;
483     ///
484     /// #[tokio::main]
485     /// async fn main() {
486     ///     task::LocalSet::new().run_until(async {
487     ///         task::spawn_local(async move {
488     ///             // ...
489     ///         }).await.unwrap();
490     ///         // ...
491     ///     }).await;
492     /// }
493     /// ```
494     ///
495     /// [`spawn_local`]: fn@spawn_local
496     /// [awaiting the local set]: #awaiting-a-localset
run_until<F>(&self, future: F) -> F::Output where F: Future,497     pub async fn run_until<F>(&self, future: F) -> F::Output
498     where
499         F: Future,
500     {
501         let run_until = RunUntil {
502             future,
503             local_set: self,
504         };
505         run_until.await
506     }
507 
508     /// Ticks the scheduler, returning whether the local future needs to be
509     /// notified again.
tick(&self) -> bool510     fn tick(&self) -> bool {
511         for _ in 0..MAX_TASKS_PER_TICK {
512             match self.next_task() {
513                 // Run the task
514                 //
515                 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
516                 // used. We are responsible for maintaining the invariant that
517                 // `run_unchecked` is only called on threads that spawned the
518                 // task initially. Because `LocalSet` itself is `!Send`, and
519                 // `spawn_local` spawns into the `LocalSet` on the current
520                 // thread, the invariant is maintained.
521                 Some(task) => crate::coop::budget(|| task.run()),
522                 // We have fully drained the queue of notified tasks, so the
523                 // local future doesn't need to be notified again — it can wait
524                 // until something else wakes a task in the local set.
525                 None => return false,
526             }
527         }
528 
529         true
530     }
531 
next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>>532     fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
533         let tick = self.tick.get();
534         self.tick.set(tick.wrapping_add(1));
535 
536         let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
537             self.context
538                 .shared
539                 .queue
540                 .lock()
541                 .as_mut()
542                 .and_then(|queue| queue.pop_front())
543                 .or_else(|| self.context.queue.pop_front())
544         } else {
545             self.context.queue.pop_front().or_else(|| {
546                 self.context
547                     .shared
548                     .queue
549                     .lock()
550                     .as_mut()
551                     .and_then(|queue| queue.pop_front())
552             })
553         };
554 
555         task.map(|task| self.context.owned.assert_owner(task))
556     }
557 
    /// Runs `f` with this set's `Context` installed in the `CURRENT` scoped
    /// thread-local and returns the result of `f`.
    fn with<T>(&self, f: impl FnOnce() -> T) -> T {
        CURRENT.set(&self.context, f)
    }
561 }
562 
impl fmt::Debug for LocalSet {
    /// Formats as the bare struct name `LocalSet`; no fields are printed.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("LocalSet").finish()
    }
}
568 
569 impl Future for LocalSet {
570     type Output = ();
571 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>572     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
573         // Register the waker before starting to work
574         self.context.shared.waker.register_by_ref(cx.waker());
575 
576         if self.with(|| self.tick()) {
577             // If `tick` returns true, we need to notify the local future again:
578             // there are still tasks remaining in the run queue.
579             cx.waker().wake_by_ref();
580             Poll::Pending
581         } else if self.context.owned.is_empty() {
582             // If the scheduler has no remaining futures, we're done!
583             Poll::Ready(())
584         } else {
585             // There are still futures in the local set, but we've polled all the
586             // futures in the run queue. Therefore, we can just return Pending
587             // since the remaining futures will be woken from somewhere else.
588             Poll::Pending
589         }
590     }
591 }
592 
impl Default for LocalSet {
    /// Equivalent to [`LocalSet::new`].
    fn default() -> LocalSet {
        LocalSet::new()
    }
}
598 
impl Drop for LocalSet {
    /// Shuts down every task the set still owns and empties both run queues.
    fn drop(&mut self) {
        // Run the teardown with this set's context installed in `CURRENT`.
        // NOTE(review): presumably because shutting tasks down can call back
        // into `Schedule::release`, which panics without that context —
        // confirm against the task module.
        self.with(|| {
            // Shut down all tasks in the LocalOwnedTasks and close it to
            // prevent new tasks from ever being added.
            self.context.owned.close_and_shutdown_all();

            // We already called shutdown on all tasks above, so the queued
            // notifications only need to be dropped.
            for task in self.context.queue.take() {
                drop(task);
            }

            // Take the queue from the Shared object to prevent pushing
            // notifications to it in the future: `Shared::schedule` does
            // nothing once it finds `None` here.
            let queue = self.context.shared.queue.lock().take().unwrap();
            for task in queue {
                drop(task);
            }

            // Every task must have been released by this point.
            assert!(self.context.owned.is_empty());
        });
    }
}
623 
// === impl RunUntil ===
625 
626 impl<T: Future> Future for RunUntil<'_, T> {
627     type Output = T::Output;
628 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>629     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
630         let me = self.project();
631 
632         me.local_set.with(|| {
633             me.local_set
634                 .context
635                 .shared
636                 .waker
637                 .register_by_ref(cx.waker());
638 
639             let _no_blocking = crate::runtime::enter::disallow_blocking();
640             let f = me.future;
641 
642             if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
643                 return Poll::Ready(output);
644             }
645 
646             if me.local_set.tick() {
647                 // If `tick` returns `true`, we need to notify the local future again:
648                 // there are still tasks remaining in the run queue.
649                 cx.waker().wake_by_ref();
650             }
651 
652             Poll::Pending
653         })
654     }
655 }
656 
657 impl Shared {
658     /// Schedule the provided task on the scheduler.
schedule(&self, task: task::Notified<Arc<Self>>)659     fn schedule(&self, task: task::Notified<Arc<Self>>) {
660         CURRENT.with(|maybe_cx| match maybe_cx {
661             Some(cx) if cx.shared.ptr_eq(self) => {
662                 cx.queue.push_back(task);
663             }
664             _ => {
665                 // First check whether the queue is still there (if not, the
666                 // LocalSet is dropped). Then push to it if so, and if not,
667                 // do nothing.
668                 let mut lock = self.queue.lock();
669 
670                 if let Some(queue) = lock.as_mut() {
671                     queue.push_back(task);
672                     drop(lock);
673                     self.waker.wake();
674                 }
675             }
676         });
677     }
678 
ptr_eq(&self, other: &Shared) -> bool679     fn ptr_eq(&self, other: &Shared) -> bool {
680         std::ptr::eq(self, other)
681     }
682 }
683 
impl task::Schedule for Arc<Shared> {
    /// Removes `task` from the owning set's task collection.
    ///
    /// Panics if no `LocalSet` context is installed in `CURRENT`, or if the
    /// installed context belongs to a different `LocalSet`.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");
            // The active context must be the one this scheduler handle
            // belongs to.
            assert!(cx.shared.ptr_eq(self));
            cx.owned.remove(task)
        })
    }

    /// Forwards to the inherent `Shared::schedule`.
    fn schedule(&self, task: task::Notified<Self>) {
        Shared::schedule(self, task);
    }
}
697