1 //! Runs `!Send` futures on the current thread. 2 use crate::runtime::task::{self, JoinHandle, Task}; 3 use crate::sync::AtomicWaker; 4 use crate::util::linked_list::{Link, LinkedList}; 5 6 use std::cell::{Cell, RefCell}; 7 use std::collections::VecDeque; 8 use std::fmt; 9 use std::future::Future; 10 use std::marker::PhantomData; 11 use std::pin::Pin; 12 use std::sync::{Arc, Mutex}; 13 use std::task::Poll; 14 15 use pin_project_lite::pin_project; 16 17 cfg_rt! { 18 /// A set of tasks which are executed on the same thread. 19 /// 20 /// In some cases, it is necessary to run one or more futures that do not 21 /// implement [`Send`] and thus are unsafe to send between threads. In these 22 /// cases, a [local task set] may be used to schedule one or more `!Send` 23 /// futures to run together on the same thread. 24 /// 25 /// For example, the following code will not compile: 26 /// 27 /// ```rust,compile_fail 28 /// use std::rc::Rc; 29 /// 30 /// #[tokio::main] 31 /// async fn main() { 32 /// // `Rc` does not implement `Send`, and thus may not be sent between 33 /// // threads safely. 34 /// let unsend_data = Rc::new("my unsend data..."); 35 /// 36 /// let unsend_data = unsend_data.clone(); 37 /// // Because the `async` block here moves `unsend_data`, the future is `!Send`. 38 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this 39 /// // will not compile. 40 /// tokio::spawn(async move { 41 /// println!("{}", unsend_data); 42 /// // ... 43 /// }).await.unwrap(); 44 /// } 45 /// ``` 46 /// 47 /// # Use with `run_until` 48 /// 49 /// To spawn `!Send` futures, we can use a local task set to schedule them 50 /// on the thread calling [`Runtime::block_on`]. When running inside of the 51 /// local task set, we can use [`task::spawn_local`], which can spawn 52 /// `!Send` futures. For example: 53 /// 54 /// ```rust 55 /// use std::rc::Rc; 56 /// use tokio::task; 57 /// 58 /// #[tokio::main] 59 /// async fn main() { 60 /// let unsend_data = Rc::new("my unsend data..."); 61 /// 62 /// // Construct a local task set that can run `!Send` futures. 63 /// let local = task::LocalSet::new(); 64 /// 65 /// // Run the local task set. 66 /// local.run_until(async move { 67 /// let unsend_data = unsend_data.clone(); 68 /// // `spawn_local` ensures that the future is spawned on the local 69 /// // task set. 70 /// task::spawn_local(async move { 71 /// println!("{}", unsend_data); 72 /// // ... 73 /// }).await.unwrap(); 74 /// }).await; 75 /// } 76 /// ``` 77 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, 78 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It 79 /// cannot be used inside a task spawned with `tokio::spawn`. 80 /// 81 /// ## Awaiting a `LocalSet` 82 /// 83 /// Additionally, a `LocalSet` itself implements `Future`, completing when 84 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run 85 /// several futures on a `LocalSet` and drive the whole set until they 86 /// complete. For example, 87 /// 88 /// ```rust 89 /// use tokio::{task, time}; 90 /// use std::rc::Rc; 91 /// 92 /// #[tokio::main] 93 /// async fn main() { 94 /// let unsend_data = Rc::new("world"); 95 /// let local = task::LocalSet::new(); 96 /// 97 /// let unsend_data2 = unsend_data.clone(); 98 /// local.spawn_local(async move { 99 /// // ... 100 /// println!("hello {}", unsend_data2) 101 /// }); 102 /// 103 /// local.spawn_local(async move { 104 /// time::sleep(time::Duration::from_millis(100)).await; 105 /// println!("goodbye {}", unsend_data) 106 /// }); 107 /// 108 /// // ... 109 /// 110 /// local.await; 111 /// } 112 /// ``` 113 /// **Note:** Awaiting a `LocalSet` can only be done inside 114 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to 115 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with 116 /// `tokio::spawn`. 117 /// 118 /// ## Use inside `tokio::spawn` 119 /// 120 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so 121 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do 122 /// something else. The solution is to create the `LocalSet` somewhere else, 123 /// and communicate with it using an [`mpsc`] channel. 124 /// 125 /// The following example puts the `LocalSet` inside a new thread. 126 /// ``` 127 /// use tokio::runtime::Builder; 128 /// use tokio::sync::{mpsc, oneshot}; 129 /// use tokio::task::LocalSet; 130 /// 131 /// // This struct describes the task you want to spawn. Here we include 132 /// // some simple examples. The oneshot channel allows sending a response 133 /// // to the spawner. 134 /// #[derive(Debug)] 135 /// enum Task { 136 /// PrintNumber(u32), 137 /// AddOne(u32, oneshot::Sender<u32>), 138 /// } 139 /// 140 /// #[derive(Clone)] 141 /// struct LocalSpawner { 142 /// send: mpsc::UnboundedSender<Task>, 143 /// } 144 /// 145 /// impl LocalSpawner { 146 /// pub fn new() -> Self { 147 /// let (send, mut recv) = mpsc::unbounded_channel(); 148 /// 149 /// let rt = Builder::new_current_thread() 150 /// .enable_all() 151 /// .build() 152 /// .unwrap(); 153 /// 154 /// std::thread::spawn(move || { 155 /// let local = LocalSet::new(); 156 /// 157 /// local.spawn_local(async move { 158 /// while let Some(new_task) = recv.recv().await { 159 /// tokio::task::spawn_local(run_task(new_task)); 160 /// } 161 /// // If the while loop returns, then all the LocalSpawner 162 /// // objects have have been dropped. 163 /// }); 164 /// 165 /// // This will return once all senders are dropped and all 166 /// // spawned tasks have returned. 167 /// rt.block_on(local); 168 /// }); 169 /// 170 /// Self { 171 /// send, 172 /// } 173 /// } 174 /// 175 /// pub fn spawn(&self, task: Task) { 176 /// self.send.send(task).expect("Thread with LocalSet has shut down."); 177 /// } 178 /// } 179 /// 180 /// // This task may do !Send stuff. We use printing a number as an example, 181 /// // but it could be anything. 182 /// // 183 /// // The Task struct is an enum to support spawning many different kinds 184 /// // of operations. 185 /// async fn run_task(task: Task) { 186 /// match task { 187 /// Task::PrintNumber(n) => { 188 /// println!("{}", n); 189 /// }, 190 /// Task::AddOne(n, response) => { 191 /// // We ignore failures to send the response. 192 /// let _ = response.send(n + 1); 193 /// }, 194 /// } 195 /// } 196 /// 197 /// #[tokio::main] 198 /// async fn main() { 199 /// let spawner = LocalSpawner::new(); 200 /// 201 /// let (send, response) = oneshot::channel(); 202 /// spawner.spawn(Task::AddOne(10, send)); 203 /// let eleven = response.await.unwrap(); 204 /// assert_eq!(eleven, 11); 205 /// } 206 /// ``` 207 /// 208 /// [`Send`]: trait@std::marker::Send 209 /// [local task set]: struct@LocalSet 210 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on 211 /// [`task::spawn_local`]: fn@spawn_local 212 /// [`mpsc`]: mod@crate::sync::mpsc 213 pub struct LocalSet { 214 /// Current scheduler tick 215 tick: Cell<u8>, 216 217 /// State available from thread-local 218 context: Context, 219 220 /// This type should not be Send. 221 _not_send: PhantomData<*const ()>, 222 } 223 } 224 225 /// State available from the thread-local 226 struct Context { 227 /// Owned task set and local run queue 228 tasks: RefCell<Tasks>, 229 230 /// State shared between threads. 231 shared: Arc<Shared>, 232 } 233 234 struct Tasks { 235 /// Collection of all active tasks spawned onto this executor. 236 owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>, 237 238 /// Local run queue sender and receiver. 239 queue: VecDeque<task::Notified<Arc<Shared>>>, 240 } 241 242 /// LocalSet state shared between threads. 243 struct Shared { 244 /// Remote run queue sender 245 queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>, 246 247 /// Wake the `LocalSet` task 248 waker: AtomicWaker, 249 } 250 251 pin_project! { 252 #[derive(Debug)] 253 struct RunUntil<'a, F> { 254 local_set: &'a LocalSet, 255 #[pin] 256 future: F, 257 } 258 } 259 260 scoped_thread_local!(static CURRENT: Context); 261 262 cfg_rt! { 263 /// Spawns a `!Send` future on the local task set. 264 /// 265 /// The spawned future will be run on the same thread that called `spawn_local.` 266 /// This may only be called from the context of a local task set. 267 /// 268 /// # Panics 269 /// 270 /// - This function panics if called outside of a local task set. 271 /// 272 /// # Examples 273 /// 274 /// ```rust 275 /// use std::rc::Rc; 276 /// use tokio::task; 277 /// 278 /// #[tokio::main] 279 /// async fn main() { 280 /// let unsend_data = Rc::new("my unsend data..."); 281 /// 282 /// let local = task::LocalSet::new(); 283 /// 284 /// // Run the local task set. 285 /// local.run_until(async move { 286 /// let unsend_data = unsend_data.clone(); 287 /// task::spawn_local(async move { 288 /// println!("{}", unsend_data); 289 /// // ... 290 /// }).await.unwrap(); 291 /// }).await; 292 /// } 293 /// ``` 294 #[cfg_attr(tokio_track_caller, track_caller)] 295 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> 296 where 297 F: Future + 'static, 298 F::Output: 'static, 299 { 300 let future = crate::util::trace::task(future, "local"); 301 CURRENT.with(|maybe_cx| { 302 let cx = maybe_cx 303 .expect("`spawn_local` called from outside of a `task::LocalSet`"); 304 305 // Safety: Tasks are only polled and dropped from the thread that 306 // spawns them. 307 let (task, handle) = unsafe { task::joinable_local(future) }; 308 cx.tasks.borrow_mut().queue.push_back(task); 309 handle 310 }) 311 } 312 } 313 314 /// Initial queue capacity 315 const INITIAL_CAPACITY: usize = 64; 316 317 /// Max number of tasks to poll per tick. 318 const MAX_TASKS_PER_TICK: usize = 61; 319 320 /// How often it check the remote queue first 321 const REMOTE_FIRST_INTERVAL: u8 = 31; 322 323 impl LocalSet { 324 /// Returns a new local task set. new() -> LocalSet325 pub fn new() -> LocalSet { 326 LocalSet { 327 tick: Cell::new(0), 328 context: Context { 329 tasks: RefCell::new(Tasks { 330 owned: LinkedList::new(), 331 queue: VecDeque::with_capacity(INITIAL_CAPACITY), 332 }), 333 shared: Arc::new(Shared { 334 queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), 335 waker: AtomicWaker::new(), 336 }), 337 }, 338 _not_send: PhantomData, 339 } 340 } 341 342 /// Spawns a `!Send` task onto the local task set. 343 /// 344 /// This task is guaranteed to be run on the current thread. 345 /// 346 /// Unlike the free function [`spawn_local`], this method may be used to 347 /// spawn local tasks when the task set is _not_ running. For example: 348 /// ```rust 349 /// use tokio::task; 350 /// 351 /// #[tokio::main] 352 /// async fn main() { 353 /// let local = task::LocalSet::new(); 354 /// 355 /// // Spawn a future on the local set. This future will be run when 356 /// // we call `run_until` to drive the task set. 357 /// local.spawn_local(async { 358 /// // ... 359 /// }); 360 /// 361 /// // Run the local task set. 362 /// local.run_until(async move { 363 /// // ... 364 /// }).await; 365 /// 366 /// // When `run` finishes, we can spawn _more_ futures, which will 367 /// // run in subsequent calls to `run_until`. 368 /// local.spawn_local(async { 369 /// // ... 370 /// }); 371 /// 372 /// local.run_until(async move { 373 /// // ... 374 /// }).await; 375 /// } 376 /// ``` 377 /// [`spawn_local`]: fn@spawn_local 378 #[cfg_attr(tokio_track_caller, track_caller)] spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,379 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> 380 where 381 F: Future + 'static, 382 F::Output: 'static, 383 { 384 let future = crate::util::trace::task(future, "local"); 385 let (task, handle) = unsafe { task::joinable_local(future) }; 386 self.context.tasks.borrow_mut().queue.push_back(task); 387 self.context.shared.waker.wake(); 388 handle 389 } 390 391 /// Runs a future to completion on the provided runtime, driving any local 392 /// futures spawned on this task set on the current thread. 393 /// 394 /// This runs the given future on the runtime, blocking until it is 395 /// complete, and yielding its resolved result. Any tasks or timers which 396 /// the future spawns internally will be executed on the runtime. The future 397 /// may also call [`spawn_local`] to spawn_local additional local futures on the 398 /// current thread. 399 /// 400 /// This method should not be called from an asynchronous context. 401 /// 402 /// # Panics 403 /// 404 /// This function panics if the executor is at capacity, if the provided 405 /// future panics, or if called within an asynchronous execution context. 406 /// 407 /// # Notes 408 /// 409 /// Since this function internally calls [`Runtime::block_on`], and drives 410 /// futures in the local task set inside that call to `block_on`, the local 411 /// futures may not use [in-place blocking]. If a blocking call needs to be 412 /// issued from a local task, the [`spawn_blocking`] API may be used instead. 413 /// 414 /// For example, this will panic: 415 /// ```should_panic 416 /// use tokio::runtime::Runtime; 417 /// use tokio::task; 418 /// 419 /// let rt = Runtime::new().unwrap(); 420 /// let local = task::LocalSet::new(); 421 /// local.block_on(&rt, async { 422 /// let join = task::spawn_local(async { 423 /// let blocking_result = task::block_in_place(|| { 424 /// // ... 425 /// }); 426 /// // ... 427 /// }); 428 /// join.await.unwrap(); 429 /// }) 430 /// ``` 431 /// This, however, will not panic: 432 /// ``` 433 /// use tokio::runtime::Runtime; 434 /// use tokio::task; 435 /// 436 /// let rt = Runtime::new().unwrap(); 437 /// let local = task::LocalSet::new(); 438 /// local.block_on(&rt, async { 439 /// let join = task::spawn_local(async { 440 /// let blocking_result = task::spawn_blocking(|| { 441 /// // ... 442 /// }).await; 443 /// // ... 444 /// }); 445 /// join.await.unwrap(); 446 /// }) 447 /// ``` 448 /// 449 /// [`spawn_local`]: fn@spawn_local 450 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on 451 /// [in-place blocking]: fn@crate::task::block_in_place 452 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 453 #[cfg(feature = "rt")] 454 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future,455 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output 456 where 457 F: Future, 458 { 459 rt.block_on(self.run_until(future)) 460 } 461 462 /// Run a future to completion on the local set, returning its output. 463 /// 464 /// This returns a future that runs the given future with a local set, 465 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. 466 /// Any local futures spawned on the local set will be driven in the 467 /// background until the future passed to `run_until` completes. When the future 468 /// passed to `run` finishes, any local futures which have not completed 469 /// will remain on the local set, and will be driven on subsequent calls to 470 /// `run_until` or when [awaiting the local set] itself. 471 /// 472 /// # Examples 473 /// 474 /// ```rust 475 /// use tokio::task; 476 /// 477 /// #[tokio::main] 478 /// async fn main() { 479 /// task::LocalSet::new().run_until(async { 480 /// task::spawn_local(async move { 481 /// // ... 482 /// }).await.unwrap(); 483 /// // ... 484 /// }).await; 485 /// } 486 /// ``` 487 /// 488 /// [`spawn_local`]: fn@spawn_local 489 /// [awaiting the local set]: #awaiting-a-localset run_until<F>(&self, future: F) -> F::Output where F: Future,490 pub async fn run_until<F>(&self, future: F) -> F::Output 491 where 492 F: Future, 493 { 494 let run_until = RunUntil { 495 future, 496 local_set: self, 497 }; 498 run_until.await 499 } 500 501 /// Tick the scheduler, returning whether the local future needs to be 502 /// notified again. tick(&self) -> bool503 fn tick(&self) -> bool { 504 for _ in 0..MAX_TASKS_PER_TICK { 505 match self.next_task() { 506 // Run the task 507 // 508 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be 509 // used. We are responsible for maintaining the invariant that 510 // `run_unchecked` is only called on threads that spawned the 511 // task initially. Because `LocalSet` itself is `!Send`, and 512 // `spawn_local` spawns into the `LocalSet` on the current 513 // thread, the invariant is maintained. 514 Some(task) => crate::coop::budget(|| task.run()), 515 // We have fully drained the queue of notified tasks, so the 516 // local future doesn't need to be notified again — it can wait 517 // until something else wakes a task in the local set. 518 None => return false, 519 } 520 } 521 522 true 523 } 524 next_task(&self) -> Option<task::Notified<Arc<Shared>>>525 fn next_task(&self) -> Option<task::Notified<Arc<Shared>>> { 526 let tick = self.tick.get(); 527 self.tick.set(tick.wrapping_add(1)); 528 529 if tick % REMOTE_FIRST_INTERVAL == 0 { 530 self.context 531 .shared 532 .queue 533 .lock() 534 .unwrap() 535 .pop_front() 536 .or_else(|| self.context.tasks.borrow_mut().queue.pop_front()) 537 } else { 538 self.context 539 .tasks 540 .borrow_mut() 541 .queue 542 .pop_front() 543 .or_else(|| self.context.shared.queue.lock().unwrap().pop_front()) 544 } 545 } 546 with<T>(&self, f: impl FnOnce() -> T) -> T547 fn with<T>(&self, f: impl FnOnce() -> T) -> T { 548 CURRENT.set(&self.context, f) 549 } 550 } 551 552 impl fmt::Debug for LocalSet { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result553 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 554 fmt.debug_struct("LocalSet").finish() 555 } 556 } 557 558 impl Future for LocalSet { 559 type Output = (); 560 poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>561 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { 562 // Register the waker before starting to work 563 self.context.shared.waker.register_by_ref(cx.waker()); 564 565 if self.with(|| self.tick()) { 566 // If `tick` returns true, we need to notify the local future again: 567 // there are still tasks remaining in the run queue. 568 cx.waker().wake_by_ref(); 569 Poll::Pending 570 } else if self.context.tasks.borrow().owned.is_empty() { 571 // If the scheduler has no remaining futures, we're done! 572 Poll::Ready(()) 573 } else { 574 // There are still futures in the local set, but we've polled all the 575 // futures in the run queue. Therefore, we can just return Pending 576 // since the remaining futures will be woken from somewhere else. 577 Poll::Pending 578 } 579 } 580 } 581 582 impl Default for LocalSet { default() -> LocalSet583 fn default() -> LocalSet { 584 LocalSet::new() 585 } 586 } 587 588 impl Drop for LocalSet { drop(&mut self)589 fn drop(&mut self) { 590 self.with(|| { 591 // Loop required here to ensure borrow is dropped between iterations 592 #[allow(clippy::while_let_loop)] 593 loop { 594 let task = match self.context.tasks.borrow_mut().owned.pop_back() { 595 Some(task) => task, 596 None => break, 597 }; 598 599 // Safety: same as `run_unchecked`. 600 task.shutdown(); 601 } 602 603 for task in self.context.tasks.borrow_mut().queue.drain(..) { 604 task.shutdown(); 605 } 606 607 for task in self.context.shared.queue.lock().unwrap().drain(..) { 608 task.shutdown(); 609 } 610 611 assert!(self.context.tasks.borrow().owned.is_empty()); 612 }); 613 } 614 } 615 616 // === impl LocalFuture === 617 618 impl<T: Future> Future for RunUntil<'_, T> { 619 type Output = T::Output; 620 poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>621 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { 622 let me = self.project(); 623 624 me.local_set.with(|| { 625 me.local_set 626 .context 627 .shared 628 .waker 629 .register_by_ref(cx.waker()); 630 631 let _no_blocking = crate::runtime::enter::disallow_blocking(); 632 let f = me.future; 633 634 if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { 635 return Poll::Ready(output); 636 } 637 638 if me.local_set.tick() { 639 // If `tick` returns `true`, we need to notify the local future again: 640 // there are still tasks remaining in the run queue. 641 cx.waker().wake_by_ref(); 642 } 643 644 Poll::Pending 645 }) 646 } 647 } 648 649 impl Shared { 650 /// Schedule the provided task on the scheduler. schedule(&self, task: task::Notified<Arc<Self>>)651 fn schedule(&self, task: task::Notified<Arc<Self>>) { 652 CURRENT.with(|maybe_cx| match maybe_cx { 653 Some(cx) if cx.shared.ptr_eq(self) => { 654 cx.tasks.borrow_mut().queue.push_back(task); 655 } 656 _ => { 657 self.queue.lock().unwrap().push_back(task); 658 self.waker.wake(); 659 } 660 }); 661 } 662 ptr_eq(&self, other: &Shared) -> bool663 fn ptr_eq(&self, other: &Shared) -> bool { 664 std::ptr::eq(self, other) 665 } 666 } 667 668 impl task::Schedule for Arc<Shared> { bind(task: Task<Self>) -> Arc<Shared>669 fn bind(task: Task<Self>) -> Arc<Shared> { 670 CURRENT.with(|maybe_cx| { 671 let cx = maybe_cx.expect("scheduler context missing"); 672 cx.tasks.borrow_mut().owned.push_front(task); 673 cx.shared.clone() 674 }) 675 } 676 release(&self, task: &Task<Self>) -> Option<Task<Self>>677 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { 678 use std::ptr::NonNull; 679 680 CURRENT.with(|maybe_cx| { 681 let cx = maybe_cx.expect("scheduler context missing"); 682 683 assert!(cx.shared.ptr_eq(self)); 684 685 let ptr = NonNull::from(task.header()); 686 // safety: task must be contained by list. It is inserted into the 687 // list in `bind`. 688 unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } 689 }) 690 } 691 schedule(&self, task: task::Notified<Self>)692 fn schedule(&self, task: task::Notified<Self>) { 693 Shared::schedule(self, task); 694 } 695 } 696